aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/backend/corstone/performance.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mlia/backend/corstone/performance.py')
-rw-r--r--src/mlia/backend/corstone/performance.py233
1 files changed, 233 insertions, 0 deletions
diff --git a/src/mlia/backend/corstone/performance.py b/src/mlia/backend/corstone/performance.py
new file mode 100644
index 0000000..5aabfa5
--- /dev/null
+++ b/src/mlia/backend/corstone/performance.py
@@ -0,0 +1,233 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for backend integration."""
+from __future__ import annotations
+
+import logging
+from abc import ABC
+from abc import abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Literal
+
+from mlia.backend.executor.output_consumer import Base64OutputConsumer
+from mlia.backend.executor.output_consumer import OutputConsumer
+from mlia.backend.executor.runner import BackendRunner
+from mlia.backend.executor.runner import ExecutionParams
+from mlia.backend.install import get_application_name
+from mlia.backend.install import get_system_name
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DeviceInfo:
+ """Device information."""
+
+ device_type: Literal["ethos-u55", "ethos-u65"]
+ mac: int
+
+
+@dataclass
+class ModelInfo:
+ """Model info."""
+
+ model_path: Path
+
+
+@dataclass
+class PerformanceMetrics:
+ """Performance metrics parsed from generic inference output."""
+
+ npu_active_cycles: int
+ npu_idle_cycles: int
+ npu_total_cycles: int
+ npu_axi0_rd_data_beat_received: int
+ npu_axi0_wr_data_beat_written: int
+ npu_axi1_rd_data_beat_received: int
+
+
+class LogWriter(OutputConsumer):
+ """Redirect output to the logger."""
+
+ def feed(self, line: str) -> bool:
+ """Process line from the output."""
+ logger.debug(line.strip())
+ return False
+
+
+class GenericInferenceOutputParser(Base64OutputConsumer):
+ """Generic inference app output parser."""
+
+ def __init__(self) -> None:
+ """Init generic inference output parser instance."""
+ super().__init__()
+ self._map = {
+ "NPU ACTIVE": "npu_active_cycles",
+ "NPU IDLE": "npu_idle_cycles",
+ "NPU TOTAL": "npu_total_cycles",
+ "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received",
+ "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written",
+ "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received",
+ }
+
+ @property
+ def result(self) -> dict:
+ """Merge the raw results and map the names to the right output names."""
+ merged_result = {}
+ for raw_result in self.parsed_output:
+ for profiling_result in raw_result:
+ for sample in profiling_result["samples"]:
+ name, values = (sample["name"], sample["value"])
+ if name in merged_result:
+ raise KeyError(
+ f"Duplicate key '{name}' in base64 output.",
+ )
+ new_name = self._map[name]
+ merged_result[new_name] = values[0]
+ return merged_result
+
+ def is_ready(self) -> bool:
+ """Return true if all expected data has been parsed."""
+ return set(self.result.keys()) == set(self._map.values())
+
+ def missed_keys(self) -> set[str]:
+ """Return a set of the keys that have not been found in the output."""
+ return set(self._map.values()) - set(self.result.keys())
+
+
+class GenericInferenceRunner(ABC):
+ """Abstract class for generic inference runner."""
+
+ def __init__(self, backend_runner: BackendRunner):
+ """Init generic inference runner instance."""
+ self.backend_runner = backend_runner
+
+ def run(
+ self, model_info: ModelInfo, output_consumers: list[OutputConsumer]
+ ) -> None:
+ """Run generic inference for the provided device/model."""
+ execution_params = self.get_execution_params(model_info)
+
+ ctx = self.backend_runner.run_application(execution_params)
+ if ctx.stdout is not None:
+ ctx.stdout = self.consume_output(ctx.stdout, output_consumers)
+
+ @abstractmethod
+ def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+ """Get execution params for the provided model."""
+
+ def check_system_and_application(self, system_name: str, app_name: str) -> None:
+ """Check if requested system and application installed."""
+ if not self.backend_runner.is_system_installed(system_name):
+ raise Exception(f"System {system_name} is not installed")
+
+ if not self.backend_runner.is_application_installed(app_name, system_name):
+ raise Exception(
+ f"Application {app_name} for the system {system_name} "
+ "is not installed"
+ )
+
+ @staticmethod
+ def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray:
+ """
+ Pass program's output to the consumers and filter it.
+
+ Returns the filtered output.
+ """
+ filtered_output = bytearray()
+ for line_bytes in output.splitlines():
+ line = line_bytes.decode("utf-8")
+ remove_line = False
+ for consumer in consumers:
+ if consumer.feed(line):
+ remove_line = True
+ if not remove_line:
+ filtered_output.extend(line_bytes)
+
+ return filtered_output
+
+
+class GenericInferenceRunnerEthosU(GenericInferenceRunner):
+ """Generic inference runner on U55/65."""
+
+ def __init__(
+ self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str
+ ) -> None:
+ """Init generic inference runner instance."""
+ super().__init__(backend_runner)
+
+ system_name, app_name = self.resolve_system_and_app(device_info, backend)
+ self.system_name = system_name
+ self.app_name = app_name
+ self.device_info = device_info
+
+ @staticmethod
+ def resolve_system_and_app(
+ device_info: DeviceInfo, backend: str
+ ) -> tuple[str, str]:
+ """Find appropriate system and application for the provided device/backend."""
+ try:
+ system_name = get_system_name(backend, device_info.device_type)
+ except KeyError as ex:
+ raise RuntimeError(
+ f"Unsupported device {device_info.device_type} "
+ f"for backend {backend}"
+ ) from ex
+
+ try:
+ app_name = get_application_name(system_name)
+ except KeyError as err:
+ raise RuntimeError(f"System {system_name} is not installed") from err
+
+ return system_name, app_name
+
+ def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+ """Get execution params for Ethos-U55/65."""
+ self.check_system_and_application(self.system_name, self.app_name)
+
+ system_params = [
+ f"mac={self.device_info.mac}",
+ f"input_file={model_info.model_path.absolute()}",
+ ]
+
+ return ExecutionParams(
+ self.app_name,
+ self.system_name,
+ [],
+ system_params,
+ )
+
+
+def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner:
+ """Get generic runner for provided device and backend."""
+ backend_runner = get_backend_runner()
+ return GenericInferenceRunnerEthosU(backend_runner, device_info, backend)
+
+
+def estimate_performance(
+ model_info: ModelInfo, device_info: DeviceInfo, backend: str
+) -> PerformanceMetrics:
+ """Get performance estimations."""
+ output_parser = GenericInferenceOutputParser()
+ output_consumers = [output_parser, LogWriter()]
+
+ generic_runner = get_generic_runner(device_info, backend)
+ generic_runner.run(model_info, output_consumers)
+
+ if not output_parser.is_ready():
+ missed_data = ",".join(output_parser.missed_keys())
+ logger.debug("Unable to get performance metrics, missed data %s", missed_data)
+ raise Exception("Unable to get performance metrics, insufficient data")
+
+ return PerformanceMetrics(**output_parser.result)
+
+
+def get_backend_runner() -> BackendRunner:
+ """
+ Return BackendRunner instance.
+
+ Note: This is needed for the unit tests.
+ """
+ return BackendRunner()