aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/utils/proc.py
diff options
context:
space:
mode:
authorBenjamin Klimczak <benjamin.klimczak@arm.com>2022-06-28 10:29:35 +0100
committerBenjamin Klimczak <benjamin.klimczak@arm.com>2022-07-08 10:57:19 +0100
commitc9b4089b3037b5943565d76242d3016b8776f8d2 (patch)
tree3de24f79dedf0f26f492a7fa1562bf684e13a055 /src/mlia/utils/proc.py
parentba2c7fcccf37e8c81946f0776714c64f73191787 (diff)
downloadmlia-c9b4089b3037b5943565d76242d3016b8776f8d2.tar.gz
MLIA-546 Merge AIET into MLIA
Merge the deprecated AIET interface for backend execution into MLIA: - Execute backends directly (without subprocess and the aiet CLI) - Fix issues with the unit tests - Remove src/aiet and tests/aiet - Re-factor code to replace 'aiet' with 'backend' - Adapt and improve unit tests after re-factoring - Remove dependencies that are not needed anymore (click and cloup) Change-Id: I450734c6a3f705ba9afde41862b29e797e511f7c
Diffstat (limited to 'src/mlia/utils/proc.py')
-rw-r--r--src/mlia/utils/proc.py20
1 files changed, 4 insertions, 16 deletions
diff --git a/src/mlia/utils/proc.py b/src/mlia/utils/proc.py
index 39aca43..18a4305 100644
--- a/src/mlia/utils/proc.py
+++ b/src/mlia/utils/proc.py
@@ -8,7 +8,6 @@ import time
from abc import ABC
from abc import abstractmethod
from contextlib import contextmanager
-from contextlib import suppress
from pathlib import Path
from typing import Any
from typing import Generator
@@ -23,7 +22,7 @@ class OutputConsumer(ABC):
@abstractmethod
def feed(self, line: str) -> None:
- """Feed new line to the consumerr."""
+ """Feed new line to the consumer."""
class RunningCommand:
@@ -32,7 +31,7 @@ class RunningCommand:
def __init__(self, process: subprocess.Popen) -> None:
"""Init running command instance."""
self.process = process
- self._output_consumers: Optional[List[OutputConsumer]] = None
+ self.output_consumers: List[OutputConsumer] = []
def is_alive(self) -> bool:
"""Return true if process is still alive."""
@@ -57,25 +56,14 @@ class RunningCommand:
"""Send signal to the process."""
self.process.send_signal(signal_num)
- @property
- def output_consumers(self) -> Optional[List[OutputConsumer]]:
- """Property output_consumers."""
- return self._output_consumers
-
- @output_consumers.setter
- def output_consumers(self, output_consumers: List[OutputConsumer]) -> None:
- """Set output consumers."""
- self._output_consumers = output_consumers
-
def consume_output(self) -> None:
"""Pass program's output to the consumers."""
- if self.process is None or self.output_consumers is None:
+ if self.process is None or not self.output_consumers:
return
for line in self.stdout():
for consumer in self.output_consumers:
- with suppress():
- consumer.feed(line)
+ consumer.feed(line)
def stop(
self, wait: bool = True, num_of_attempts: int = 5, interval: float = 0.5