aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/backend/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mlia/backend/manager.py')
-rw-r--r--src/mlia/backend/manager.py177
1 files changed, 61 insertions, 116 deletions
diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py
index 3a1016c..8d8246d 100644
--- a/src/mlia/backend/manager.py
+++ b/src/mlia/backend/manager.py
@@ -2,27 +2,25 @@
# SPDX-License-Identifier: Apache-2.0
"""Module for backend integration."""
import logging
-import re
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from pathlib import Path
-from typing import Any
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
+from typing import Set
from typing import Tuple
from mlia.backend.application import get_available_applications
from mlia.backend.application import install_application
-from mlia.backend.common import DataPaths
from mlia.backend.execution import ExecutionContext
from mlia.backend.execution import run_application
+from mlia.backend.output_consumer import Base64OutputConsumer
+from mlia.backend.output_consumer import OutputConsumer
from mlia.backend.system import get_available_systems
from mlia.backend.system import install_system
-from mlia.utils.proc import OutputConsumer
-from mlia.utils.proc import RunningCommand
logger = logging.getLogger(__name__)
@@ -128,89 +126,55 @@ class ExecutionParams:
system: str
application_params: List[str]
system_params: List[str]
- deploy_params: List[str]
class LogWriter(OutputConsumer):
"""Redirect output to the logger."""
- def feed(self, line: str) -> None:
+ def feed(self, line: str) -> bool:
"""Process line from the output."""
logger.debug(line.strip())
+ return False
-class GenericInferenceOutputParser(OutputConsumer):
+class GenericInferenceOutputParser(Base64OutputConsumer):
"""Generic inference app output parser."""
- PATTERNS = {
- name: tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns)
- for name, patterns in (
- (
- "npu_active_cycles",
- (
- r"NPU ACTIVE cycles: (?P<value>\d+)",
- r"NPU ACTIVE: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_idle_cycles",
- (
- r"NPU IDLE cycles: (?P<value>\d+)",
- r"NPU IDLE: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_total_cycles",
- (
- r"NPU TOTAL cycles: (?P<value>\d+)",
- r"NPU TOTAL: (?P<value>\d+) cycles",
- ),
- ),
- (
- "npu_axi0_rd_data_beat_received",
- (
- r"NPU AXI0_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
- r"NPU AXI0_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
- ),
- ),
- (
- "npu_axi0_wr_data_beat_written",
- (
- r"NPU AXI0_WR_DATA_BEAT_WRITTEN beats: (?P<value>\d+)",
- r"NPU AXI0_WR_DATA_BEAT_WRITTEN: (?P<value>\d+) beats",
- ),
- ),
- (
- "npu_axi1_rd_data_beat_received",
- (
- r"NPU AXI1_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
- r"NPU AXI1_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
- ),
- ),
- )
- }
-
def __init__(self) -> None:
"""Init generic inference output parser instance."""
- self.result: Dict = {}
-
- def feed(self, line: str) -> None:
- """Feed new line to the parser."""
- for name, patterns in self.PATTERNS.items():
- for pattern in patterns:
- match = pattern.search(line)
-
- if match:
- self.result[name] = int(match["value"])
- return
+ super().__init__()
+ self._map = {
+ "NPU ACTIVE": "npu_active_cycles",
+ "NPU IDLE": "npu_idle_cycles",
+ "NPU TOTAL": "npu_total_cycles",
+ "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received",
+ "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written",
+ "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received",
+ }
+
+ @property
+ def result(self) -> Dict:
+ """Merge the raw results and map the names to the right output names."""
+ merged_result = {}
+ for raw_result in self.parsed_output:
+ for profiling_result in raw_result:
+ for sample in profiling_result["samples"]:
+ name, values = (sample["name"], sample["value"])
+ if name in merged_result:
+ raise KeyError(
+ f"Duplicate key '{name}' in base64 output.",
+ )
+ new_name = self._map[name]
+ merged_result[new_name] = values[0]
+ return merged_result
def is_ready(self) -> bool:
"""Return true if all expected data has been parsed."""
- return self.result.keys() == self.PATTERNS.keys()
+ return set(self.result.keys()) == set(self._map.values())
- def missed_keys(self) -> List[str]:
- """Return list of the keys that have not been found in the output."""
- return sorted(self.PATTERNS.keys() - self.result.keys())
+ def missed_keys(self) -> Set[str]:
+ """Return a set of the keys that have not been found in the output."""
+ return set(self._map.values()) - set(self.result.keys())
class BackendRunner:
@@ -274,24 +238,12 @@ class BackendRunner:
@staticmethod
def run_application(execution_params: ExecutionParams) -> ExecutionContext:
"""Run requested application."""
-
- def to_data_paths(paths: str) -> DataPaths:
- """Split input into two and create new DataPaths object."""
- src, dst = paths.split(sep=":", maxsplit=1)
- return DataPaths(Path(src), dst)
-
- deploy_data_paths = [
- to_data_paths(paths) for paths in execution_params.deploy_params
- ]
-
ctx = run_application(
execution_params.application,
execution_params.application_params,
execution_params.system,
execution_params.system_params,
- deploy_data_paths,
)
-
return ctx
@staticmethod
@@ -305,7 +257,6 @@ class GenericInferenceRunner(ABC):
def __init__(self, backend_runner: BackendRunner):
"""Init generic inference runner instance."""
self.backend_runner = backend_runner
- self.running_inference: Optional[RunningCommand] = None
def run(
self, model_info: ModelInfo, output_consumers: List[OutputConsumer]
@@ -315,27 +266,12 @@ class GenericInferenceRunner(ABC):
ctx = self.backend_runner.run_application(execution_params)
if ctx.stdout is not None:
- self.consume_output(ctx.stdout, output_consumers)
-
- def stop(self) -> None:
- """Stop running inference."""
- if self.running_inference is None:
- return
-
- self.running_inference.stop()
+ ctx.stdout = self.consume_output(ctx.stdout, output_consumers)
@abstractmethod
def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
"""Get execution params for the provided model."""
- def __enter__(self) -> "GenericInferenceRunner":
- """Enter context."""
- return self
-
- def __exit__(self, *_args: Any) -> None:
- """Exit context."""
- self.stop()
-
def check_system_and_application(self, system_name: str, app_name: str) -> None:
"""Check if requested system and application installed."""
if not self.backend_runner.is_system_installed(system_name):
@@ -348,11 +284,23 @@ class GenericInferenceRunner(ABC):
)
@staticmethod
- def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> None:
- """Pass program's output to the consumers."""
- for line in output.decode("utf8").splitlines():
+ def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> bytearray:
+ """
+ Pass program's output to the consumers and filter it.
+
+ Returns the filtered output.
+ """
+ filtered_output = bytearray()
+ for line_bytes in output.splitlines():
+ line = line_bytes.decode("utf-8")
+ remove_line = False
for consumer in consumers:
- consumer.feed(line)
+ if consumer.feed(line):
+ remove_line = True
+ if not remove_line:
+ filtered_output.extend(line_bytes)
+
+ return filtered_output
class GenericInferenceRunnerEthosU(GenericInferenceRunner):
@@ -408,7 +356,6 @@ class GenericInferenceRunnerEthosU(GenericInferenceRunner):
self.system_name,
[],
system_params,
- [],
)
@@ -422,20 +369,18 @@ def estimate_performance(
model_info: ModelInfo, device_info: DeviceInfo, backend: str
) -> PerformanceMetrics:
"""Get performance estimations."""
- with get_generic_runner(device_info, backend) as generic_runner:
- output_parser = GenericInferenceOutputParser()
- output_consumers = [output_parser, LogWriter()]
+ output_parser = GenericInferenceOutputParser()
+ output_consumers = [output_parser, LogWriter()]
- generic_runner.run(model_info, output_consumers)
+ generic_runner = get_generic_runner(device_info, backend)
+ generic_runner.run(model_info, output_consumers)
- if not output_parser.is_ready():
- missed_data = ",".join(output_parser.missed_keys())
- logger.debug(
- "Unable to get performance metrics, missed data %s", missed_data
- )
- raise Exception("Unable to get performance metrics, insufficient data")
+ if not output_parser.is_ready():
+ missed_data = ",".join(output_parser.missed_keys())
+ logger.debug("Unable to get performance metrics, missed data %s", missed_data)
+ raise Exception("Unable to get performance metrics, insufficient data")
- return PerformanceMetrics(**output_parser.result)
+ return PerformanceMetrics(**output_parser.result)
def get_backend_runner() -> BackendRunner: