aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenjamin Klimczak <benjamin.klimczak@arm.com>2022-07-11 12:33:42 +0100
committerBenjamin Klimczak <benjamin.klimczak@arm.com>2022-07-26 14:08:21 +0100
commit5d81f37de09efe10f90512e50252be9c36925fcf (patch)
treeb4d7cdfd051da0a6e882bdfcf280fd7ca7b39e57 /src
parent7899b908c1fe6d86b92a80f3827ddd0ac05b674b (diff)
downloadmlia-5d81f37de09efe10f90512e50252be9c36925fcf.tar.gz
MLIA-551 Rework remains of AIET architecture
Re-factoring the code base to further merge the old AIET code into MLIA. - Remove last traces of the backend type 'tool' - Controlled systems removed, including SSH protocol, controller, RunningCommand, locks etc. - Build command / build dir and deploy functionality removed from Applications and Systems - Moving working_dir() - Replace module 'output_parser' with new module 'output_consumer' and merge Base64 parsing into it - Change the output consumption to optionally remove (i.e. actually consume) lines - Use Base64 parsing in GenericInferenceOutputParser, replacing the regex-based parsing and remove the now unused regex parsing - Remove AIET reporting - Pre-install applications by moving them to src/mlia/resources/backends - Rename aiet-config.json to backend-config.json - Move tests from tests/mlia/ to tests/ - Adapt unit tests to code changes - Dependencies removed: paramiko, filelock, psutil - Fix bug in corstone.py: The wrong resource directory was used which broke the functionality to download backends. - Use f-string formatting. - Use logging instead of print. Change-Id: I768bc3bb6b2eda57d219ad01be4a8e0a74167d76
Diffstat (limited to 'src')
-rw-r--r--src/mlia/backend/application.py23
-rw-r--r--src/mlia/backend/common.py35
-rw-r--r--src/mlia/backend/config.py26
-rw-r--r--src/mlia/backend/controller.py134
-rw-r--r--src/mlia/backend/execution.py524
-rw-r--r--src/mlia/backend/fs.py36
-rw-r--r--src/mlia/backend/manager.py177
-rw-r--r--src/mlia/backend/output_consumer.py66
-rw-r--r--src/mlia/backend/output_parser.py176
-rw-r--r--src/mlia/backend/proc.py89
-rw-r--r--src/mlia/backend/protocol.py325
-rw-r--r--src/mlia/backend/source.py18
-rw-r--r--src/mlia/backend/system.py229
-rw-r--r--src/mlia/core/reporting.py2
-rw-r--r--src/mlia/nn/tensorflow/tflite_metrics.py33
-rw-r--r--src/mlia/resources/backend_configs/systems/SYSTEMS.txt (renamed from src/mlia/resources/aiet/systems/SYSTEMS.txt)0
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json (renamed from src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json)8
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json (renamed from src/mlia/resources/aiet/systems/corstone-300/aiet-config.json)8
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json (renamed from src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json)4
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json (renamed from src/mlia/resources/aiet/systems/corstone-310/aiet-config.json)4
-rw-r--r--src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/.gitignore6
-rw-r--r--src/mlia/resources/backends/applications/APPLICATIONS.txt (renamed from src/mlia/resources/aiet/applications/APPLICATIONS.txt)2
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json)1
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf)bin426496 -> 426496 bytes
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json)1
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license (renamed from src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf)bin426544 -> 426544 bytes
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json)1
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json.license (renamed from src/mlia/resources/aiet/systems/corstone-300/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf)bin2524028 -> 2524028 bytes
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json)1
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license (renamed from src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf)bin426488 -> 426488 bytes
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json)1
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license (renamed from src/mlia/resources/aiet/systems/corstone-310/aiet-config.json.license)0
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf)bin426536 -> 426536 bytes
-rw-r--r--src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license (renamed from src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license)0
-rw-r--r--src/mlia/tools/metadata/corstone.py28
-rw-r--r--src/mlia/utils/filesystem.py18
-rw-r--r--src/mlia/utils/proc.py152
49 files changed, 312 insertions, 1816 deletions
diff --git a/src/mlia/backend/application.py b/src/mlia/backend/application.py
index eb85212..4b04324 100644
--- a/src/mlia/backend/application.py
+++ b/src/mlia/backend/application.py
@@ -11,10 +11,9 @@ from typing import Optional
from mlia.backend.common import Backend
from mlia.backend.common import ConfigurationException
-from mlia.backend.common import DataPaths
from mlia.backend.common import get_backend_configs
from mlia.backend.common import get_backend_directories
-from mlia.backend.common import load_application_or_tool_configs
+from mlia.backend.common import load_application_configs
from mlia.backend.common import load_config
from mlia.backend.common import remove_backend
from mlia.backend.config import ApplicationConfig
@@ -75,7 +74,7 @@ def install_application(source_path: Path) -> None:
if already_installed:
names = {application.name for application in already_installed}
raise ConfigurationException(
- "Applications [{}] are already installed".format(",".join(names))
+ f"Applications [{','.join(names)}] are already installed."
)
create_destination_and_install(source, get_backends_path("applications"))
@@ -105,7 +104,6 @@ class Application(Backend):
super().__init__(config)
self.supported_systems = config.get("supported_systems", [])
- self.deploy_data = config.get("deploy_data", [])
def __eq__(self, other: object) -> bool:
"""Overload operator ==."""
@@ -122,21 +120,6 @@ class Application(Backend):
"""Check if the application can run on the system passed as argument."""
return system_name in self.supported_systems
- def get_deploy_data(self) -> List[DataPaths]:
- """Validate and return data specified in the config file."""
- if self.config_location is None:
- raise ConfigurationException(
- "Unable to get application {} config location".format(self.name)
- )
-
- deploy_data = []
- for item in self.deploy_data:
- src, dst = item
- src_full_path = self.config_location / src
- assert src_full_path.exists(), "{} does not exists".format(src_full_path)
- deploy_data.append(DataPaths(src_full_path, dst))
- return deploy_data
-
def get_details(self) -> Dict[str, Any]:
"""Return dictionary with information about the Application instance."""
output = {
@@ -180,7 +163,7 @@ def load_applications(config: ExtendedApplicationConfig) -> List[Application]:
supported systems. For each supported system this function will return separate
Application instance with appropriate configuration.
"""
- configs = load_application_or_tool_configs(config, ApplicationConfig)
+ configs = load_application_configs(config, ApplicationConfig)
applications = [Application(cfg) for cfg in configs]
for application in applications:
application.remove_unused_params()
diff --git a/src/mlia/backend/common.py b/src/mlia/backend/common.py
index 2bbb9d3..e61d6b6 100644
--- a/src/mlia/backend/common.py
+++ b/src/mlia/backend/common.py
@@ -33,7 +33,7 @@ from mlia.backend.fs import remove_resource
from mlia.backend.fs import ResourceType
-BACKEND_CONFIG_FILE: Final[str] = "aiet-config.json"
+BACKEND_CONFIG_FILE: Final[str] = "backend-config.json"
class ConfigurationException(Exception):
@@ -126,10 +126,6 @@ class Backend(ABC):
self.description = config.get("description", "")
self.config_location = config.get("config_location")
self.variables = config.get("variables", {})
- self.build_dir = config.get("build_dir")
- self.lock = config.get("lock", False)
- if self.build_dir:
- self.build_dir = self._substitute_variables(self.build_dir)
self.annotations = config.get("annotations", {})
self._parse_commands_and_params(config)
@@ -145,7 +141,7 @@ class Backend(ABC):
command = self.commands.get(command_name)
if not command:
- raise AttributeError("Unknown command: '{}'".format(command_name))
+ raise AttributeError(f"Unknown command: '{command_name}'")
# Iterate over all available parameters until we have a match.
for param in command.params:
@@ -209,7 +205,7 @@ class Backend(ABC):
def var_value(match: Match) -> str:
var_name = match["var_name"]
if var_name not in self.variables:
- raise ConfigurationException("Unknown variable {}".format(var_name))
+ raise ConfigurationException(f"Unknown variable {var_name}")
return self.variables[var_name]
@@ -312,7 +308,7 @@ class Backend(ABC):
command = self.commands.get(command_name)
if not command:
raise ConfigurationException(
- "Command '{}' could not be found.".format(command_name)
+ f"Command '{command_name}' could not be found."
)
commands_to_run = []
@@ -394,7 +390,7 @@ class Command:
if repeated_aliases:
raise ConfigurationException(
- "Non unique aliases {}".format(", ".join(repeated_aliases))
+ f"Non-unique aliases {', '.join(repeated_aliases)}"
)
both_name_and_alias = [
@@ -404,9 +400,8 @@ class Command:
]
if both_name_and_alias:
raise ConfigurationException(
- "Aliases {} could not be used as parameter name".format(
- ", ".join(both_name_and_alias)
- )
+ f"Aliases {', '.join(both_name_and_alias)} could not be used "
+ "as parameter name."
)
def get_details(self) -> Dict:
@@ -449,12 +444,12 @@ def resolve_all_parameters(
return str_val
-def load_application_or_tool_configs(
+def load_application_configs(
config: Any,
config_type: Type[Any],
is_system_required: bool = True,
) -> Any:
- """Get one config for each system supported by the application/tool.
+ """Get one config for each system supported by the application.
The configuration could contain different parameters/commands for different
supported systems. For each supported system this function will return separate
@@ -501,15 +496,13 @@ def load_application_or_tool_configs(
if params_default and params_tool:
if any(not p.get("alias") for p in params_default):
raise ConfigurationException(
- "Default parameters for command {} should have aliases".format(
- command_name
- )
+ f"Default parameters for command {command_name} "
+ "should have aliases"
)
if any(not p.get("alias") for p in params_tool):
raise ConfigurationException(
- "{} parameters for command {} should have aliases".format(
- system_name, command_name
- )
+ f"{system_name} parameters for command {command_name} "
+ "should have aliases."
)
merged_by_alias = {
@@ -519,8 +512,6 @@ def load_application_or_tool_configs(
params[command_name] = list(merged_by_alias.values())
merged_config["user_params"] = params
- merged_config["build_dir"] = system.get("build_dir", config.get("build_dir"))
- merged_config["lock"] = system.get("lock", config.get("lock", False))
merged_config["variables"] = {
**config.get("variables", {}),
**system.get("variables", {}),
diff --git a/src/mlia/backend/config.py b/src/mlia/backend/config.py
index 657adef..9a56fa9 100644
--- a/src/mlia/backend/config.py
+++ b/src/mlia/backend/config.py
@@ -4,9 +4,7 @@
from pathlib import Path
from typing import Dict
from typing import List
-from typing import Literal
from typing import Optional
-from typing import Tuple
from typing import TypedDict
from typing import Union
@@ -29,9 +27,7 @@ class ExecutionConfig(TypedDict, total=False):
commands: Dict[str, List[str]]
user_params: UserParamsConfig
- build_dir: str
variables: Dict[str, str]
- lock: bool
class NamedExecutionConfig(ExecutionConfig):
@@ -53,39 +49,17 @@ class ApplicationConfig(BaseBackendConfig, total=False):
"""Application configuration."""
supported_systems: List[str]
- deploy_data: List[Tuple[str, str]]
class ExtendedApplicationConfig(BaseBackendConfig, total=False):
"""Extended application configuration."""
supported_systems: List[NamedExecutionConfig]
- deploy_data: List[Tuple[str, str]]
-
-
-class ProtocolConfig(TypedDict, total=False):
- """Protocol config."""
-
- protocol: Literal["local", "ssh"]
-
-
-class SSHConfig(ProtocolConfig, total=False):
- """SSH configuration."""
-
- username: str
- password: str
- hostname: str
- port: str
-
-
-class LocalProtocolConfig(ProtocolConfig, total=False):
- """Local protocol config."""
class SystemConfig(BaseBackendConfig, total=False):
"""System configuration."""
- data_transfer: Union[SSHConfig, LocalProtocolConfig]
reporting: Dict[str, Dict]
diff --git a/src/mlia/backend/controller.py b/src/mlia/backend/controller.py
deleted file mode 100644
index f1b68a9..0000000
--- a/src/mlia/backend/controller.py
+++ /dev/null
@@ -1,134 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Controller backend module."""
-import time
-from pathlib import Path
-from typing import List
-from typing import Optional
-from typing import Tuple
-
-import psutil
-import sh
-
-from mlia.backend.common import ConfigurationException
-from mlia.backend.fs import read_file_as_string
-from mlia.backend.proc import execute_command
-from mlia.backend.proc import get_stdout_stderr_paths
-from mlia.backend.proc import read_process_info
-from mlia.backend.proc import save_process_info
-from mlia.backend.proc import terminate_command
-from mlia.backend.proc import terminate_external_process
-
-
-class SystemController:
- """System controller class."""
-
- def __init__(self) -> None:
- """Create new instance of service controller."""
- self.cmd: Optional[sh.RunningCommand] = None
- self.out_path: Optional[Path] = None
- self.err_path: Optional[Path] = None
-
- def before_start(self) -> None:
- """Run actions before system start."""
-
- def after_start(self) -> None:
- """Run actions after system start."""
-
- def start(self, commands: List[str], cwd: Path) -> None:
- """Start system."""
- if not isinstance(cwd, Path) or not cwd.is_dir():
- raise ConfigurationException("Wrong working directory {}".format(cwd))
-
- if len(commands) != 1:
- raise ConfigurationException("System should have only one command to run")
-
- startup_command = commands[0]
- if not startup_command:
- raise ConfigurationException("No startup command provided")
-
- self.before_start()
-
- self.out_path, self.err_path = get_stdout_stderr_paths(startup_command)
-
- self.cmd = execute_command(
- startup_command,
- cwd,
- bg=True,
- out=str(self.out_path),
- err=str(self.err_path),
- )
-
- self.after_start()
-
- def stop(
- self, wait: bool = False, wait_period: float = 0.5, number_of_attempts: int = 20
- ) -> None:
- """Stop system."""
- if self.cmd is not None and self.is_running():
- terminate_command(self.cmd, wait, wait_period, number_of_attempts)
-
- def is_running(self) -> bool:
- """Check if underlying process is running."""
- return self.cmd is not None and self.cmd.is_alive()
-
- def get_output(self) -> Tuple[str, str]:
- """Return application output."""
- if self.cmd is None or self.out_path is None or self.err_path is None:
- return ("", "")
-
- return (read_file_as_string(self.out_path), read_file_as_string(self.err_path))
-
-
-class SystemControllerSingleInstance(SystemController):
- """System controller with support of system's single instance."""
-
- def __init__(self, pid_file_path: Optional[Path] = None) -> None:
- """Create new instance of the service controller."""
- super().__init__()
- self.pid_file_path = pid_file_path
-
- def before_start(self) -> None:
- """Run actions before system start."""
- self._check_if_previous_instance_is_running()
-
- def after_start(self) -> None:
- """Run actions after system start."""
- self._save_process_info()
-
- def _check_if_previous_instance_is_running(self) -> None:
- """Check if another instance of the system is running."""
- process_info = read_process_info(self._pid_file())
-
- for item in process_info:
- try:
- process = psutil.Process(item.pid)
- same_process = (
- process.name() == item.name
- and process.exe() == item.executable
- and process.cwd() == item.cwd
- )
- if same_process:
- print(
- "Stopping previous instance of the system [{}]".format(item.pid)
- )
- terminate_external_process(process)
- except psutil.NoSuchProcess:
- pass
-
- def _save_process_info(self, wait_period: float = 2) -> None:
- """Save information about system's processes."""
- if self.cmd is None or not self.is_running():
- return
-
- # give some time for the system to start
- time.sleep(wait_period)
-
- save_process_info(self.cmd.process.pid, self._pid_file())
-
- def _pid_file(self) -> Path:
- """Return path to file which is used for saving process info."""
- if not self.pid_file_path:
- raise Exception("No pid file path presented")
-
- return self.pid_file_path
diff --git a/src/mlia/backend/execution.py b/src/mlia/backend/execution.py
index 749ccdb..5340a47 100644
--- a/src/mlia/backend/execution.py
+++ b/src/mlia/backend/execution.py
@@ -1,167 +1,49 @@
# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
"""Application execution module."""
-import itertools
-import json
-import random
+import logging
import re
-import string
-import sys
-import time
-from collections import defaultdict
-from contextlib import contextmanager
-from contextlib import ExitStack
-from pathlib import Path
-from typing import Any
-from typing import Callable
from typing import cast
-from typing import ContextManager
-from typing import Dict
-from typing import Generator
from typing import List
from typing import Optional
-from typing import Sequence
from typing import Tuple
-from typing import TypedDict
-
-from filelock import FileLock
-from filelock import Timeout
from mlia.backend.application import Application
from mlia.backend.application import get_application
from mlia.backend.common import Backend
from mlia.backend.common import ConfigurationException
-from mlia.backend.common import DataPaths
from mlia.backend.common import Param
-from mlia.backend.common import parse_raw_parameter
-from mlia.backend.common import resolve_all_parameters
-from mlia.backend.fs import recreate_directory
-from mlia.backend.fs import remove_directory
-from mlia.backend.fs import valid_for_filename
-from mlia.backend.output_parser import Base64OutputParser
-from mlia.backend.output_parser import OutputParser
-from mlia.backend.output_parser import RegexOutputParser
-from mlia.backend.proc import run_and_wait
-from mlia.backend.system import ControlledSystem
from mlia.backend.system import get_system
-from mlia.backend.system import StandaloneSystem
from mlia.backend.system import System
+logger = logging.getLogger(__name__)
+
class AnotherInstanceIsRunningException(Exception):
"""Concurrent execution error."""
-class ConnectionException(Exception):
- """Connection exception."""
-
-
-class ExecutionParams(TypedDict, total=False):
- """Execution parameters."""
-
- disable_locking: bool
- unique_build_dir: bool
-
-
-class ExecutionContext:
+class ExecutionContext: # pylint: disable=too-few-public-methods
"""Command execution context."""
- # pylint: disable=too-many-arguments,too-many-instance-attributes
def __init__(
self,
app: Application,
app_params: List[str],
- system: Optional[System],
+ system: System,
system_params: List[str],
- custom_deploy_data: Optional[List[DataPaths]] = None,
- execution_params: Optional[ExecutionParams] = None,
- report_file: Optional[Path] = None,
):
"""Init execution context."""
self.app = app
self.app_params = app_params
- self.custom_deploy_data = custom_deploy_data or []
self.system = system
self.system_params = system_params
- self.execution_params = execution_params or ExecutionParams()
- self.report_file = report_file
-
- self.reporter: Optional[Reporter]
- if self.report_file:
- # Create reporter with output parsers
- parsers: List[OutputParser] = []
- if system and system.reporting:
- # Add RegexOutputParser, if it is configured in the system
- parsers.append(RegexOutputParser("system", system.reporting["regex"]))
- # Add Base64 parser for applications
- parsers.append(Base64OutputParser("application"))
- self.reporter = Reporter(parsers=parsers)
- else:
- self.reporter = None # No reporter needed.
self.param_resolver = ParamResolver(self)
- self._resolved_build_dir: Optional[Path] = None
self.stdout: Optional[bytearray] = None
self.stderr: Optional[bytearray] = None
- @property
- def is_deploy_needed(self) -> bool:
- """Check if application requires data deployment."""
- return len(self.app.get_deploy_data()) > 0 or len(self.custom_deploy_data) > 0
-
- @property
- def is_locking_required(self) -> bool:
- """Return true if any form of locking required."""
- return not self._disable_locking() and (
- self.app.lock or (self.system is not None and self.system.lock)
- )
-
- @property
- def is_build_required(self) -> bool:
- """Return true if application build required."""
- return "build" in self.app.commands
-
- @property
- def is_unique_build_dir_required(self) -> bool:
- """Return true if unique build dir required."""
- return self.execution_params.get("unique_build_dir", False)
-
- def build_dir(self) -> Path:
- """Return resolved application build dir."""
- if self._resolved_build_dir is not None:
- return self._resolved_build_dir
-
- if (
- not isinstance(self.app.config_location, Path)
- or not self.app.config_location.is_dir()
- ):
- raise ConfigurationException(
- "Application {} has wrong config location".format(self.app.name)
- )
-
- _build_dir = self.app.build_dir
- if _build_dir:
- _build_dir = resolve_all_parameters(_build_dir, self.param_resolver)
-
- if not _build_dir:
- raise ConfigurationException(
- "No build directory defined for the app {}".format(self.app.name)
- )
-
- if self.is_unique_build_dir_required:
- random_suffix = "".join(
- random.choices(string.ascii_lowercase + string.digits, k=7)
- )
- _build_dir = "{}_{}".format(_build_dir, random_suffix)
-
- self._resolved_build_dir = self.app.config_location / _build_dir
- return self._resolved_build_dir
-
- def _disable_locking(self) -> bool:
- """Return true if locking should be disabled."""
- return self.execution_params.get("disable_locking", False)
-
class ParamResolver:
"""Parameter resolver."""
@@ -187,7 +69,7 @@ class ParamResolver:
i = int(index_or_alias)
if i not in range(len(resolved_params)):
raise ConfigurationException(
- "Invalid index {} for user params of command {}".format(i, cmd_name)
+ f"Invalid index {i} for user params of command {cmd_name}"
)
param_value, param = resolved_params[i]
else:
@@ -198,9 +80,8 @@ class ParamResolver:
if param is None:
raise ConfigurationException(
- "No user parameter for command '{}' with alias '{}'.".format(
- cmd_name, index_or_alias
- )
+ f"No user parameter for command '{cmd_name}' with "
+ f"alias '{index_or_alias}'."
)
if param_value:
@@ -220,17 +101,12 @@ class ParamResolver:
param_name = param.name
separator = "" if param.name.endswith("=") else " "
- return "{param_name}{separator}{param_value}".format(
- param_name=param_name,
- separator=separator,
- param_value=param_value,
- )
+ return f"{param_name}{separator}{param_value}"
if param.name is None:
raise ConfigurationException(
- "Missing user parameter with alias '{}' for command '{}'.".format(
- index_or_alias, cmd_name
- )
+ f"Missing user parameter with alias '{index_or_alias}' for "
+ f"command '{cmd_name}'."
)
return param.name # flag: just return the parameter name
@@ -242,12 +118,12 @@ class ParamResolver:
if backend_type == "system":
backend = cast(Backend, self.ctx.system)
backend_params = self.ctx.system_params
- else: # Application or Tool backend
+ else: # Application backend
backend = cast(Backend, self.ctx.app)
backend_params = self.ctx.app_params
if cmd_name not in backend.commands:
- raise ConfigurationException("Command {} not found".format(cmd_name))
+ raise ConfigurationException(f"Command {cmd_name} not found")
if return_params:
params = backend.resolved_parameters(cmd_name, backend_params)
@@ -255,7 +131,7 @@ class ParamResolver:
i = int(index_or_alias)
if i not in range(len(params)):
raise ConfigurationException(
- "Invalid parameter index {} for command {}".format(i, cmd_name)
+ f"Invalid parameter index {i} for command {cmd_name}"
)
param_value = params[i][0]
@@ -269,20 +145,19 @@ class ParamResolver:
if not param_value:
raise ConfigurationException(
(
- "No value for parameter with index or alias {} of command {}"
- ).format(index_or_alias, cmd_name)
+ "No value for parameter with index or "
+ f"alias {index_or_alias} of command {cmd_name}."
+ )
)
return param_value
if not index_or_alias.isnumeric():
- raise ConfigurationException("Bad command index {}".format(index_or_alias))
+ raise ConfigurationException(f"Bad command index {index_or_alias}")
i = int(index_or_alias)
commands = backend.build_command(cmd_name, backend_params, self.param_resolver)
if i not in range(len(commands)):
- raise ConfigurationException(
- "Invalid index {} for command {}".format(i, cmd_name)
- )
+ raise ConfigurationException(f"Invalid index {i} for command {cmd_name}")
return commands[i]
@@ -290,11 +165,11 @@ class ParamResolver:
"""Resolve variable value."""
if backend_type == "system":
backend = cast(Backend, self.ctx.system)
- else: # Application or Tool backend
+ else: # Application backend
backend = cast(Backend, self.ctx.app)
if var_name not in backend.variables:
- raise ConfigurationException("Unknown variable {}".format(var_name))
+ raise ConfigurationException(f"Unknown variable {var_name}")
return backend.variables[var_name]
@@ -309,7 +184,7 @@ class ParamResolver:
# "system.commands.run.params:0"
# Note: 'software' is included for backward compatibility.
commands_and_params_match = re.match(
- r"(?P<type>application|software|tool|system)[.]commands[.]"
+ r"(?P<type>application|software|system)[.]commands[.]"
r"(?P<name>\w+)"
r"(?P<params>[.]params|)[:]"
r"(?P<index_or_alias>\w+)",
@@ -329,7 +204,7 @@ class ParamResolver:
# Note: 'software' is included for backward compatibility.
variables_match = re.match(
- r"(?P<type>application|software|tool|system)[.]variables:(?P<var_name>\w+)",
+ r"(?P<type>application|software|system)[.]variables:(?P<var_name>\w+)",
param_name,
)
if variables_match:
@@ -344,9 +219,7 @@ class ParamResolver:
index_or_alias = user_params_match["index_or_alias"]
return self.resolve_user_params(cmd_name, index_or_alias, resolved_params)
- raise ConfigurationException(
- "Unable to resolve parameter {}".format(param_name)
- )
+ raise ConfigurationException(f"Unable to resolve parameter {param_name}")
def param_resolver(
self,
@@ -357,24 +230,14 @@ class ParamResolver:
"""Resolve parameter value based on current execution context."""
# Note: 'software.*' is included for backward compatibility.
resolved_param = None
- if param_name in ["application.name", "tool.name", "software.name"]:
+ if param_name in ["application.name", "software.name"]:
resolved_param = self.ctx.app.name
- elif param_name in [
- "application.description",
- "tool.description",
- "software.description",
- ]:
+ elif param_name in ["application.description", "software.description"]:
resolved_param = self.ctx.app.description
elif self.ctx.app.config_location and (
- param_name
- in ["application.config_dir", "tool.config_dir", "software.config_dir"]
+ param_name in ["application.config_dir", "software.config_dir"]
):
resolved_param = str(self.ctx.app.config_location.absolute())
- elif self.ctx.app.build_dir and (
- param_name
- in ["application.build_dir", "tool.build_dir", "software.build_dir"]
- ):
- resolved_param = str(self.ctx.build_dir().absolute())
elif self.ctx.system is not None:
if param_name == "system.name":
resolved_param = self.ctx.system.name
@@ -397,82 +260,6 @@ class ParamResolver:
return self.param_resolver(param_name, cmd_name, resolved_params)
-class Reporter:
- """Report metrics from the simulation output."""
-
- def __init__(self, parsers: Optional[List[OutputParser]] = None) -> None:
- """Create an empty reporter (i.e. no parsers registered)."""
- self.parsers: List[OutputParser] = parsers if parsers is not None else []
- self._report: Dict[str, Any] = defaultdict(lambda: defaultdict(dict))
-
- def parse(self, output: bytearray) -> None:
- """Parse output and append parsed metrics to internal report dict."""
- for parser in self.parsers:
- # Merge metrics from different parsers (do not overwrite)
- self._report[parser.name]["metrics"].update(parser(output))
-
- def get_filtered_output(self, output: bytearray) -> bytearray:
- """Filter the output according to each parser."""
- for parser in self.parsers:
- output = parser.filter_out_parsed_content(output)
- return output
-
- def report(self, ctx: ExecutionContext) -> Dict[str, Any]:
- """Add static simulation info to parsed data and return the report."""
- report: Dict[str, Any] = defaultdict(dict)
- # Add static simulation info
- report.update(self._static_info(ctx))
- # Add metrics parsed from the output
- for key, val in self._report.items():
- report[key].update(val)
- return report
-
- @staticmethod
- def save(report: Dict[str, Any], report_file: Path) -> None:
- """Save the report to a JSON file."""
- with open(report_file, "w", encoding="utf-8") as file:
- json.dump(report, file, indent=4)
-
- @staticmethod
- def _compute_all_params(cli_params: List[str], backend: Backend) -> Dict[str, str]:
- """
- Build a dict of all parameters, {name:value}.
-
- Param values taken from command line if specified, defaults otherwise.
- """
- # map of params passed from the cli ["p1=v1","p2=v2"] -> {"p1":"v1", "p2":"v2"}
- app_params_map = dict(parse_raw_parameter(expr) for expr in cli_params)
-
- # a map of params declared in the application, with values taken from the CLI,
- # defaults otherwise
- all_params = {
- (p.alias or p.name): app_params_map.get(
- cast(str, p.name), cast(str, p.default_value)
- )
- for cmd in backend.commands.values()
- for p in cmd.params
- }
- return cast(Dict[str, str], all_params)
-
- @staticmethod
- def _static_info(ctx: ExecutionContext) -> Dict[str, Any]:
- """Extract static simulation information from the context."""
- if ctx.system is None:
- raise ValueError("No system available to report.")
-
- info = {
- "system": {
- "name": ctx.system.name,
- "params": Reporter._compute_all_params(ctx.system_params, ctx.system),
- },
- "application": {
- "name": ctx.app.name,
- "params": Reporter._compute_all_params(ctx.app_params, ctx.app),
- },
- }
- return info
-
-
def validate_parameters(
backend: Backend, command_names: List[str], params: List[str]
) -> None:
@@ -487,9 +274,8 @@ def validate_parameters(
if not acceptable:
backend_type = "System" if isinstance(backend, System) else "Application"
raise ValueError(
- "{} parameter '{}' not valid for command '{}'".format(
- backend_type, param, " or ".join(command_names)
- )
+ f"{backend_type} parameter '{param}' not valid for "
+ f"command '{' or '.join(command_names)}'."
)
@@ -500,16 +286,14 @@ def get_application_by_name_and_system(
applications = get_application(application_name, system_name)
if not applications:
raise ValueError(
- "Application '{}' doesn't support the system '{}'".format(
- application_name, system_name
- )
+ f"Application '{application_name}' doesn't support the "
+ f"system '{system_name}'."
)
if len(applications) != 1:
raise ValueError(
- "Error during getting application {} for the system {}".format(
- application_name, system_name
- )
+ f"Error during getting application {application_name} for the "
+ f"system {system_name}."
)
return applications[0]
@@ -521,259 +305,41 @@ def get_application_and_system(
"""Return application and system by provided names."""
system = get_system(system_name)
if not system:
- raise ValueError("System {} is not found".format(system_name))
+ raise ValueError(f"System {system_name} is not found.")
application = get_application_by_name_and_system(application_name, system_name)
return application, system
-# pylint: disable=too-many-arguments
def run_application(
application_name: str,
application_params: List[str],
system_name: str,
system_params: List[str],
- custom_deploy_data: List[DataPaths],
- report_file: Optional[Path] = None,
) -> ExecutionContext:
"""Run application on the provided system."""
application, system = get_application_and_system(application_name, system_name)
- validate_parameters(application, ["build", "run"], application_params)
- validate_parameters(system, ["build", "run"], system_params)
-
- execution_params = ExecutionParams()
- if isinstance(system, StandaloneSystem):
- execution_params["disable_locking"] = True
- execution_params["unique_build_dir"] = True
+ validate_parameters(application, ["run"], application_params)
+ validate_parameters(system, ["run"], system_params)
ctx = ExecutionContext(
app=application,
app_params=application_params,
system=system,
system_params=system_params,
- custom_deploy_data=custom_deploy_data,
- execution_params=execution_params,
- report_file=report_file,
)
- with build_dir_manager(ctx):
- if ctx.is_build_required:
- execute_application_command_build(ctx)
-
- execute_application_command_run(ctx)
-
- return ctx
-
-
-def execute_application_command_build(ctx: ExecutionContext) -> None:
- """Execute application command 'build'."""
- with ExitStack() as context_stack:
- for manager in get_context_managers("build", ctx):
- context_stack.enter_context(manager(ctx))
-
- build_dir = ctx.build_dir()
- recreate_directory(build_dir)
-
- build_commands = ctx.app.build_command(
- "build", ctx.app_params, ctx.param_resolver
- )
- execute_commands_locally(build_commands, build_dir)
-
-
-def execute_commands_locally(commands: List[str], cwd: Path) -> None:
- """Execute list of commands locally."""
- for command in commands:
- print("Running: {}".format(command))
- run_and_wait(
- command, cwd, terminate_on_error=True, out=sys.stdout, err=sys.stderr
- )
-
-
-def execute_application_command_run(ctx: ExecutionContext) -> None:
- """Execute application command."""
- assert ctx.system is not None, "System must be provided."
- if ctx.is_deploy_needed and not ctx.system.supports_deploy:
- raise ConfigurationException(
- "System {} does not support data deploy".format(ctx.system.name)
- )
-
- with ExitStack() as context_stack:
- for manager in get_context_managers("run", ctx):
- context_stack.enter_context(manager(ctx))
-
- print("Generating commands to execute")
- commands_to_run = build_run_commands(ctx)
-
- if ctx.system.connectable:
- establish_connection(ctx)
-
- if ctx.system.supports_deploy:
- deploy_data(ctx)
-
- for command in commands_to_run:
- print("Running: {}".format(command))
- exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command)
-
- if exit_code != 0:
- print("Application exited with exit code {}".format(exit_code))
-
- if ctx.reporter:
- ctx.reporter.parse(ctx.stdout)
- ctx.stdout = ctx.reporter.get_filtered_output(ctx.stdout)
-
- if ctx.reporter:
- report = ctx.reporter.report(ctx)
- ctx.reporter.save(report, cast(Path, ctx.report_file))
-
-
-def establish_connection(
- ctx: ExecutionContext, retries: int = 90, interval: float = 15.0
-) -> None:
- """Establish connection with the system."""
- assert ctx.system is not None, "System is required."
- host, port = ctx.system.connection_details()
- print(
- "Trying to establish connection with '{}:{}' - "
- "{} retries every {} seconds ".format(host, port, retries, interval),
- end="",
+ logger.debug("Generating commands to execute")
+ commands_to_run = ctx.system.build_command(
+ "run", ctx.system_params, ctx.param_resolver
)
- try:
- for _ in range(retries):
- print(".", end="", flush=True)
-
- if ctx.system.establish_connection():
- break
-
- if isinstance(ctx.system, ControlledSystem) and not ctx.system.is_running():
- print(
- "\n\n---------- {} execution failed ----------".format(
- ctx.system.name
- )
- )
- stdout, stderr = ctx.system.get_output()
- print(stdout)
- print(stderr)
-
- raise Exception("System is not running")
+ for command in commands_to_run:
+ logger.debug("Running: %s", command)
+ exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command)
- wait(interval)
- else:
- raise ConnectionException("Couldn't connect to '{}:{}'.".format(host, port))
- finally:
- print()
-
-
-def wait(interval: float) -> None:
- """Wait for a period of time."""
- time.sleep(interval)
-
-
-def deploy_data(ctx: ExecutionContext) -> None:
- """Deploy data to the system."""
- assert ctx.system is not None, "System is required."
- for item in itertools.chain(ctx.app.get_deploy_data(), ctx.custom_deploy_data):
- print("Deploying {} onto {}".format(item.src, item.dst))
- ctx.system.deploy(item.src, item.dst)
-
-
-def build_run_commands(ctx: ExecutionContext) -> List[str]:
- """Build commands to run application."""
- if isinstance(ctx.system, StandaloneSystem):
- return ctx.system.build_command("run", ctx.system_params, ctx.param_resolver)
-
- return ctx.app.build_command("run", ctx.app_params, ctx.param_resolver)
-
-
-@contextmanager
-def controlled_system_manager(ctx: ExecutionContext) -> Generator[None, None, None]:
- """Context manager used for system initialisation before run."""
- system = cast(ControlledSystem, ctx.system)
- commands = system.build_command("run", ctx.system_params, ctx.param_resolver)
- pid_file_path: Optional[Path] = None
- if ctx.is_locking_required:
- file_lock_path = get_file_lock_path(ctx)
- pid_file_path = file_lock_path.parent / "{}.pid".format(file_lock_path.stem)
-
- system.start(commands, ctx.is_locking_required, pid_file_path)
- try:
- yield
- finally:
- print("Shutting down sequence...")
- print("Stopping {}... (It could take few seconds)".format(system.name))
- system.stop(wait=True)
- print("{} stopped successfully.".format(system.name))
-
-
-@contextmanager
-def lock_execution_manager(ctx: ExecutionContext) -> Generator[None, None, None]:
- """Lock execution manager."""
- file_lock_path = get_file_lock_path(ctx)
- file_lock = FileLock(str(file_lock_path))
-
- try:
- file_lock.acquire(timeout=1)
- except Timeout as error:
- raise AnotherInstanceIsRunningException() from error
-
- try:
- yield
- finally:
- file_lock.release()
-
-
-def get_file_lock_path(ctx: ExecutionContext, lock_dir: Path = Path("/tmp")) -> Path:
- """Get file lock path."""
- lock_modules = []
- if ctx.app.lock:
- lock_modules.append(ctx.app.name)
- if ctx.system is not None and ctx.system.lock:
- lock_modules.append(ctx.system.name)
- lock_filename = ""
- if lock_modules:
- lock_filename = "_".join(["middleware"] + lock_modules) + ".lock"
-
- if lock_filename:
- lock_filename = resolve_all_parameters(lock_filename, ctx.param_resolver)
- lock_filename = valid_for_filename(lock_filename)
-
- if not lock_filename:
- raise ConfigurationException("No filename for lock provided")
-
- if not isinstance(lock_dir, Path) or not lock_dir.is_dir():
- raise ConfigurationException(
- "Invalid directory {} for lock files provided".format(lock_dir)
- )
-
- return lock_dir / lock_filename
-
-
-@contextmanager
-def build_dir_manager(ctx: ExecutionContext) -> Generator[None, None, None]:
- """Build directory manager."""
- try:
- yield
- finally:
- if (
- ctx.is_build_required
- and ctx.is_unique_build_dir_required
- and ctx.build_dir().is_dir()
- ):
- remove_directory(ctx.build_dir())
+ if exit_code != 0:
+ logger.warning("Application exited with exit code %i", exit_code)
-
-def get_context_managers(
- command_name: str, ctx: ExecutionContext
-) -> Sequence[Callable[[ExecutionContext], ContextManager[None]]]:
- """Get context manager for the system."""
- managers = []
-
- if ctx.is_locking_required:
- managers.append(lock_execution_manager)
-
- if command_name == "run":
- if isinstance(ctx.system, ControlledSystem):
- managers.append(controlled_system_manager)
-
- return managers
+ return ctx
diff --git a/src/mlia/backend/fs.py b/src/mlia/backend/fs.py
index 9979fcb..9fb53b1 100644
--- a/src/mlia/backend/fs.py
+++ b/src/mlia/backend/fs.py
@@ -4,7 +4,6 @@
import re
import shutil
from pathlib import Path
-from typing import Any
from typing import Literal
from typing import Optional
@@ -30,7 +29,7 @@ def get_backends_path(name: ResourceType) -> Path:
if resource_path.is_dir():
return resource_path
- raise ResourceWarning("Resource '{}' not found.".format(name))
+ raise ResourceWarning(f"Resource '{name}' not found.")
def copy_directory_content(source: Path, destination: Path) -> None:
@@ -51,10 +50,10 @@ def remove_resource(resource_directory: str, resource_type: ResourceType) -> Non
resource_location = resources / resource_directory
if not resource_location.exists():
- raise Exception("Resource {} does not exist".format(resource_directory))
+ raise Exception(f"Resource {resource_directory} does not exist")
if not resource_location.is_dir():
- raise Exception("Wrong resource {}".format(resource_directory))
+ raise Exception(f"Wrong resource {resource_directory}")
shutil.rmtree(resource_location)
@@ -74,7 +73,7 @@ def recreate_directory(directory_path: Optional[Path]) -> None:
if directory_path.exists() and not directory_path.is_dir():
raise Exception(
- "Path {} does exist and it is not a directory".format(str(directory_path))
+ f"Path {str(directory_path)} does exist and it is not a directory."
)
if directory_path.is_dir():
@@ -83,33 +82,6 @@ def recreate_directory(directory_path: Optional[Path]) -> None:
directory_path.mkdir()
-def read_file(file_path: Path, mode: Optional[str] = None) -> Any:
- """Read file as string or bytearray."""
- if file_path.is_file():
- if mode is not None:
- # Ignore pylint warning because mode can be 'binary' as well which
- # is not compatible with specifying encodings.
- with open(file_path, mode) as file: # pylint: disable=unspecified-encoding
- return file.read()
- else:
- with open(file_path, encoding="utf-8") as file:
- return file.read()
-
- if mode == "rb":
- return b""
- return ""
-
-
-def read_file_as_string(file_path: Path) -> str:
- """Read file as string."""
- return str(read_file(file_path))
-
-
-def read_file_as_bytearray(file_path: Path) -> bytearray:
- """Read a file as bytearray."""
- return bytearray(read_file(file_path, mode="rb"))
-
-
def valid_for_filename(value: str, replacement: str = "") -> str:
"""Replace non alpha numeric characters."""
return re.sub(r"[^\w.]", replacement, value, flags=re.ASCII)
diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py
index 3a1016c..8d8246d 100644
--- a/src/mlia/backend/manager.py
+++ b/src/mlia/backend/manager.py
@@ -2,27 +2,25 @@
# SPDX-License-Identifier: Apache-2.0
"""Module for backend integration."""
import logging
-import re
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from pathlib import Path
-from typing import Any
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
+from typing import Set
from typing import Tuple
from mlia.backend.application import get_available_applications
from mlia.backend.application import install_application
-from mlia.backend.common import DataPaths
from mlia.backend.execution import ExecutionContext
from mlia.backend.execution import run_application
+from mlia.backend.output_consumer import Base64OutputConsumer
+from mlia.backend.output_consumer import OutputConsumer
from mlia.backend.system import get_available_systems
from mlia.backend.system import install_system
-from mlia.utils.proc import OutputConsumer
-from mlia.utils.proc import RunningCommand
logger = logging.getLogger(__name__)
@@ -128,89 +126,55 @@ class ExecutionParams:
system: str
application_params: List[str]
system_params: List[str]
- deploy_params: List[str]
class LogWriter(OutputConsumer):
"""Redirect output to the logger."""
- def feed(self, line: str) -> None:
+ def feed(self, line: str) -> bool:
"""Process line from the output."""
logger.debug(line.strip())
+ return False
-class GenericInferenceOutputParser(OutputConsumer):
+class GenericInferenceOutputParser(Base64OutputConsumer):
"""Generic inference app output parser."""
- PATTERNS = {
- name: tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns)
- for name, patterns in (
- (
- "npu_active_cycles",
- (
- r"NPU ACTIVE cycles: (?P<value>\d+)",
- r"NPU ACTIVE: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_idle_cycles",
- (
- r"NPU IDLE cycles: (?P<value>\d+)",
- r"NPU IDLE: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_total_cycles",
- (
- r"NPU TOTAL cycles: (?P<value>\d+)",
- r"NPU TOTAL: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_axi0_rd_data_beat_received",
- (
- r"NPU AXI0_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
- r"NPU AXI0_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
- ),
- ),
- (
- "npu_axi0_wr_data_beat_written",
- (
- r"NPU AXI0_WR_DATA_BEAT_WRITTEN beats: (?P<value>\d+)",
- r"NPU AXI0_WR_DATA_BEAT_WRITTEN: (?P<value>\d+) beats",
- ),
- ),
- (
- "npu_axi1_rd_data_beat_received",
- (
- r"NPU AXI1_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
- r"NPU AXI1_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
- ),
- ),
- )
- }
-
def __init__(self) -> None:
"""Init generic inference output parser instance."""
- self.result: Dict = {}
-
- def feed(self, line: str) -> None:
- """Feed new line to the parser."""
- for name, patterns in self.PATTERNS.items():
- for pattern in patterns:
- match = pattern.search(line)
-
- if match:
- self.result[name] = int(match["value"])
- return
+ super().__init__()
+ self._map = {
+ "NPU ACTIVE": "npu_active_cycles",
+ "NPU IDLE": "npu_idle_cycles",
+ "NPU TOTAL": "npu_total_cycles",
+ "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received",
+ "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written",
+ "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received",
+ }
+
+ @property
+ def result(self) -> Dict:
+ """Merge the raw results and map the names to the right output names."""
+ merged_result = {}
+ for raw_result in self.parsed_output:
+ for profiling_result in raw_result:
+ for sample in profiling_result["samples"]:
+ name, values = (sample["name"], sample["value"])
+ if name in merged_result:
+ raise KeyError(
+ f"Duplicate key '{name}' in base64 output.",
+ )
+ new_name = self._map[name]
+ merged_result[new_name] = values[0]
+ return merged_result
def is_ready(self) -> bool:
"""Return true if all expected data has been parsed."""
- return self.result.keys() == self.PATTERNS.keys()
+ return set(self.result.keys()) == set(self._map.values())
- def missed_keys(self) -> List[str]:
- """Return list of the keys that have not been found in the output."""
- return sorted(self.PATTERNS.keys() - self.result.keys())
+ def missed_keys(self) -> Set[str]:
+ """Return a set of the keys that have not been found in the output."""
+ return set(self._map.values()) - set(self.result.keys())
class BackendRunner:
@@ -274,24 +238,12 @@ class BackendRunner:
@staticmethod
def run_application(execution_params: ExecutionParams) -> ExecutionContext:
"""Run requested application."""
-
- def to_data_paths(paths: str) -> DataPaths:
- """Split input into two and create new DataPaths object."""
- src, dst = paths.split(sep=":", maxsplit=1)
- return DataPaths(Path(src), dst)
-
- deploy_data_paths = [
- to_data_paths(paths) for paths in execution_params.deploy_params
- ]
-
ctx = run_application(
execution_params.application,
execution_params.application_params,
execution_params.system,
execution_params.system_params,
- deploy_data_paths,
)
-
return ctx
@staticmethod
@@ -305,7 +257,6 @@ class GenericInferenceRunner(ABC):
def __init__(self, backend_runner: BackendRunner):
"""Init generic inference runner instance."""
self.backend_runner = backend_runner
- self.running_inference: Optional[RunningCommand] = None
def run(
self, model_info: ModelInfo, output_consumers: List[OutputConsumer]
@@ -315,27 +266,12 @@ class GenericInferenceRunner(ABC):
ctx = self.backend_runner.run_application(execution_params)
if ctx.stdout is not None:
- self.consume_output(ctx.stdout, output_consumers)
-
- def stop(self) -> None:
- """Stop running inference."""
- if self.running_inference is None:
- return
-
- self.running_inference.stop()
+ ctx.stdout = self.consume_output(ctx.stdout, output_consumers)
@abstractmethod
def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
"""Get execution params for the provided model."""
- def __enter__(self) -> "GenericInferenceRunner":
- """Enter context."""
- return self
-
- def __exit__(self, *_args: Any) -> None:
- """Exit context."""
- self.stop()
-
def check_system_and_application(self, system_name: str, app_name: str) -> None:
"""Check if requested system and application installed."""
if not self.backend_runner.is_system_installed(system_name):
@@ -348,11 +284,23 @@ class GenericInferenceRunner(ABC):
)
@staticmethod
- def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> None:
- """Pass program's output to the consumers."""
- for line in output.decode("utf8").splitlines():
+ def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> bytearray:
+ """
+ Pass program's output to the consumers and filter it.
+
+ Returns the filtered output.
+ """
+ filtered_output = bytearray()
+ for line_bytes in output.splitlines():
+ line = line_bytes.decode("utf-8")
+ remove_line = False
for consumer in consumers:
- consumer.feed(line)
+ if consumer.feed(line):
+ remove_line = True
+ if not remove_line:
+ filtered_output.extend(line_bytes)
+
+ return filtered_output
class GenericInferenceRunnerEthosU(GenericInferenceRunner):
@@ -408,7 +356,6 @@ class GenericInferenceRunnerEthosU(GenericInferenceRunner):
self.system_name,
[],
system_params,
- [],
)
@@ -422,20 +369,18 @@ def estimate_performance(
model_info: ModelInfo, device_info: DeviceInfo, backend: str
) -> PerformanceMetrics:
"""Get performance estimations."""
- with get_generic_runner(device_info, backend) as generic_runner:
- output_parser = GenericInferenceOutputParser()
- output_consumers = [output_parser, LogWriter()]
+ output_parser = GenericInferenceOutputParser()
+ output_consumers = [output_parser, LogWriter()]
- generic_runner.run(model_info, output_consumers)
+ generic_runner = get_generic_runner(device_info, backend)
+ generic_runner.run(model_info, output_consumers)
- if not output_parser.is_ready():
- missed_data = ",".join(output_parser.missed_keys())
- logger.debug(
- "Unable to get performance metrics, missed data %s", missed_data
- )
- raise Exception("Unable to get performance metrics, insufficient data")
+ if not output_parser.is_ready():
+ missed_data = ",".join(output_parser.missed_keys())
+ logger.debug("Unable to get performance metrics, missed data %s", missed_data)
+ raise Exception("Unable to get performance metrics, insufficient data")
- return PerformanceMetrics(**output_parser.result)
+ return PerformanceMetrics(**output_parser.result)
def get_backend_runner() -> BackendRunner:
diff --git a/src/mlia/backend/output_consumer.py b/src/mlia/backend/output_consumer.py
new file mode 100644
index 0000000..bac4186
--- /dev/null
+++ b/src/mlia/backend/output_consumer.py
@@ -0,0 +1,66 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Output consumers module."""
+import base64
+import json
+import re
+from typing import List
+from typing import Protocol
+from typing import runtime_checkable
+
+
+@runtime_checkable
+class OutputConsumer(Protocol):
+ """Protocol to consume output."""
+
+ def feed(self, line: str) -> bool:
+ """
+ Feed a new line to be parsed.
+
+ Return True if the line should be removed from the output.
+ """
+
+
+class Base64OutputConsumer(OutputConsumer):
+ """
+ Parser to extract base64-encoded JSON from tagged standard output.
+
+ Example of the tagged output:
+ ```
+ # Encoded JSON: {"test": 1234}
+ <metrics>eyJ0ZXN0IjogMTIzNH0</metrics>
+ ```
+ """
+
+ TAG_NAME = "metrics"
+
+ def __init__(self) -> None:
+ """Set up the regular expression to extract tagged strings."""
+ self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)</{self.TAG_NAME}>")
+ self.parsed_output: List = []
+
+ def feed(self, line: str) -> bool:
+ """
+ Parse the output line and save the decoded output.
+
+ Returns True if the line contains tagged output.
+
+ Example:
+ Using the tagged output from the class docs the parser should collect
+ the following:
+ ```
+ [
+ {"test": 1234}
+ ]
+ ```
+ """
+ res_b64 = self._regex.search(line)
+ if res_b64:
+ res_json = base64.b64decode(res_b64.group(1), validate=True)
+ res = json.loads(res_json)
+ self.parsed_output.append(res)
+ # Remove this line from the output, i.e. consume it, as it
+ # does not contain any human readable content.
+ return True
+
+ return False
diff --git a/src/mlia/backend/output_parser.py b/src/mlia/backend/output_parser.py
deleted file mode 100644
index 111772a..0000000
--- a/src/mlia/backend/output_parser.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Definition of output parsers (including base class OutputParser)."""
-import base64
-import json
-import re
-from abc import ABC
-from abc import abstractmethod
-from typing import Any
-from typing import Dict
-from typing import Union
-
-
-class OutputParser(ABC):
- """Abstract base class for output parsers."""
-
- def __init__(self, name: str) -> None:
- """Set up the name of the parser."""
- super().__init__()
- self.name = name
-
- @abstractmethod
- def __call__(self, output: bytearray) -> Dict[str, Any]:
- """Parse the output and return a map of names to metrics."""
- return {}
-
- # pylint: disable=no-self-use
- def filter_out_parsed_content(self, output: bytearray) -> bytearray:
- """
- Filter out the parsed content from the output.
-
- Does nothing by default. Can be overridden in subclasses.
- """
- return output
-
-
-class RegexOutputParser(OutputParser):
- """Parser of standard output data using regular expressions."""
-
- _TYPE_MAP = {"str": str, "float": float, "int": int}
-
- def __init__(
- self,
- name: str,
- regex_config: Dict[str, Dict[str, str]],
- ) -> None:
- """
- Set up the parser with the regular expressions.
-
- The regex_config is mapping from a name to a dict with keys 'pattern'
- and 'type':
- - The 'pattern' holds the regular expression that must contain exactly
- one capturing parenthesis
- - The 'type' can be one of ['str', 'float', 'int'].
-
- Example:
- ```
- {"Metric1": {"pattern": ".*= *(.*)", "type": "str"}}
- ```
-
- The different regular expressions from the config are combined using
- non-capturing parenthesis, i.e. regular expressions must not overlap
- if more than one match per line is expected.
- """
- super().__init__(name)
-
- self._verify_config(regex_config)
- self._regex_cfg = regex_config
-
- # Compile regular expression to match in the output
- self._regex = re.compile(
- "|".join("(?:{0})".format(x["pattern"]) for x in self._regex_cfg.values())
- )
-
- def __call__(self, output: bytearray) -> Dict[str, Union[str, float, int]]:
- """
- Parse the output and return a map of names to metrics.
-
- Example:
- Assuming a regex_config as used as example in `__init__()` and the
- following output:
- ```
- Simulation finished:
- SIMULATION_STATUS = SUCCESS
- Simulation DONE
- ```
- Then calling the parser should return the following dict:
- ```
- {
- "Metric1": "SUCCESS"
- }
- ```
- """
- metrics = {}
- output_str = output.decode("utf-8")
- results = self._regex.findall(output_str)
- for line_result in results:
- for idx, (name, cfg) in enumerate(self._regex_cfg.items()):
- # The result(s) returned by findall() are either a single string
- # or a tuple (depending on the number of groups etc.)
- result = (
- line_result if isinstance(line_result, str) else line_result[idx]
- )
- if result:
- mapped_result = self._TYPE_MAP[cfg["type"]](result)
- metrics[name] = mapped_result
- return metrics
-
- def _verify_config(self, regex_config: Dict[str, Dict[str, str]]) -> None:
- """Make sure we have a valid regex_config.
-
- I.e.
- - Exactly one capturing parenthesis per pattern
- - Correct types
- """
- for name, cfg in regex_config.items():
- # Check that there is one capturing group defined in the pattern.
- regex = re.compile(cfg["pattern"])
- if regex.groups != 1:
- raise ValueError(
- f"Pattern for metric '{name}' must have exactly one "
- f"capturing parenthesis, but it has {regex.groups}."
- )
- # Check if type is supported
- if not cfg["type"] in self._TYPE_MAP:
- raise TypeError(
- f"Type '{cfg['type']}' for metric '{name}' is not "
- f"supported. Choose from: {list(self._TYPE_MAP.keys())}."
- )
-
-
-class Base64OutputParser(OutputParser):
- """
- Parser to extract base64-encoded JSON from tagged standard output.
-
- Example of the tagged output:
- ```
- # Encoded JSON: {"test": 1234}
- <metrics>eyJ0ZXN0IjogMTIzNH0</metrics>
- ```
- """
-
- TAG_NAME = "metrics"
-
- def __init__(self, name: str) -> None:
- """Set up the regular expression to extract tagged strings."""
- super().__init__(name)
- self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)</{self.TAG_NAME}>")
-
- def __call__(self, output: bytearray) -> Dict[str, Any]:
- """
- Parse the output and return a map of index (as string) to decoded JSON.
-
- Example:
- Using the tagged output from the class docs the parser should return
- the following dict:
- ```
- {
- "0": {"test": 1234}
- }
- ```
- """
- metrics = {}
- output_str = output.decode("utf-8")
- results = self._regex.findall(output_str)
- for idx, result_base64 in enumerate(results):
- result_json = base64.b64decode(result_base64, validate=True)
- result = json.loads(result_json)
- metrics[str(idx)] = result
-
- return metrics
-
- def filter_out_parsed_content(self, output: bytearray) -> bytearray:
- """Filter out base64-encoded content from the output."""
- output_str = self._regex.sub("", output.decode("utf-8"))
- return bytearray(output_str.encode("utf-8"))
diff --git a/src/mlia/backend/proc.py b/src/mlia/backend/proc.py
index 90ff414..a4c0be3 100644
--- a/src/mlia/backend/proc.py
+++ b/src/mlia/backend/proc.py
@@ -5,7 +5,6 @@
This module contains all classes and functions for dealing with Linux
processes.
"""
-import csv
import datetime
import logging
import shlex
@@ -14,11 +13,9 @@ import time
from pathlib import Path
from typing import Any
from typing import List
-from typing import NamedTuple
from typing import Optional
from typing import Tuple
-import psutil
from sh import Command
from sh import CommandNotFound
from sh import ErrorReturnCode
@@ -26,6 +23,8 @@ from sh import RunningCommand
from mlia.backend.fs import valid_for_filename
+logger = logging.getLogger(__name__)
+
class CommandFailedException(Exception):
"""Exception for failed command execution."""
@@ -50,7 +49,7 @@ class ShellCommand:
_bg: bool = True,
_out: Any = None,
_err: Any = None,
- _search_paths: Optional[List[Path]] = None
+ _search_paths: Optional[List[Path]] = None,
) -> RunningCommand:
"""Run the shell command with the given arguments.
@@ -86,7 +85,7 @@ class ShellCommand:
"""Construct and returns the paths of stdout/stderr files."""
timestamp = datetime.datetime.now().timestamp()
base_path = Path(base_log_path)
- base = base_path / "{}_{}".format(valid_for_filename(cmd, "_"), timestamp)
+ base = base_path / f"{valid_for_filename(cmd, '_')}_{timestamp}"
stdout = base.with_suffix(".out")
stderr = base.with_suffix(".err")
try:
@@ -164,7 +163,7 @@ def run_and_wait(
except Exception as error:
is_running = running_cmd is not None and running_cmd.is_alive()
if terminate_on_error and is_running:
- print("Terminating ...")
+ logger.debug("Terminating ...")
terminate_command(running_cmd)
raise error
@@ -184,87 +183,15 @@ def terminate_command(
time.sleep(wait_period)
if not running_cmd.is_alive():
return
- print(
- "Unable to terminate process {}. Sending SIGTERM...".format(
- running_cmd.process.pid
- )
+ logger.error(
+ "Unable to terminate process %i. Sending SIGTERM...",
+ running_cmd.process.pid,
)
running_cmd.process.signal_group(signal.SIGTERM)
except ProcessLookupError:
pass
-def terminate_external_process(
- process: psutil.Process,
- wait_period: float = 0.5,
- number_of_attempts: int = 20,
- wait_for_termination: float = 5.0,
-) -> None:
- """Terminate external process."""
- try:
- process.terminate()
- for _ in range(number_of_attempts):
- if not process.is_running():
- return
- time.sleep(wait_period)
-
- if process.is_running():
- process.terminate()
- time.sleep(wait_for_termination)
- except psutil.Error:
- print("Unable to terminate process")
-
-
-class ProcessInfo(NamedTuple):
- """Process information."""
-
- name: str
- executable: str
- cwd: str
- pid: int
-
-
-def save_process_info(pid: int, pid_file: Path) -> None:
- """Save process information to file."""
- try:
- parent = psutil.Process(pid)
- children = parent.children(recursive=True)
- family = [parent] + children
-
- with open(pid_file, "w", encoding="utf-8") as file:
- csv_writer = csv.writer(file)
- for member in family:
- process_info = ProcessInfo(
- name=member.name(),
- executable=member.exe(),
- cwd=member.cwd(),
- pid=member.pid,
- )
- csv_writer.writerow(process_info)
- except psutil.NoSuchProcess:
- # if process does not exist or finishes before
- # function call then nothing could be saved
- # just ignore this exception and exit
- pass
-
-
-def read_process_info(pid_file: Path) -> List[ProcessInfo]:
- """Read information about previous system processes."""
- if not pid_file.is_file():
- return []
-
- result = []
- with open(pid_file, encoding="utf-8") as file:
- csv_reader = csv.reader(file)
- for row in csv_reader:
- name, executable, cwd, pid = row
- result.append(
- ProcessInfo(name=name, executable=executable, cwd=cwd, pid=int(pid))
- )
-
- return result
-
-
def print_command_stdout(command: RunningCommand) -> None:
"""Print the stdout of a command.
diff --git a/src/mlia/backend/protocol.py b/src/mlia/backend/protocol.py
deleted file mode 100644
index ebfe69a..0000000
--- a/src/mlia/backend/protocol.py
+++ /dev/null
@@ -1,325 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Contain protocol related classes and functions."""
-from abc import ABC
-from abc import abstractmethod
-from contextlib import closing
-from pathlib import Path
-from typing import Any
-from typing import cast
-from typing import Iterable
-from typing import Optional
-from typing import Tuple
-from typing import Union
-
-import paramiko
-
-from mlia.backend.common import ConfigurationException
-from mlia.backend.config import LocalProtocolConfig
-from mlia.backend.config import SSHConfig
-from mlia.backend.proc import run_and_wait
-
-
-# Redirect all paramiko thread exceptions to a file otherwise these will be
-# printed to stderr.
-paramiko.util.log_to_file("/tmp/main_paramiko_log.txt", level=paramiko.common.INFO)
-
-
-class SSHConnectionException(Exception):
- """SSH connection exception."""
-
-
-class SupportsClose(ABC):
- """Class indicates support of close operation."""
-
- @abstractmethod
- def close(self) -> None:
- """Close protocol session."""
-
-
-class SupportsDeploy(ABC):
- """Class indicates support of deploy operation."""
-
- @abstractmethod
- def deploy(self, src: Path, dst: str, retry: bool = True) -> None:
- """Abstract method for deploy data."""
-
-
-class SupportsConnection(ABC):
- """Class indicates that protocol uses network connections."""
-
- @abstractmethod
- def establish_connection(self) -> bool:
- """Establish connection with underlying system."""
-
- @abstractmethod
- def connection_details(self) -> Tuple[str, int]:
- """Return connection details (host, port)."""
-
-
-class Protocol(ABC):
- """Abstract class for representing the protocol."""
-
- def __init__(self, iterable: Iterable = (), **kwargs: Any) -> None:
- """Initialize the class using a dict."""
- self.__dict__.update(iterable, **kwargs)
- self._validate()
-
- @abstractmethod
- def _validate(self) -> None:
- """Abstract method for config data validation."""
-
- @abstractmethod
- def run(
- self, command: str, retry: bool = False
- ) -> Tuple[int, bytearray, bytearray]:
- """
- Abstract method for running commands.
-
- Returns a tuple: (exit_code, stdout, stderr)
- """
-
-
-class CustomSFTPClient(paramiko.SFTPClient):
- """Class for creating a custom sftp client."""
-
- def put_dir(self, source: Path, target: str) -> None:
- """Upload the source directory to the target path.
-
- The target directory needs to exists and the last directory of the
- source will be created under the target with all its content.
- """
- # Create the target directory
- self._mkdir(target, ignore_existing=True)
- # Create the last directory in the source on the target
- self._mkdir("{}/{}".format(target, source.name), ignore_existing=True)
- # Go through the whole content of source
- for item in sorted(source.glob("**/*")):
- relative_path = item.relative_to(source.parent)
- remote_target = target / relative_path
- if item.is_file():
- self.put(str(item), str(remote_target))
- else:
- self._mkdir(str(remote_target), ignore_existing=True)
-
- def _mkdir(self, path: str, mode: int = 511, ignore_existing: bool = False) -> None:
- """Extend mkdir functionality.
-
- This version adds an option to not fail if the folder exists.
- """
- try:
- super().mkdir(path, mode)
- except IOError as error:
- if ignore_existing:
- pass
- else:
- raise error
-
-
-class LocalProtocol(Protocol):
- """Class for local protocol."""
-
- protocol: str
- cwd: Path
-
- def run(
- self, command: str, retry: bool = False
- ) -> Tuple[int, bytearray, bytearray]:
- """
- Run command locally.
-
- Returns a tuple: (exit_code, stdout, stderr)
- """
- if not isinstance(self.cwd, Path) or not self.cwd.is_dir():
- raise ConfigurationException("Wrong working directory {}".format(self.cwd))
-
- stdout = bytearray()
- stderr = bytearray()
-
- return run_and_wait(
- command, self.cwd, terminate_on_error=True, out=stdout, err=stderr
- )
-
- def _validate(self) -> None:
- """Validate protocol configuration."""
- assert hasattr(self, "protocol") and self.protocol == "local"
- assert hasattr(self, "cwd")
-
-
-class SSHProtocol(Protocol, SupportsClose, SupportsDeploy, SupportsConnection):
- """Class for SSH protocol."""
-
- protocol: str
- username: str
- password: str
- hostname: str
- port: int
-
- def __init__(self, iterable: Iterable = (), **kwargs: Any) -> None:
- """Initialize the class using a dict."""
- super().__init__(iterable, **kwargs)
- # Internal state to store if the system is connectable. It will be set
- # to true at the first connection instance
- self.client: Optional[paramiko.client.SSHClient] = None
- self.port = int(self.port)
-
- def run(self, command: str, retry: bool = True) -> Tuple[int, bytearray, bytearray]:
- """
- Run command over SSH.
-
- Returns a tuple: (exit_code, stdout, stderr)
- """
- transport = self._get_transport()
- with closing(transport.open_session()) as channel:
- # Enable shell's .profile settings and execute command
- channel.exec_command("bash -l -c '{}'".format(command))
- exit_status = -1
- stdout = bytearray()
- stderr = bytearray()
- while True:
- if channel.exit_status_ready():
- exit_status = channel.recv_exit_status()
- # Call it one last time to read any leftover in the channel
- self._recv_stdout_err(channel, stdout, stderr)
- break
- self._recv_stdout_err(channel, stdout, stderr)
-
- return exit_status, stdout, stderr
-
- def deploy(self, src: Path, dst: str, retry: bool = True) -> None:
- """Deploy src to remote dst over SSH.
-
- src and dst should be path to a file or directory.
- """
- transport = self._get_transport()
- sftp = cast(CustomSFTPClient, CustomSFTPClient.from_transport(transport))
-
- with closing(sftp):
- if src.is_dir():
- sftp.put_dir(src, dst)
- elif src.is_file():
- sftp.put(str(src), dst)
- else:
- raise Exception("Deploy error: file type not supported")
-
- # After the deployment of files, sync the remote filesystem to flush
- # buffers to hard disk
- self.run("sync")
-
- def close(self) -> None:
- """Close protocol session."""
- if self.client is not None:
- print("Try syncing remote file system...")
- # Before stopping the system, we try to run sync to make sure all
- # data are flushed on disk.
- self.run("sync", retry=False)
- self._close_client(self.client)
-
- def establish_connection(self) -> bool:
- """Establish connection with underlying system."""
- if self.client is not None:
- return True
-
- self.client = self._connect()
- return self.client is not None
-
- def _get_transport(self) -> paramiko.transport.Transport:
- """Get transport."""
- self.establish_connection()
-
- if self.client is None:
- raise SSHConnectionException(
- "Couldn't connect to '{}:{}'.".format(self.hostname, self.port)
- )
-
- transport = self.client.get_transport()
- if not transport:
- raise Exception("Unable to get transport")
-
- return transport
-
- def connection_details(self) -> Tuple[str, int]:
- """Return connection details of underlying system."""
- return (self.hostname, self.port)
-
- def _connect(self) -> Optional[paramiko.client.SSHClient]:
- """Try to establish connection."""
- client: Optional[paramiko.client.SSHClient] = None
- try:
- client = paramiko.client.SSHClient()
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- client.connect(
- self.hostname,
- self.port,
- self.username,
- self.password,
- # next parameters should be set to False to disable authentication
- # using ssh keys
- allow_agent=False,
- look_for_keys=False,
- )
- return client
- except (
- # OSError raised on first attempt to connect when running inside Docker
- OSError,
- paramiko.ssh_exception.NoValidConnectionsError,
- paramiko.ssh_exception.SSHException,
- ):
- # even if connection is not established socket could be still
- # open, it should be closed
- self._close_client(client)
-
- return None
-
- @staticmethod
- def _close_client(client: Optional[paramiko.client.SSHClient]) -> None:
- """Close ssh client."""
- try:
- if client is not None:
- client.close()
- except Exception: # pylint: disable=broad-except
- pass
-
- @classmethod
- def _recv_stdout_err(
- cls, channel: paramiko.channel.Channel, stdout: bytearray, stderr: bytearray
- ) -> None:
- """Read from channel to stdout/stder."""
- chunk_size = 512
- if channel.recv_ready():
- stdout_chunk = channel.recv(chunk_size)
- stdout.extend(stdout_chunk)
- if channel.recv_stderr_ready():
- stderr_chunk = channel.recv_stderr(chunk_size)
- stderr.extend(stderr_chunk)
-
- def _validate(self) -> None:
- """Check if there are all the info for establishing the connection."""
- assert hasattr(self, "protocol") and self.protocol == "ssh"
- assert hasattr(self, "username")
- assert hasattr(self, "password")
- assert hasattr(self, "hostname")
- assert hasattr(self, "port")
-
-
-class ProtocolFactory:
- """Factory class to return the appropriate Protocol class."""
-
- @staticmethod
- def get_protocol(
- config: Optional[Union[SSHConfig, LocalProtocolConfig]],
- **kwargs: Union[str, Path, None]
- ) -> Union[SSHProtocol, LocalProtocol]:
- """Return the right protocol instance based on the config."""
- if not config:
- raise ValueError("No protocol config provided")
-
- protocol = config["protocol"]
- if protocol == "ssh":
- return SSHProtocol(config)
-
- if protocol == "local":
- cwd = kwargs.get("cwd")
- return LocalProtocol(config, cwd=cwd)
-
- raise ValueError("Protocol not supported: '{}'".format(protocol))
diff --git a/src/mlia/backend/source.py b/src/mlia/backend/source.py
index dcf6835..f80a774 100644
--- a/src/mlia/backend/source.py
+++ b/src/mlia/backend/source.py
@@ -63,11 +63,11 @@ class DirectorySource(Source):
def install_into(self, destination: Path) -> None:
"""Install source into destination directory."""
if not destination.is_dir():
- raise ConfigurationException("Wrong destination {}".format(destination))
+ raise ConfigurationException(f"Wrong destination {destination}.")
if not self.directory_path.is_dir():
raise ConfigurationException(
- "Directory {} does not exist".format(self.directory_path)
+ f"Directory {self.directory_path} does not exist."
)
copy_directory_content(self.directory_path, destination)
@@ -112,7 +112,7 @@ class TarArchiveSource(Source):
"Archive has no top level directory"
) from error_no_config
- config_path = "{}/{}".format(top_level_dir, BACKEND_CONFIG_FILE)
+ config_path = f"{top_level_dir}/{BACKEND_CONFIG_FILE}"
config_entry = archive.getmember(config_path)
self._has_top_level_folder = True
@@ -149,7 +149,7 @@ class TarArchiveSource(Source):
def install_into(self, destination: Path) -> None:
"""Install source into destination directory."""
if not destination.is_dir():
- raise ConfigurationException("Wrong destination {}".format(destination))
+ raise ConfigurationException(f"Wrong destination {destination}.")
with self._open(self.archive_path) as archive:
archive.extractall(destination)
@@ -157,14 +157,12 @@ class TarArchiveSource(Source):
def _open(self, archive_path: Path) -> TarFile:
"""Open archive file."""
if not archive_path.is_file():
- raise ConfigurationException("File {} does not exist".format(archive_path))
+ raise ConfigurationException(f"File {archive_path} does not exist.")
if archive_path.name.endswith("tar.gz") or archive_path.name.endswith("tgz"):
mode = "r:gz"
else:
- raise ConfigurationException(
- "Unsupported archive type {}".format(archive_path)
- )
+ raise ConfigurationException(f"Unsupported archive type {archive_path}.")
# The returned TarFile object can be used as a context manager (using
# 'with') by the calling instance.
@@ -181,7 +179,7 @@ def get_source(source_path: Path) -> Union[TarArchiveSource, DirectorySource]:
if source_path.is_dir():
return DirectorySource(source_path)
- raise ConfigurationException("Unable to read {}".format(source_path))
+ raise ConfigurationException(f"Unable to read {source_path}.")
def create_destination_and_install(source: Source, resource_path: Path) -> None:
@@ -197,7 +195,7 @@ def create_destination_and_install(source: Source, resource_path: Path) -> None:
if create_destination:
name = source.name()
if not name:
- raise ConfigurationException("Unable to get source name")
+ raise ConfigurationException("Unable to get source name.")
destination = resource_path / name
destination.mkdir()
diff --git a/src/mlia/backend/system.py b/src/mlia/backend/system.py
index 469083e..ff85bf3 100644
--- a/src/mlia/backend/system.py
+++ b/src/mlia/backend/system.py
@@ -6,9 +6,7 @@ from typing import Any
from typing import cast
from typing import Dict
from typing import List
-from typing import Optional
from typing import Tuple
-from typing import Union
from mlia.backend.common import Backend
from mlia.backend.common import ConfigurationException
@@ -17,72 +15,12 @@ from mlia.backend.common import get_backend_directories
from mlia.backend.common import load_config
from mlia.backend.common import remove_backend
from mlia.backend.config import SystemConfig
-from mlia.backend.controller import SystemController
-from mlia.backend.controller import SystemControllerSingleInstance
from mlia.backend.fs import get_backends_path
-from mlia.backend.protocol import ProtocolFactory
-from mlia.backend.protocol import SupportsClose
-from mlia.backend.protocol import SupportsConnection
-from mlia.backend.protocol import SupportsDeploy
+from mlia.backend.proc import run_and_wait
from mlia.backend.source import create_destination_and_install
from mlia.backend.source import get_source
-def get_available_systems_directory_names() -> List[str]:
- """Return a list of directory names for all avialable systems."""
- return [entry.name for entry in get_backend_directories("systems")]
-
-
-def get_available_systems() -> List["System"]:
- """Return a list with all available systems."""
- available_systems = []
- for config_json in get_backend_configs("systems"):
- config_entries = cast(List[SystemConfig], (load_config(config_json)))
- for config_entry in config_entries:
- config_entry["config_location"] = config_json.parent.absolute()
- system = load_system(config_entry)
- available_systems.append(system)
-
- return sorted(available_systems, key=lambda system: system.name)
-
-
-def get_system(system_name: str) -> Optional["System"]:
- """Return a system instance with the same name passed as argument."""
- available_systems = get_available_systems()
- for system in available_systems:
- if system_name == system.name:
- return system
- return None
-
-
-def install_system(source_path: Path) -> None:
- """Install new system."""
- try:
- source = get_source(source_path)
- config = cast(List[SystemConfig], source.config())
- systems_to_install = [load_system(entry) for entry in config]
- except Exception as error:
- raise ConfigurationException("Unable to read system definition") from error
-
- if not systems_to_install:
- raise ConfigurationException("No system definition found")
-
- available_systems = get_available_systems()
- already_installed = [s for s in systems_to_install if s in available_systems]
- if already_installed:
- names = [system.name for system in already_installed]
- raise ConfigurationException(
- "Systems [{}] are already installed".format(",".join(names))
- )
-
- create_destination_and_install(source, get_backends_path("systems"))
-
-
-def remove_system(directory_name: str) -> None:
- """Remove system."""
- remove_backend(directory_name, "systems")
-
-
class System(Backend):
"""System class."""
@@ -90,59 +28,33 @@ class System(Backend):
"""Construct the System class using the dictionary passed."""
super().__init__(config)
- self._setup_data_transfer(config)
self._setup_reporting(config)
- def _setup_data_transfer(self, config: SystemConfig) -> None:
- data_transfer_config = config.get("data_transfer")
- protocol = ProtocolFactory().get_protocol(
- data_transfer_config, cwd=self.config_location
- )
- self.protocol = protocol
-
def _setup_reporting(self, config: SystemConfig) -> None:
self.reporting = config.get("reporting")
- def run(self, command: str, retry: bool = True) -> Tuple[int, bytearray, bytearray]:
+ def run(self, command: str) -> Tuple[int, bytearray, bytearray]:
"""
Run command on the system.
Returns a tuple: (exit_code, stdout, stderr)
"""
- return self.protocol.run(command, retry)
-
- def deploy(self, src: Path, dst: str, retry: bool = True) -> None:
- """Deploy files to the system."""
- if isinstance(self.protocol, SupportsDeploy):
- self.protocol.deploy(src, dst, retry)
-
- @property
- def supports_deploy(self) -> bool:
- """Check if protocol supports deploy operation."""
- return isinstance(self.protocol, SupportsDeploy)
-
- @property
- def connectable(self) -> bool:
- """Check if protocol supports connection."""
- return isinstance(self.protocol, SupportsConnection)
-
- def establish_connection(self) -> bool:
- """Establish connection with the system."""
- if not isinstance(self.protocol, SupportsConnection):
+ cwd = self.config_location
+ if not isinstance(cwd, Path) or not cwd.is_dir():
raise ConfigurationException(
- "System {} does not support connections".format(self.name)
+ f"System has invalid config location: {cwd}",
)
- return self.protocol.establish_connection()
+ stdout = bytearray()
+ stderr = bytearray()
- def connection_details(self) -> Tuple[str, int]:
- """Return connection details."""
- if not isinstance(self.protocol, SupportsConnection):
- raise ConfigurationException(
- "System {} does not support connections".format(self.name)
- )
-
- return self.protocol.connection_details()
+ return run_and_wait(
+ command,
+ cwd=cwd,
+ terminate_on_error=True,
+ out=stdout,
+ err=stderr,
+ )
def __eq__(self, other: object) -> bool:
"""Overload operator ==."""
@@ -157,7 +69,6 @@ class System(Backend):
"type": "system",
"name": self.name,
"description": self.description,
- "data_transfer_protocol": self.protocol.protocol,
"commands": self._get_command_details(),
"annotations": self.annotations,
}
@@ -165,88 +76,66 @@ class System(Backend):
return output
-class StandaloneSystem(System):
- """StandaloneSystem class."""
-
+def get_available_systems_directory_names() -> List[str]:
+ """Return a list of directory names for all avialable systems."""
+ return [entry.name for entry in get_backend_directories("systems")]
-def get_controller(
- single_instance: bool, pid_file_path: Optional[Path] = None
-) -> SystemController:
- """Get system controller."""
- if single_instance:
- return SystemControllerSingleInstance(pid_file_path)
- return SystemController()
+def get_available_systems() -> List[System]:
+ """Return a list with all available systems."""
+ available_systems = []
+ for config_json in get_backend_configs("systems"):
+ config_entries = cast(List[SystemConfig], (load_config(config_json)))
+ for config_entry in config_entries:
+ config_entry["config_location"] = config_json.parent.absolute()
+ system = load_system(config_entry)
+ available_systems.append(system)
+ return sorted(available_systems, key=lambda system: system.name)
-class ControlledSystem(System):
- """ControlledSystem class."""
- def __init__(self, config: SystemConfig):
- """Construct the ControlledSystem class using the dictionary passed."""
- super().__init__(config)
- self.controller: Optional[SystemController] = None
-
- def start(
- self,
- commands: List[str],
- single_instance: bool = True,
- pid_file_path: Optional[Path] = None,
- ) -> None:
- """Launch the system."""
- if (
- not isinstance(self.config_location, Path)
- or not self.config_location.is_dir()
- ):
- raise ConfigurationException(
- "System {} has wrong config location".format(self.name)
- )
+def get_system(system_name: str) -> System:
+ """Return a system instance with the same name passed as argument."""
+ available_systems = get_available_systems()
+ for system in available_systems:
+ if system_name == system.name:
+ return system
+ raise ConfigurationException(f"System '{system_name}' not found.")
- self.controller = get_controller(single_instance, pid_file_path)
- self.controller.start(commands, self.config_location)
- def is_running(self) -> bool:
- """Check if system is running."""
- if not self.controller:
- return False
+def install_system(source_path: Path) -> None:
+ """Install new system."""
+ try:
+ source = get_source(source_path)
+ config = cast(List[SystemConfig], source.config())
+ systems_to_install = [load_system(entry) for entry in config]
+ except Exception as error:
+ raise ConfigurationException("Unable to read system definition") from error
- return self.controller.is_running()
+ if not systems_to_install:
+ raise ConfigurationException("No system definition found")
- def get_output(self) -> Tuple[str, str]:
- """Return system output."""
- if not self.controller:
- return "", ""
+ available_systems = get_available_systems()
+ already_installed = [s for s in systems_to_install if s in available_systems]
+ if already_installed:
+ names = [system.name for system in already_installed]
+ raise ConfigurationException(
+ f"Systems [{','.join(names)}] are already installed."
+ )
- return self.controller.get_output()
+ create_destination_and_install(source, get_backends_path("systems"))
- def stop(self, wait: bool = False) -> None:
- """Stop the system."""
- if not self.controller:
- raise Exception("System has not been started")
- if isinstance(self.protocol, SupportsClose):
- try:
- self.protocol.close()
- except Exception as error: # pylint: disable=broad-except
- print(error)
- self.controller.stop(wait)
+def remove_system(directory_name: str) -> None:
+ """Remove system."""
+ remove_backend(directory_name, "systems")
-def load_system(config: SystemConfig) -> Union[StandaloneSystem, ControlledSystem]:
+def load_system(config: SystemConfig) -> System:
"""Load system based on it's execution type."""
- data_transfer = config.get("data_transfer", {})
- protocol = data_transfer.get("protocol")
populate_shared_params(config)
- if protocol == "ssh":
- return ControlledSystem(config)
-
- if protocol == "local":
- return StandaloneSystem(config)
-
- raise ConfigurationException(
- "Unsupported execution type for protocol {}".format(protocol)
- )
+ return System(config)
def populate_shared_params(config: SystemConfig) -> None:
@@ -264,7 +153,7 @@ def populate_shared_params(config: SystemConfig) -> None:
raise ConfigurationException("All shared parameters should have aliases")
commands = config.get("commands", {})
- for cmd_name in ["build", "run"]:
+ for cmd_name in ["run"]:
command = commands.get(cmd_name)
if command is None:
commands[cmd_name] = []
@@ -275,7 +164,7 @@ def populate_shared_params(config: SystemConfig) -> None:
only_aliases = all(p.get("alias") for p in cmd_user_params)
if not only_aliases:
raise ConfigurationException(
- "All parameters for command {} should have aliases".format(cmd_name)
+ f"All parameters for command {cmd_name} should have aliases."
)
merged_by_alias = {
**{p.get("alias"): p for p in shared_user_params},
diff --git a/src/mlia/core/reporting.py b/src/mlia/core/reporting.py
index 1b75bb4..9006602 100644
--- a/src/mlia/core/reporting.py
+++ b/src/mlia/core/reporting.py
@@ -125,7 +125,7 @@ class Cell:
"""Return cell value."""
if self.fmt:
if isinstance(self.fmt.str_fmt, str):
- return "{:{fmt}}".format(self.value, fmt=self.fmt.str_fmt)
+ return f"{self.value:{self.fmt.str_fmt}}"
if callable(self.fmt.str_fmt):
return self.fmt.str_fmt(self.value)
diff --git a/src/mlia/nn/tensorflow/tflite_metrics.py b/src/mlia/nn/tensorflow/tflite_metrics.py
index 0fe36e0..2252c6b 100644
--- a/src/mlia/nn/tensorflow/tflite_metrics.py
+++ b/src/mlia/nn/tensorflow/tflite_metrics.py
@@ -194,9 +194,7 @@ class TFLiteMetrics:
aggregation_func = cluster_hist
else:
- raise NotImplementedError(
- "ReportClusterMode '{}' not implemented.".format(mode)
- )
+ raise NotImplementedError(f"ReportClusterMode '{mode}' not implemented.")
uniques = {
name: aggregation_func(details)
for name, details in self.filtered_details.items()
@@ -217,10 +215,10 @@ class TFLiteMetrics:
verbose: bool = False,
) -> None:
"""Print a summary of all the model information."""
- print("Model file: {}".format(self.tflite_file))
+ print(f"Model file: {self.tflite_file}")
print("#" * 80)
print(" " * 28 + "### TFLITE SUMMARY ###")
- print("File: {}".format(os.path.abspath(self.tflite_file)))
+ print(f"File: {os.path.abspath(self.tflite_file)}")
print("Input(s):")
self._print_in_outs(self.interpreter.get_input_details(), verbose)
print("Output(s):")
@@ -242,11 +240,11 @@ class TFLiteMetrics:
]
if report_sparsity:
sparsity = calculate_sparsity(weights, sparsity_accumulator)
- row.append("{:.2f}".format(sparsity))
+ row.append(f"{sparsity:.2f}")
rows.append(row)
if verbose:
# Print cluster centroids
- print("{} cluster centroids:".format(name))
+ print(f"{name} cluster centroids:")
# Types need to be ignored for this function call because
# np.unique does not have type annotation while the
# current context does.
@@ -259,9 +257,9 @@ class TFLiteMetrics:
sparsity_accumulator.total_weights
)
if report_sparsity:
- summary_row[header.index("Sparsity")] = "{:.2f}".format(
- sparsity_accumulator.sparsity()
- )
+ summary_row[
+ header.index("Sparsity")
+ ] = f"{sparsity_accumulator.sparsity():.2f}"
rows.append(summary_row)
# Report detailed cluster info
if report_cluster_mode is not None:
@@ -272,7 +270,7 @@ class TFLiteMetrics:
def _print_cluster_details(
self, report_cluster_mode: ReportClusterMode, max_num_clusters: int
) -> None:
- print("{}:\n{}".format(report_cluster_mode.name, report_cluster_mode.value))
+ print(f"{report_cluster_mode.name}:\n{report_cluster_mode.value}")
num_clusters = self.num_unique_weights(report_cluster_mode)
if (
report_cluster_mode == ReportClusterMode.NUM_CLUSTERS_HISTOGRAM
@@ -283,11 +281,9 @@ class TFLiteMetrics:
# histogram for unclustered layers.
for name, value in num_clusters.items():
if len(value) > max_num_clusters:
- num_clusters[name] = "More than {} unique values.".format(
- max_num_clusters
- )
+ num_clusters[name] = f"More than {max_num_clusters} unique values."
for name, nums in num_clusters.items():
- print("- {}: {}".format(self._prettify_name(name), nums))
+ print(f"- {self._prettify_name(name)}: {nums}")
@staticmethod
def _print_in_outs(ios: List[dict], verbose: bool = False) -> None:
@@ -296,9 +292,6 @@ class TFLiteMetrics:
pprint(item)
else:
print(
- "- {} ({}): {}".format(
- item["name"],
- np.dtype(item["dtype"]).name,
- item["shape"],
- )
+ f"- {item['name']} ({np.dtype(item['dtype']).name}): "
+ f"{item['shape']}"
)
diff --git a/src/mlia/resources/aiet/systems/SYSTEMS.txt b/src/mlia/resources/backend_configs/systems/SYSTEMS.txt
index 3861769..3861769 100644
--- a/src/mlia/resources/aiet/systems/SYSTEMS.txt
+++ b/src/mlia/resources/backend_configs/systems/SYSTEMS.txt
diff --git a/src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json b/src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json
index 3ffa548..5c44ebc 100644
--- a/src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json
+++ b/src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json
@@ -7,10 +7,6 @@
"sim_type": "FM",
"variant": "Cortex-M55+Ethos-U55"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"/opt/VHT/VHT_Corstone_SSE-300_Ethos-U55 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
@@ -47,10 +43,6 @@
"sim_type": "FM",
"variant": "Cortex-M55+Ethos-U65"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"/opt/VHT/VHT_Corstone_SSE-300_Ethos-U65 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license b/src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license
+++ b/src/mlia/resources/backend_configs/systems/corstone-300-vht/backend-config.json.license
diff --git a/src/mlia/resources/aiet/systems/corstone-300/aiet-config.json b/src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json
index 6d6785d..41d2fd0 100644
--- a/src/mlia/resources/aiet/systems/corstone-300/aiet-config.json
+++ b/src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json
@@ -7,10 +7,6 @@
"sim_type": "FM",
"variant": "Cortex-M55+Ethos-U55"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"FVP_Corstone_SSE-300_Ethos-U55 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
@@ -47,10 +43,6 @@
"sim_type": "FM",
"variant": "Cortex-M55+Ethos-U65"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"FVP_Corstone_SSE-300_Ethos-U65 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license b/src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license
+++ b/src/mlia/resources/backend_configs/systems/corstone-300/backend-config.json.license
diff --git a/src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json b/src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json
index dbc2622..3ea9a6a 100644
--- a/src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json
+++ b/src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json
@@ -7,10 +7,6 @@
"sim_type": "FM",
"variant": "Cortex-M85+Ethos-U55"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"/opt/VHT/VHT_Corstone_SSE-310 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json.license b/src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json.license
+++ b/src/mlia/resources/backend_configs/systems/corstone-310-vht/backend-config.json.license
diff --git a/src/mlia/resources/aiet/systems/corstone-310/aiet-config.json b/src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json
index 7aa3b0a..d043a2d 100644
--- a/src/mlia/resources/aiet/systems/corstone-310/aiet-config.json
+++ b/src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json
@@ -7,10 +7,6 @@
"sim_type": "FM",
"variant": "Cortex-M85+Ethos-U55"
},
- "data_transfer": {
- "protocol": "local"
- },
- "lock": true,
"commands": {
"run": [
"FVP_Corstone_SSE-310 -a {software.variables:eval_app} {user_params:input_file}@0x90000000 -C {user_params:mac} -C mps3_board.telnetterminal0.start_telnet=0 -C mps3_board.uart0.out_file='-' -C mps3_board.uart0.shutdown_on_eot=1 -C mps3_board.visualisation.disable-visualisation=1 --stat"
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license b/src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json.license
+++ b/src/mlia/resources/backend_configs/systems/corstone-310/backend-config.json.license
diff --git a/src/mlia/resources/backends/applications/.gitignore b/src/mlia/resources/backends/applications/.gitignore
deleted file mode 100644
index 0226166..0000000
--- a/src/mlia/resources/backends/applications/.gitignore
+++ /dev/null
@@ -1,6 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-# Ignore everything in this directory
-*
-# Except this file
-!.gitignore
diff --git a/src/mlia/resources/aiet/applications/APPLICATIONS.txt b/src/mlia/resources/backends/applications/APPLICATIONS.txt
index a702e19..ca1003b 100644
--- a/src/mlia/resources/aiet/applications/APPLICATIONS.txt
+++ b/src/mlia/resources/backends/applications/APPLICATIONS.txt
@@ -4,4 +4,4 @@ SPDX-License-Identifier: Apache-2.0
This directory contains the application packages for the Generic Inference
Runner.
-Each package should contain its own aiet-config.json file.
+Each package should contain its own backend-config.json file.
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json
index 757ccd1..7ee5e00 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json
@@ -10,7 +10,6 @@
"name": "Corstone-300: Cortex-M55+Ethos-U65"
}
],
- "lock": true,
"variables": {
"eval_app": "{software.config_dir}/ethos-u-inference_runner.axf"
}
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
index 4c50e1f..4c50e1f 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
Binary files differ
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
index 8896f92..8896f92 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json
index cb7e113..51ff429 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json
@@ -7,7 +7,6 @@
"name": "Corstone-300: Cortex-M55+Ethos-U55"
}
],
- "lock": true,
"variables": {
"eval_app": "{software.config_dir}/ethos-u-inference_runner.axf"
}
diff --git a/src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/systems/corstone-300-vht/aiet-config.json.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
index 850e2eb..850e2eb 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
Binary files differ
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
index 8896f92..8896f92 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json
index d524f64..b59c85e 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/aiet-config.json
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json
@@ -7,7 +7,6 @@
"name": "Corstone-300: Cortex-M55+Ethos-U65"
}
],
- "lock": true,
"variables": {
"eval_app": "{software.config_dir}/ethos-u-inference_runner.axf"
}
diff --git a/src/mlia/resources/aiet/systems/corstone-300/aiet-config.json.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/systems/corstone-300/aiet-config.json.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/backend-config.json.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf
index f881bb8..f881bb8 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf
Binary files differ
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license
index 8896f92..8896f92 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA/ethos-u-inference_runner.axf.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json
index 2cbab70..69c5e60 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/aiet-config.json
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json
@@ -7,7 +7,6 @@
"name": "Corstone-310: Cortex-M85+Ethos-U55"
}
],
- "lock": true,
"variables": {
"eval_app": "{software.config_dir}/ethos-u-inference_runner.axf"
}
diff --git a/src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json.license b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/systems/corstone-310-vht/aiet-config.json.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/backend-config.json.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
index 846ee33..846ee33 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf
Binary files differ
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
index 8896f92..8896f92 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA/ethos-u-inference_runner.axf.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json
index 01bec74..fbe4a16 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/aiet-config.json
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json
@@ -7,7 +7,6 @@
"name": "Corstone-310: Cortex-M85+Ethos-U55"
}
],
- "lock": true,
"variables": {
"eval_app": "{software.config_dir}/ethos-u-inference_runner.axf"
}
diff --git a/src/mlia/resources/aiet/systems/corstone-310/aiet-config.json.license b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license
index 9b83bfc..9b83bfc 100644
--- a/src/mlia/resources/aiet/systems/corstone-310/aiet-config.json.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/backend-config.json.license
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
index e3eab97..e3eab97 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf
Binary files differ
diff --git a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
index 8896f92..8896f92 100644
--- a/src/mlia/resources/aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
+++ b/src/mlia/resources/backends/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA/ethos-u-inference_runner.axf.license
diff --git a/src/mlia/tools/metadata/corstone.py b/src/mlia/tools/metadata/corstone.py
index a92f81c..6a3c1c8 100644
--- a/src/mlia/tools/metadata/corstone.py
+++ b/src/mlia/tools/metadata/corstone.py
@@ -13,7 +13,6 @@ from typing import List
from typing import Optional
import mlia.backend.manager as backend_manager
-from mlia.backend.fs import get_backend_resources
from mlia.tools.metadata.common import DownloadAndInstall
from mlia.tools.metadata.common import Installation
from mlia.tools.metadata.common import InstallationType
@@ -24,7 +23,7 @@ from mlia.utils.filesystem import all_paths_valid
from mlia.utils.filesystem import copy_all
from mlia.utils.filesystem import get_mlia_resources
from mlia.utils.filesystem import temp_directory
-from mlia.utils.proc import working_directory
+from mlia.utils.filesystem import working_directory
logger = logging.getLogger(__name__)
@@ -76,7 +75,7 @@ class BackendMetadata:
"""Return list of expected resources."""
resources = [self.system_config, *self.apps_resources]
- return (get_backend_resources() / resource for resource in resources)
+ return (get_mlia_resources() / resource for resource in resources)
@property
def supported_platform(self) -> bool:
@@ -314,12 +313,8 @@ def get_corstone_300_installation() -> Installation:
metadata=BackendMetadata(
name="Corstone-300",
description="Corstone-300 FVP",
- system_config="aiet/systems/corstone-300/aiet-config.json",
- apps_resources=[
- "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA",
- "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA",
- "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA",
- ],
+ system_config="backend_configs/systems/corstone-300/backend-config.json",
+ apps_resources=[],
fvp_dir_name="corstone_300",
download_artifact=DownloadArtifact(
name="Corstone-300 FVP",
@@ -346,7 +341,9 @@ def get_corstone_300_installation() -> Installation:
"VHT_Corstone_SSE-300_Ethos-U65",
],
copy_source=False,
- system_config="aiet/systems/corstone-300-vht/aiet-config.json",
+ system_config=(
+ "backends_configs/systems/corstone-300-vht/backend-config.json"
+ ),
),
),
backend_installer=Corstone300Installer(),
@@ -363,11 +360,8 @@ def get_corstone_310_installation() -> Installation:
metadata=BackendMetadata(
name="Corstone-310",
description="Corstone-310 FVP",
- system_config="aiet/systems/corstone-310/aiet-config.json",
- apps_resources=[
- "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA",
- "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA",
- ],
+ system_config="backend_configs/systems/corstone-310/backend-config.json",
+ apps_resources=[],
fvp_dir_name="corstone_310",
download_artifact=None,
supported_platforms=["Linux"],
@@ -386,7 +380,9 @@ def get_corstone_310_installation() -> Installation:
"VHT_Corstone_SSE-310",
],
copy_source=False,
- system_config="aiet/systems/corstone-310-vht/aiet-config.json",
+ system_config=(
+ "backend_configs/systems/corstone-310-vht/backend-config.json"
+ ),
),
),
backend_installer=None,
diff --git a/src/mlia/utils/filesystem.py b/src/mlia/utils/filesystem.py
index 73a88d9..7975905 100644
--- a/src/mlia/utils/filesystem.py
+++ b/src/mlia/utils/filesystem.py
@@ -122,3 +122,21 @@ def copy_all(*paths: Path, dest: Path) -> None:
if path.is_dir():
shutil.copytree(path, dest, dirs_exist_ok=True)
+
+
+@contextmanager
+def working_directory(
+ working_dir: Path, create_dir: bool = False
+) -> Generator[Path, None, None]:
+ """Temporary change working directory."""
+ current_working_dir = Path.cwd()
+
+ if create_dir:
+ working_dir.mkdir()
+
+ os.chdir(working_dir)
+
+ try:
+ yield working_dir
+ finally:
+ os.chdir(current_working_dir)
diff --git a/src/mlia/utils/proc.py b/src/mlia/utils/proc.py
deleted file mode 100644
index 18a4305..0000000
--- a/src/mlia/utils/proc.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Utils related to process management."""
-import os
-import signal
-import subprocess
-import time
-from abc import ABC
-from abc import abstractmethod
-from contextlib import contextmanager
-from pathlib import Path
-from typing import Any
-from typing import Generator
-from typing import Iterable
-from typing import List
-from typing import Optional
-from typing import Tuple
-
-
-class OutputConsumer(ABC):
- """Base class for the output consumers."""
-
- @abstractmethod
- def feed(self, line: str) -> None:
- """Feed new line to the consumer."""
-
-
-class RunningCommand:
- """Running command."""
-
- def __init__(self, process: subprocess.Popen) -> None:
- """Init running command instance."""
- self.process = process
- self.output_consumers: List[OutputConsumer] = []
-
- def is_alive(self) -> bool:
- """Return true if process is still alive."""
- return self.process.poll() is None
-
- def exit_code(self) -> Optional[int]:
- """Return process's return code."""
- return self.process.poll()
-
- def stdout(self) -> Iterable[str]:
- """Return std output of the process."""
- assert self.process.stdout is not None
-
- for line in self.process.stdout:
- yield line
-
- def kill(self) -> None:
- """Kill the process."""
- self.process.kill()
-
- def send_signal(self, signal_num: int) -> None:
- """Send signal to the process."""
- self.process.send_signal(signal_num)
-
- def consume_output(self) -> None:
- """Pass program's output to the consumers."""
- if self.process is None or not self.output_consumers:
- return
-
- for line in self.stdout():
- for consumer in self.output_consumers:
- consumer.feed(line)
-
- def stop(
- self, wait: bool = True, num_of_attempts: int = 5, interval: float = 0.5
- ) -> None:
- """Stop execution."""
- try:
- if not self.is_alive():
- return
-
- self.process.send_signal(signal.SIGINT)
- self.consume_output()
-
- if not wait:
- return
-
- for _ in range(num_of_attempts):
- time.sleep(interval)
- if not self.is_alive():
- break
- else:
- raise Exception("Unable to stop running command")
- finally:
- self._close_fd()
-
- def _close_fd(self) -> None:
- """Close file descriptors."""
-
- def close(file_descriptor: Any) -> None:
- """Check and close file."""
- if file_descriptor is not None and hasattr(file_descriptor, "close"):
- file_descriptor.close()
-
- close(self.process.stdout)
- close(self.process.stderr)
-
- def wait(self, redirect_output: bool = False) -> None:
- """Redirect process output to stdout and wait for completion."""
- if redirect_output:
- for line in self.stdout():
- print(line, end="")
-
- self.process.wait()
-
-
-class CommandExecutor:
- """Command executor."""
-
- @staticmethod
- def execute(command: List[str]) -> Tuple[int, bytes, bytes]:
- """Execute the command."""
- result = subprocess.run(
- command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
- )
-
- return (result.returncode, result.stdout, result.stderr)
-
- @staticmethod
- def submit(command: List[str]) -> RunningCommand:
- """Submit command for the execution."""
- process = subprocess.Popen( # pylint: disable=consider-using-with
- command,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, # redirect command stderr to stdout
- universal_newlines=True,
- bufsize=1,
- )
-
- return RunningCommand(process)
-
-
-@contextmanager
-def working_directory(
- working_dir: Path, create_dir: bool = False
-) -> Generator[Path, None, None]:
- """Temporary change working directory."""
- current_working_dir = Path.cwd()
-
- if create_dir:
- working_dir.mkdir()
-
- os.chdir(working_dir)
-
- try:
- yield working_dir
- finally:
- os.chdir(current_working_dir)