aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/utils/proc.py
blob: 39aca43302b5cdb828724a97573654b820d47bd6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
"""Utils related to process management."""
import os
import signal
import subprocess
import time
from abc import ABC
from abc import abstractmethod
from contextlib import contextmanager
from contextlib import suppress
from pathlib import Path
from typing import Any
from typing import Generator
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple


class OutputConsumer(ABC):
    """Base class for the output consumers."""

    @abstractmethod
    def feed(self, line: str) -> None:
        """Feed new line to the consumerr."""


class RunningCommand:
    """Running command."""

    def __init__(self, process: subprocess.Popen) -> None:
        """Init running command instance."""
        self.process = process
        self._output_consumers: Optional[List[OutputConsumer]] = None

    def is_alive(self) -> bool:
        """Return true if process is still alive."""
        return self.process.poll() is None

    def exit_code(self) -> Optional[int]:
        """Return process's return code."""
        return self.process.poll()

    def stdout(self) -> Iterable[str]:
        """Return std output of the process."""
        assert self.process.stdout is not None

        for line in self.process.stdout:
            yield line

    def kill(self) -> None:
        """Kill the process."""
        self.process.kill()

    def send_signal(self, signal_num: int) -> None:
        """Send signal to the process."""
        self.process.send_signal(signal_num)

    @property
    def output_consumers(self) -> Optional[List[OutputConsumer]]:
        """Property output_consumers."""
        return self._output_consumers

    @output_consumers.setter
    def output_consumers(self, output_consumers: List[OutputConsumer]) -> None:
        """Set output consumers."""
        self._output_consumers = output_consumers

    def consume_output(self) -> None:
        """Pass program's output to the consumers."""
        if self.process is None or self.output_consumers is None:
            return

        for line in self.stdout():
            for consumer in self.output_consumers:
                with suppress():
                    consumer.feed(line)

    def stop(
        self, wait: bool = True, num_of_attempts: int = 5, interval: float = 0.5
    ) -> None:
        """Stop execution."""
        try:
            if not self.is_alive():
                return

            self.process.send_signal(signal.SIGINT)
            self.consume_output()

            if not wait:
                return

            for _ in range(num_of_attempts):
                time.sleep(interval)
                if not self.is_alive():
                    break
            else:
                raise Exception("Unable to stop running command")
        finally:
            self._close_fd()

    def _close_fd(self) -> None:
        """Close file descriptors."""

        def close(file_descriptor: Any) -> None:
            """Check and close file."""
            if file_descriptor is not None and hasattr(file_descriptor, "close"):
                file_descriptor.close()

        close(self.process.stdout)
        close(self.process.stderr)

    def wait(self, redirect_output: bool = False) -> None:
        """Redirect process output to stdout and wait for completion."""
        if redirect_output:
            for line in self.stdout():
                print(line, end="")

        self.process.wait()


class CommandExecutor:
    """Command executor."""

    @staticmethod
    def execute(command: List[str]) -> Tuple[int, bytes, bytes]:
        """Execute the command."""
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
        )

        return (result.returncode, result.stdout, result.stderr)

    @staticmethod
    def submit(command: List[str]) -> RunningCommand:
        """Submit command for the execution."""
        process = subprocess.Popen(  # pylint: disable=consider-using-with
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # redirect command stderr to stdout
            universal_newlines=True,
            bufsize=1,
        )

        return RunningCommand(process)


@contextmanager
def working_directory(
    working_dir: Path, create_dir: bool = False
) -> Generator[Path, None, None]:
    """Temporary change working directory."""
    current_working_dir = Path.cwd()

    if create_dir:
        working_dir.mkdir()

    os.chdir(working_dir)

    try:
        yield working_dir
    finally:
        os.chdir(current_working_dir)