diff options
Diffstat (limited to 'src/mlia/backend/system.py')
-rw-r--r-- | src/mlia/backend/system.py | 229 |
1 files changed, 59 insertions, 170 deletions
diff --git a/src/mlia/backend/system.py b/src/mlia/backend/system.py index 469083e..ff85bf3 100644 --- a/src/mlia/backend/system.py +++ b/src/mlia/backend/system.py @@ -6,9 +6,7 @@ from typing import Any from typing import cast from typing import Dict from typing import List -from typing import Optional from typing import Tuple -from typing import Union from mlia.backend.common import Backend from mlia.backend.common import ConfigurationException @@ -17,72 +15,12 @@ from mlia.backend.common import get_backend_directories from mlia.backend.common import load_config from mlia.backend.common import remove_backend from mlia.backend.config import SystemConfig -from mlia.backend.controller import SystemController -from mlia.backend.controller import SystemControllerSingleInstance from mlia.backend.fs import get_backends_path -from mlia.backend.protocol import ProtocolFactory -from mlia.backend.protocol import SupportsClose -from mlia.backend.protocol import SupportsConnection -from mlia.backend.protocol import SupportsDeploy +from mlia.backend.proc import run_and_wait from mlia.backend.source import create_destination_and_install from mlia.backend.source import get_source -def get_available_systems_directory_names() -> List[str]: - """Return a list of directory names for all avialable systems.""" - return [entry.name for entry in get_backend_directories("systems")] - - -def get_available_systems() -> List["System"]: - """Return a list with all available systems.""" - available_systems = [] - for config_json in get_backend_configs("systems"): - config_entries = cast(List[SystemConfig], (load_config(config_json))) - for config_entry in config_entries: - config_entry["config_location"] = config_json.parent.absolute() - system = load_system(config_entry) - available_systems.append(system) - - return sorted(available_systems, key=lambda system: system.name) - - -def get_system(system_name: str) -> Optional["System"]: - """Return a system instance with the same name passed as argument.""" - available_systems = get_available_systems() - for system in available_systems: - if system_name == system.name: - return system - return None - - -def install_system(source_path: Path) -> None: - """Install new system.""" - try: - source = get_source(source_path) - config = cast(List[SystemConfig], source.config()) - systems_to_install = [load_system(entry) for entry in config] - except Exception as error: - raise ConfigurationException("Unable to read system definition") from error - - if not systems_to_install: - raise ConfigurationException("No system definition found") - - available_systems = get_available_systems() - already_installed = [s for s in systems_to_install if s in available_systems] - if already_installed: - names = [system.name for system in already_installed] - raise ConfigurationException( - "Systems [{}] are already installed".format(",".join(names)) - ) - - create_destination_and_install(source, get_backends_path("systems")) - - -def remove_system(directory_name: str) -> None: - """Remove system.""" - remove_backend(directory_name, "systems") - - class System(Backend): """System class.""" @@ -90,59 +28,33 @@ class System(Backend): """Construct the System class using the dictionary passed.""" super().__init__(config) - self._setup_data_transfer(config) self._setup_reporting(config) - def _setup_data_transfer(self, config: SystemConfig) -> None: - data_transfer_config = config.get("data_transfer") - protocol = ProtocolFactory().get_protocol( - data_transfer_config, cwd=self.config_location - ) - self.protocol = protocol - def _setup_reporting(self, config: SystemConfig) -> None: self.reporting = config.get("reporting") - def run(self, command: str, retry: bool = True) -> Tuple[int, bytearray, bytearray]: + def run(self, command: str) -> Tuple[int, bytearray, bytearray]: """ Run command on the system. Returns a tuple: (exit_code, stdout, stderr) """ - return self.protocol.run(command, retry) - - def deploy(self, src: Path, dst: str, retry: bool = True) -> None: - """Deploy files to the system.""" - if isinstance(self.protocol, SupportsDeploy): - self.protocol.deploy(src, dst, retry) - - @property - def supports_deploy(self) -> bool: - """Check if protocol supports deploy operation.""" - return isinstance(self.protocol, SupportsDeploy) - - @property - def connectable(self) -> bool: - """Check if protocol supports connection.""" - return isinstance(self.protocol, SupportsConnection) - - def establish_connection(self) -> bool: - """Establish connection with the system.""" - if not isinstance(self.protocol, SupportsConnection): + cwd = self.config_location + if not isinstance(cwd, Path) or not cwd.is_dir(): raise ConfigurationException( - "System {} does not support connections".format(self.name) + f"System has invalid config location: {cwd}", ) - return self.protocol.establish_connection() + stdout = bytearray() + stderr = bytearray() - def connection_details(self) -> Tuple[str, int]: - """Return connection details.""" - if not isinstance(self.protocol, SupportsConnection): - raise ConfigurationException( - "System {} does not support connections".format(self.name) - ) - - return self.protocol.connection_details() + return run_and_wait( + command, + cwd=cwd, + terminate_on_error=True, + out=stdout, + err=stderr, + ) def __eq__(self, other: object) -> bool: """Overload operator ==.""" @@ -157,7 +69,6 @@ class System(Backend): "type": "system", "name": self.name, "description": self.description, - "data_transfer_protocol": self.protocol.protocol, "commands": self._get_command_details(), "annotations": self.annotations, } @@ -165,88 +76,66 @@ class System(Backend): return output -class StandaloneSystem(System): - """StandaloneSystem class.""" - +def get_available_systems_directory_names() -> List[str]: + """Return a list of directory names for all avialable systems.""" + return [entry.name for entry in get_backend_directories("systems")] -def get_controller( - single_instance: bool, pid_file_path: Optional[Path] = None -) -> SystemController: - """Get system controller.""" - if single_instance: - return SystemControllerSingleInstance(pid_file_path) - return SystemController() +def get_available_systems() -> List[System]: + """Return a list with all available systems.""" + available_systems = [] + for config_json in get_backend_configs("systems"): + config_entries = cast(List[SystemConfig], (load_config(config_json))) + for config_entry in config_entries: + config_entry["config_location"] = config_json.parent.absolute() + system = load_system(config_entry) + available_systems.append(system) + return sorted(available_systems, key=lambda system: system.name) -class ControlledSystem(System): - """ControlledSystem class.""" - def __init__(self, config: SystemConfig): - """Construct the ControlledSystem class using the dictionary passed.""" - super().__init__(config) - self.controller: Optional[SystemController] = None - - def start( - self, - commands: List[str], - single_instance: bool = True, - pid_file_path: Optional[Path] = None, - ) -> None: - """Launch the system.""" - if ( - not isinstance(self.config_location, Path) - or not self.config_location.is_dir() - ): - raise ConfigurationException( - "System {} has wrong config location".format(self.name) - ) +def get_system(system_name: str) -> System: + """Return a system instance with the same name passed as argument.""" + available_systems = get_available_systems() + for system in available_systems: + if system_name == system.name: + return system + raise ConfigurationException(f"System '{system_name}' not found.") - self.controller = get_controller(single_instance, pid_file_path) - self.controller.start(commands, self.config_location) - def is_running(self) -> bool: - """Check if system is running.""" - if not self.controller: - return False +def install_system(source_path: Path) -> None: + """Install new system.""" + try: + source = get_source(source_path) + config = cast(List[SystemConfig], source.config()) + systems_to_install = [load_system(entry) for entry in config] + except Exception as error: + raise ConfigurationException("Unable to read system definition") from error - return self.controller.is_running() + if not systems_to_install: + raise ConfigurationException("No system definition found") - def get_output(self) -> Tuple[str, str]: - """Return system output.""" - if not self.controller: - return "", "" + available_systems = get_available_systems() + already_installed = [s for s in systems_to_install if s in available_systems] + if already_installed: + names = [system.name for system in already_installed] + raise ConfigurationException( + f"Systems [{','.join(names)}] are already installed." + ) - return self.controller.get_output() + create_destination_and_install(source, get_backends_path("systems")) - def stop(self, wait: bool = False) -> None: - """Stop the system.""" - if not self.controller: - raise Exception("System has not been started") - if isinstance(self.protocol, SupportsClose): - try: - self.protocol.close() - except Exception as error: # pylint: disable=broad-except - print(error) - self.controller.stop(wait) +def remove_system(directory_name: str) -> None: + """Remove system.""" + remove_backend(directory_name, "systems") -def load_system(config: SystemConfig) -> Union[StandaloneSystem, ControlledSystem]: +def load_system(config: SystemConfig) -> System: """Load system based on it's execution type.""" - data_transfer = config.get("data_transfer", {}) - protocol = data_transfer.get("protocol") populate_shared_params(config) - if protocol == "ssh": - return ControlledSystem(config) - - if protocol == "local": - return StandaloneSystem(config) - - raise ConfigurationException( - "Unsupported execution type for protocol {}".format(protocol) - ) + return System(config) def populate_shared_params(config: SystemConfig) -> None: @@ -264,7 +153,7 @@ def populate_shared_params(config: SystemConfig) -> None: raise ConfigurationException("All shared parameters should have aliases") commands = config.get("commands", {}) - for cmd_name in ["build", "run"]: + for cmd_name in ["run"]: command = commands.get(cmd_name) if command is None: commands[cmd_name] = [] @@ -275,7 +164,7 @@ def populate_shared_params(config: SystemConfig) -> None: only_aliases = all(p.get("alias") for p in cmd_user_params) if not only_aliases: raise ConfigurationException( - "All parameters for command {} should have aliases".format(cmd_name) + f"All parameters for command {cmd_name} should have aliases." ) merged_by_alias = { **{p.get("alias"): p for p in shared_user_params}, |