diff options
author | Benjamin Klimczak <benjamin.klimczak@arm.com> | 2022-07-11 12:33:42 +0100 |
---|---|---|
committer | Benjamin Klimczak <benjamin.klimczak@arm.com> | 2022-07-26 14:08:21 +0100 |
commit | 5d81f37de09efe10f90512e50252be9c36925fcf (patch) | |
tree | b4d7cdfd051da0a6e882bdfcf280fd7ca7b39e57 /src/mlia/backend/manager.py | |
parent | 7899b908c1fe6d86b92a80f3827ddd0ac05b674b (diff) | |
download | mlia-5d81f37de09efe10f90512e50252be9c36925fcf.tar.gz |
MLIA-551 Rework remains of AIET architecture
Re-factoring the code base to further merge the old AIET code into MLIA.
- Remove last traces of the backend type 'tool'
- Controlled systems removed, including SSH protocol, controller,
RunningCommand, locks etc.
- Build command / build dir and deploy functionality removed from
Applications and Systems
- Moving working_dir()
- Replace module 'output_parser' with new module 'output_consumer' and
merge Base64 parsing into it
- Change the output consumption to optionally remove (i.e. actually
consume) lines
- Use Base64 parsing in GenericInferenceOutputParser, replacing the
regex-based parsing and remove the now unused regex parsing
- Remove AIET reporting
- Pre-install applications by moving them to src/mlia/resources/backends
- Rename aiet-config.json to backend-config.json
- Move tests from tests/mlia/ to tests/
- Adapt unit tests to code changes
- Dependencies removed: paramiko, filelock, psutil
- Fix bug in corstone.py: The wrong resource directory was used which
broke the functionality to download backends.
- Use f-string formatting.
- Use logging instead of print.
Change-Id: I768bc3bb6b2eda57d219ad01be4a8e0a74167d76
Diffstat (limited to 'src/mlia/backend/manager.py')
-rw-r--r-- | src/mlia/backend/manager.py | 177 |
1 files changed, 61 insertions, 116 deletions
diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py index 3a1016c..8d8246d 100644 --- a/src/mlia/backend/manager.py +++ b/src/mlia/backend/manager.py @@ -2,27 +2,25 @@ # SPDX-License-Identifier: Apache-2.0 """Module for backend integration.""" import logging -import re from abc import ABC from abc import abstractmethod from dataclasses import dataclass from pathlib import Path -from typing import Any from typing import Dict from typing import List from typing import Literal from typing import Optional +from typing import Set from typing import Tuple from mlia.backend.application import get_available_applications from mlia.backend.application import install_application -from mlia.backend.common import DataPaths from mlia.backend.execution import ExecutionContext from mlia.backend.execution import run_application +from mlia.backend.output_consumer import Base64OutputConsumer +from mlia.backend.output_consumer import OutputConsumer from mlia.backend.system import get_available_systems from mlia.backend.system import install_system -from mlia.utils.proc import OutputConsumer -from mlia.utils.proc import RunningCommand logger = logging.getLogger(__name__) @@ -128,89 +126,55 @@ class ExecutionParams: system: str application_params: List[str] system_params: List[str] - deploy_params: List[str] class LogWriter(OutputConsumer): """Redirect output to the logger.""" - def feed(self, line: str) -> None: + def feed(self, line: str) -> bool: """Process line from the output.""" logger.debug(line.strip()) + return False -class GenericInferenceOutputParser(OutputConsumer): +class GenericInferenceOutputParser(Base64OutputConsumer): """Generic inference app output parser.""" - PATTERNS = { - name: tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns) - for name, patterns in ( - ( - "npu_active_cycles", - ( - r"NPU ACTIVE cycles: (?P<value>\d+)", - r"NPU ACTIVE: (?P<value>\d+) cycles", - ), - ), - ( - "npu_idle_cycles", - ( - r"NPU IDLE cycles: (?P<value>\d+)", - r"NPU IDLE: (?P<value>\d+) cycles", - ), - ), - ( - "npu_total_cycles", - ( - r"NPU TOTAL cycles: (?P<value>\d+)", - r"NPU TOTAL: (?P<value>\d+) cycles", - ), - ), - ( - "npu_axi0_rd_data_beat_received", - ( - r"NPU AXI0_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)", - r"NPU AXI0_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats", - ), - ), - ( - "npu_axi0_wr_data_beat_written", - ( - r"NPU AXI0_WR_DATA_BEAT_WRITTEN beats: (?P<value>\d+)", - r"NPU AXI0_WR_DATA_BEAT_WRITTEN: (?P<value>\d+) beats", - ), - ), - ( - "npu_axi1_rd_data_beat_received", - ( - r"NPU AXI1_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)", - r"NPU AXI1_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats", - ), - ), - ) - } - def __init__(self) -> None: """Init generic inference output parser instance.""" - self.result: Dict = {} - - def feed(self, line: str) -> None: - """Feed new line to the parser.""" - for name, patterns in self.PATTERNS.items(): - for pattern in patterns: - match = pattern.search(line) - - if match: - self.result[name] = int(match["value"]) - return + super().__init__() + self._map = { + "NPU ACTIVE": "npu_active_cycles", + "NPU IDLE": "npu_idle_cycles", + "NPU TOTAL": "npu_total_cycles", + "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received", + "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written", + "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received", + } + + @property + def result(self) -> Dict: + """Merge the raw results and map the names to the right output names.""" + merged_result = {} + for raw_result in self.parsed_output: + for profiling_result in raw_result: + for sample in profiling_result["samples"]: + name, values = (sample["name"], sample["value"]) + if name in merged_result: + raise KeyError( + f"Duplicate key '{name}' in base64 output.", + ) + new_name = self._map[name] + merged_result[new_name] = values[0] + return merged_result def is_ready(self) -> bool: """Return true if all expected data has been parsed.""" - return self.result.keys() == self.PATTERNS.keys() + return set(self.result.keys()) == set(self._map.values()) - def missed_keys(self) -> List[str]: - """Return list of the keys that have not been found in the output.""" - return sorted(self.PATTERNS.keys() - self.result.keys()) + def missed_keys(self) -> Set[str]: + """Return a set of the keys that have not been found in the output.""" + return set(self._map.values()) - set(self.result.keys()) class BackendRunner: @@ -274,24 +238,12 @@ class BackendRunner: @staticmethod def run_application(execution_params: ExecutionParams) -> ExecutionContext: """Run requested application.""" - - def to_data_paths(paths: str) -> DataPaths: - """Split input into two and create new DataPaths object.""" - src, dst = paths.split(sep=":", maxsplit=1) - return DataPaths(Path(src), dst) - - deploy_data_paths = [ - to_data_paths(paths) for paths in execution_params.deploy_params - ] - ctx = run_application( execution_params.application, execution_params.application_params, execution_params.system, execution_params.system_params, - deploy_data_paths, ) - return ctx @staticmethod @@ -305,7 +257,6 @@ class GenericInferenceRunner(ABC): def __init__(self, backend_runner: BackendRunner): """Init generic inference runner instance.""" self.backend_runner = backend_runner - self.running_inference: Optional[RunningCommand] = None def run( self, model_info: ModelInfo, output_consumers: List[OutputConsumer] @@ -315,27 +266,12 @@ class GenericInferenceRunner(ABC): ctx = self.backend_runner.run_application(execution_params) if ctx.stdout is not None: - self.consume_output(ctx.stdout, output_consumers) - - def stop(self) -> None: - """Stop running inference.""" - if self.running_inference is None: - return - - self.running_inference.stop() + ctx.stdout = self.consume_output(ctx.stdout, output_consumers) @abstractmethod def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: """Get execution params for the provided model.""" - def __enter__(self) -> "GenericInferenceRunner": - """Enter context.""" - return self - - def __exit__(self, *_args: Any) -> None: - """Exit context.""" - self.stop() - def check_system_and_application(self, system_name: str, app_name: str) -> None: """Check if requested system and application installed.""" if not self.backend_runner.is_system_installed(system_name): @@ -348,11 +284,23 @@ class GenericInferenceRunner(ABC): ) @staticmethod - def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> None: - """Pass program's output to the consumers.""" - for line in output.decode("utf8").splitlines(): + def consume_output(output: bytearray, consumers: List[OutputConsumer]) -> bytearray: + """ + Pass program's output to the consumers and filter it. + + Returns the filtered output. + """ + filtered_output = bytearray() + for line_bytes in output.splitlines(): + line = line_bytes.decode("utf-8") + remove_line = False for consumer in consumers: - consumer.feed(line) + if consumer.feed(line): + remove_line = True + if not remove_line: + filtered_output.extend(line_bytes) + + return filtered_output class GenericInferenceRunnerEthosU(GenericInferenceRunner): @@ -408,7 +356,6 @@ class GenericInferenceRunnerEthosU(GenericInferenceRunner): self.system_name, [], system_params, - [], ) @@ -422,20 +369,18 @@ def estimate_performance( model_info: ModelInfo, device_info: DeviceInfo, backend: str ) -> PerformanceMetrics: """Get performance estimations.""" - with get_generic_runner(device_info, backend) as generic_runner: - output_parser = GenericInferenceOutputParser() - output_consumers = [output_parser, LogWriter()] + output_parser = GenericInferenceOutputParser() + output_consumers = [output_parser, LogWriter()] - generic_runner.run(model_info, output_consumers) + generic_runner = get_generic_runner(device_info, backend) + generic_runner.run(model_info, output_consumers) - if not output_parser.is_ready(): - missed_data = ",".join(output_parser.missed_keys()) - logger.debug( - "Unable to get performance metrics, missed data %s", missed_data - ) - raise Exception("Unable to get performance metrics, insufficient data") + if not output_parser.is_ready(): + missed_data = ",".join(output_parser.missed_keys()) + logger.debug("Unable to get performance metrics, missed data %s", missed_data) + raise Exception("Unable to get performance metrics, insufficient data") - return PerformanceMetrics(**output_parser.result) + return PerformanceMetrics(**output_parser.result) def get_backend_runner() -> BackendRunner: |