aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/utils/proc.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mlia/utils/proc.py')
-rw-r--r--src/mlia/utils/proc.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/src/mlia/utils/proc.py b/src/mlia/utils/proc.py
new file mode 100644
index 0000000..39aca43
--- /dev/null
+++ b/src/mlia/utils/proc.py
@@ -0,0 +1,164 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Utils related to process management."""
+import os
+import signal
+import subprocess
+import time
+from abc import ABC
+from abc import abstractmethod
+from contextlib import contextmanager
+from contextlib import suppress
+from pathlib import Path
+from typing import Any
+from typing import Generator
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+
+class OutputConsumer(ABC):
+ """Base class for the output consumers."""
+
+ @abstractmethod
+ def feed(self, line: str) -> None:
+ """Feed new line to the consumerr."""
+
+
+class RunningCommand:
+ """Running command."""
+
+ def __init__(self, process: subprocess.Popen) -> None:
+ """Init running command instance."""
+ self.process = process
+ self._output_consumers: Optional[List[OutputConsumer]] = None
+
+ def is_alive(self) -> bool:
+ """Return true if process is still alive."""
+ return self.process.poll() is None
+
+ def exit_code(self) -> Optional[int]:
+ """Return process's return code."""
+ return self.process.poll()
+
+ def stdout(self) -> Iterable[str]:
+ """Return std output of the process."""
+ assert self.process.stdout is not None
+
+ for line in self.process.stdout:
+ yield line
+
+ def kill(self) -> None:
+ """Kill the process."""
+ self.process.kill()
+
+ def send_signal(self, signal_num: int) -> None:
+ """Send signal to the process."""
+ self.process.send_signal(signal_num)
+
+ @property
+ def output_consumers(self) -> Optional[List[OutputConsumer]]:
+ """Property output_consumers."""
+ return self._output_consumers
+
+ @output_consumers.setter
+ def output_consumers(self, output_consumers: List[OutputConsumer]) -> None:
+ """Set output consumers."""
+ self._output_consumers = output_consumers
+
+ def consume_output(self) -> None:
+ """Pass program's output to the consumers."""
+ if self.process is None or self.output_consumers is None:
+ return
+
+ for line in self.stdout():
+ for consumer in self.output_consumers:
+ with suppress():
+ consumer.feed(line)
+
+ def stop(
+ self, wait: bool = True, num_of_attempts: int = 5, interval: float = 0.5
+ ) -> None:
+ """Stop execution."""
+ try:
+ if not self.is_alive():
+ return
+
+ self.process.send_signal(signal.SIGINT)
+ self.consume_output()
+
+ if not wait:
+ return
+
+ for _ in range(num_of_attempts):
+ time.sleep(interval)
+ if not self.is_alive():
+ break
+ else:
+ raise Exception("Unable to stop running command")
+ finally:
+ self._close_fd()
+
+ def _close_fd(self) -> None:
+ """Close file descriptors."""
+
+ def close(file_descriptor: Any) -> None:
+ """Check and close file."""
+ if file_descriptor is not None and hasattr(file_descriptor, "close"):
+ file_descriptor.close()
+
+ close(self.process.stdout)
+ close(self.process.stderr)
+
+ def wait(self, redirect_output: bool = False) -> None:
+ """Redirect process output to stdout and wait for completion."""
+ if redirect_output:
+ for line in self.stdout():
+ print(line, end="")
+
+ self.process.wait()
+
+
+class CommandExecutor:
+ """Command executor."""
+
+ @staticmethod
+ def execute(command: List[str]) -> Tuple[int, bytes, bytes]:
+ """Execute the command."""
+ result = subprocess.run(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
+ )
+
+ return (result.returncode, result.stdout, result.stderr)
+
+ @staticmethod
+ def submit(command: List[str]) -> RunningCommand:
+ """Submit command for the execution."""
+ process = subprocess.Popen( # pylint: disable=consider-using-with
+ command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, # redirect command stderr to stdout
+ universal_newlines=True,
+ bufsize=1,
+ )
+
+ return RunningCommand(process)
+
+
+@contextmanager
+def working_directory(
+ working_dir: Path, create_dir: bool = False
+) -> Generator[Path, None, None]:
+ """Temporary change working directory."""
+ current_working_dir = Path.cwd()
+
+ if create_dir:
+ working_dir.mkdir()
+
+ os.chdir(working_dir)
+
+ try:
+ yield working_dir
+ finally:
+ os.chdir(current_working_dir)