diff options
Diffstat (limited to 'src/mlia/backend/corstone/performance.py')
-rw-r--r-- | src/mlia/backend/corstone/performance.py | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/src/mlia/backend/corstone/performance.py b/src/mlia/backend/corstone/performance.py new file mode 100644 index 0000000..5aabfa5 --- /dev/null +++ b/src/mlia/backend/corstone/performance.py @@ -0,0 +1,233 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Module for backend integration.""" +from __future__ import annotations + +import logging +from abc import ABC +from abc import abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +from mlia.backend.executor.output_consumer import Base64OutputConsumer +from mlia.backend.executor.output_consumer import OutputConsumer +from mlia.backend.executor.runner import BackendRunner +from mlia.backend.executor.runner import ExecutionParams +from mlia.backend.install import get_application_name +from mlia.backend.install import get_system_name + + +logger = logging.getLogger(__name__) + + +@dataclass +class DeviceInfo: + """Device information.""" + + device_type: Literal["ethos-u55", "ethos-u65"] + mac: int + + +@dataclass +class ModelInfo: + """Model info.""" + + model_path: Path + + +@dataclass +class PerformanceMetrics: + """Performance metrics parsed from generic inference output.""" + + npu_active_cycles: int + npu_idle_cycles: int + npu_total_cycles: int + npu_axi0_rd_data_beat_received: int + npu_axi0_wr_data_beat_written: int + npu_axi1_rd_data_beat_received: int + + +class LogWriter(OutputConsumer): + """Redirect output to the logger.""" + + def feed(self, line: str) -> bool: + """Process line from the output.""" + logger.debug(line.strip()) + return False + + +class GenericInferenceOutputParser(Base64OutputConsumer): + """Generic inference app output parser.""" + + def __init__(self) -> None: + """Init generic inference output parser instance.""" + super().__init__() + self._map = { + "NPU ACTIVE": "npu_active_cycles", + "NPU IDLE": "npu_idle_cycles", + "NPU TOTAL": "npu_total_cycles", + "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received", + "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written", + "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received", + } + + @property + def result(self) -> dict: + """Merge the raw results and map the names to the right output names.""" + merged_result = {} + for raw_result in self.parsed_output: + for profiling_result in raw_result: + for sample in profiling_result["samples"]: + name, values = (sample["name"], sample["value"]) + if name in merged_result: + raise KeyError( + f"Duplicate key '{name}' in base64 output.", + ) + new_name = self._map[name] + merged_result[new_name] = values[0] + return merged_result + + def is_ready(self) -> bool: + """Return true if all expected data has been parsed.""" + return set(self.result.keys()) == set(self._map.values()) + + def missed_keys(self) -> set[str]: + """Return a set of the keys that have not been found in the output.""" + return set(self._map.values()) - set(self.result.keys()) + + +class GenericInferenceRunner(ABC): + """Abstract class for generic inference runner.""" + + def __init__(self, backend_runner: BackendRunner): + """Init generic inference runner instance.""" + self.backend_runner = backend_runner + + def run( + self, model_info: ModelInfo, output_consumers: list[OutputConsumer] + ) -> None: + """Run generic inference for the provided device/model.""" + execution_params = self.get_execution_params(model_info) + + ctx = self.backend_runner.run_application(execution_params) + if ctx.stdout is not None: + ctx.stdout = self.consume_output(ctx.stdout, output_consumers) + + @abstractmethod + def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: + """Get execution params for the provided model.""" + + def check_system_and_application(self, system_name: str, app_name: str) -> None: + """Check if requested system and application installed.""" + if not self.backend_runner.is_system_installed(system_name): + raise Exception(f"System {system_name} is not installed") + + if not self.backend_runner.is_application_installed(app_name, system_name): + raise Exception( + f"Application {app_name} for the system {system_name} " + "is not installed" + ) + + @staticmethod + def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray: + """ + Pass program's output to the consumers and filter it. + + Returns the filtered output. + """ + filtered_output = bytearray() + for line_bytes in output.splitlines(): + line = line_bytes.decode("utf-8") + remove_line = False + for consumer in consumers: + if consumer.feed(line): + remove_line = True + if not remove_line: + filtered_output.extend(line_bytes) + + return filtered_output + + +class GenericInferenceRunnerEthosU(GenericInferenceRunner): + """Generic inference runner on U55/65.""" + + def __init__( + self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str + ) -> None: + """Init generic inference runner instance.""" + super().__init__(backend_runner) + + system_name, app_name = self.resolve_system_and_app(device_info, backend) + self.system_name = system_name + self.app_name = app_name + self.device_info = device_info + + @staticmethod + def resolve_system_and_app( + device_info: DeviceInfo, backend: str + ) -> tuple[str, str]: + """Find appropriate system and application for the provided device/backend.""" + try: + system_name = get_system_name(backend, device_info.device_type) + except KeyError as ex: + raise RuntimeError( + f"Unsupported device {device_info.device_type} " + f"for backend {backend}" + ) from ex + + try: + app_name = get_application_name(system_name) + except KeyError as err: + raise RuntimeError(f"System {system_name} is not installed") from err + + return system_name, app_name + + def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: + """Get execution params for Ethos-U55/65.""" + self.check_system_and_application(self.system_name, self.app_name) + + system_params = [ + f"mac={self.device_info.mac}", + f"input_file={model_info.model_path.absolute()}", + ] + + return ExecutionParams( + self.app_name, + self.system_name, + [], + system_params, + ) + + +def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner: + """Get generic runner for provided device and backend.""" + backend_runner = get_backend_runner() + return GenericInferenceRunnerEthosU(backend_runner, device_info, backend) + + +def estimate_performance( + model_info: ModelInfo, device_info: DeviceInfo, backend: str +) -> PerformanceMetrics: + """Get performance estimations.""" + output_parser = GenericInferenceOutputParser() + output_consumers = [output_parser, LogWriter()] + + generic_runner = get_generic_runner(device_info, backend) + generic_runner.run(model_info, output_consumers) + + if not output_parser.is_ready(): + missed_data = ",".join(output_parser.missed_keys()) + logger.debug("Unable to get performance metrics, missed data %s", missed_data) + raise Exception("Unable to get performance metrics, insufficient data") + + return PerformanceMetrics(**output_parser.result) + + +def get_backend_runner() -> BackendRunner: + """ + Return BackendRunner instance. + + Note: This is needed for the unit tests. + """ + return BackendRunner() |