aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/utils/proc.py
diff options
context:
space:
mode:
authorBenjamin Klimczak <benjamin.klimczak@arm.com>2022-07-11 12:33:42 +0100
committerBenjamin Klimczak <benjamin.klimczak@arm.com>2022-07-26 14:08:21 +0100
commit5d81f37de09efe10f90512e50252be9c36925fcf (patch)
treeb4d7cdfd051da0a6e882bdfcf280fd7ca7b39e57 /src/mlia/utils/proc.py
parent7899b908c1fe6d86b92a80f3827ddd0ac05b674b (diff)
downloadmlia-5d81f37de09efe10f90512e50252be9c36925fcf.tar.gz
MLIA-551 Rework remains of AIET architecture
Re-factoring the code base to further merge the old AIET code into MLIA. - Remove last traces of the backend type 'tool' - Controlled systems removed, including SSH protocol, controller, RunningCommand, locks etc. - Build command / build dir and deploy functionality removed from Applications and Systems - Moving working_dir() - Replace module 'output_parser' with new module 'output_consumer' and merge Base64 parsing into it - Change the output consumption to optionally remove (i.e. actually consume) lines - Use Base64 parsing in GenericInferenceOutputParser, replacing the regex-based parsing and remove the now unused regex parsing - Remove AIET reporting - Pre-install applications by moving them to src/mlia/resources/backends - Rename aiet-config.json to backend-config.json - Move tests from tests/mlia/ to tests/ - Adapt unit tests to code changes - Dependencies removed: paramiko, filelock, psutil - Fix bug in corstone.py: The wrong resource directory was used which broke the functionality to download backends. - Use f-string formatting. - Use logging instead of print. Change-Id: I768bc3bb6b2eda57d219ad01be4a8e0a74167d76
Diffstat (limited to 'src/mlia/utils/proc.py')
-rw-r--r--src/mlia/utils/proc.py152
1 files changed, 0 insertions, 152 deletions
diff --git a/src/mlia/utils/proc.py b/src/mlia/utils/proc.py
deleted file mode 100644
index 18a4305..0000000
--- a/src/mlia/utils/proc.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Utils related to process management."""
-import os
-import signal
-import subprocess
-import time
-from abc import ABC
-from abc import abstractmethod
-from contextlib import contextmanager
-from pathlib import Path
-from typing import Any
-from typing import Generator
-from typing import Iterable
-from typing import List
-from typing import Optional
-from typing import Tuple
-
-
-class OutputConsumer(ABC):
- """Base class for the output consumers."""
-
- @abstractmethod
- def feed(self, line: str) -> None:
- """Feed new line to the consumer."""
-
-
-class RunningCommand:
- """Running command."""
-
- def __init__(self, process: subprocess.Popen) -> None:
- """Init running command instance."""
- self.process = process
- self.output_consumers: List[OutputConsumer] = []
-
- def is_alive(self) -> bool:
- """Return true if process is still alive."""
- return self.process.poll() is None
-
- def exit_code(self) -> Optional[int]:
- """Return process's return code."""
- return self.process.poll()
-
- def stdout(self) -> Iterable[str]:
- """Return std output of the process."""
- assert self.process.stdout is not None
-
- for line in self.process.stdout:
- yield line
-
- def kill(self) -> None:
- """Kill the process."""
- self.process.kill()
-
- def send_signal(self, signal_num: int) -> None:
- """Send signal to the process."""
- self.process.send_signal(signal_num)
-
- def consume_output(self) -> None:
- """Pass program's output to the consumers."""
- if self.process is None or not self.output_consumers:
- return
-
- for line in self.stdout():
- for consumer in self.output_consumers:
- consumer.feed(line)
-
- def stop(
- self, wait: bool = True, num_of_attempts: int = 5, interval: float = 0.5
- ) -> None:
- """Stop execution."""
- try:
- if not self.is_alive():
- return
-
- self.process.send_signal(signal.SIGINT)
- self.consume_output()
-
- if not wait:
- return
-
- for _ in range(num_of_attempts):
- time.sleep(interval)
- if not self.is_alive():
- break
- else:
- raise Exception("Unable to stop running command")
- finally:
- self._close_fd()
-
- def _close_fd(self) -> None:
- """Close file descriptors."""
-
- def close(file_descriptor: Any) -> None:
- """Check and close file."""
- if file_descriptor is not None and hasattr(file_descriptor, "close"):
- file_descriptor.close()
-
- close(self.process.stdout)
- close(self.process.stderr)
-
- def wait(self, redirect_output: bool = False) -> None:
- """Redirect process output to stdout and wait for completion."""
- if redirect_output:
- for line in self.stdout():
- print(line, end="")
-
- self.process.wait()
-
-
-class CommandExecutor:
- """Command executor."""
-
- @staticmethod
- def execute(command: List[str]) -> Tuple[int, bytes, bytes]:
- """Execute the command."""
- result = subprocess.run(
- command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
- )
-
- return (result.returncode, result.stdout, result.stderr)
-
- @staticmethod
- def submit(command: List[str]) -> RunningCommand:
- """Submit command for the execution."""
- process = subprocess.Popen( # pylint: disable=consider-using-with
- command,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, # redirect command stderr to stdout
- universal_newlines=True,
- bufsize=1,
- )
-
- return RunningCommand(process)
-
-
-@contextmanager
-def working_directory(
- working_dir: Path, create_dir: bool = False
-) -> Generator[Path, None, None]:
- """Temporary change working directory."""
- current_working_dir = Path.cwd()
-
- if create_dir:
- working_dir.mkdir()
-
- os.chdir(working_dir)
-
- try:
- yield working_dir
- finally:
- os.chdir(current_working_dir)