aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/core/workflow.py
diff options
context:
space:
mode:
authorDmitrii Agibov <dmitrii.agibov@arm.com>2022-09-08 14:24:39 +0100
committerDmitrii Agibov <dmitrii.agibov@arm.com>2022-09-09 17:21:48 +0100
commitf5b293d0927506c2a979a091bf0d07ecc78fa181 (patch)
tree4de585b7cb6ed34da8237063752270189a730a41 /src/mlia/core/workflow.py
parentcde0c6ee140bd108849bff40467d8f18ffc332ef (diff)
downloadmlia-f5b293d0927506c2a979a091bf0d07ecc78fa181.tar.gz
MLIA-386 Simplify typing in the source code
- Enable deferred annotations evaluation - Use builtin types for type hints whenever possible - Use | syntax for union types - Rename mlia.core._typing into mlia.core.typing Change-Id: I3f6ffc02fa069c589bdd9e8bddbccd504285427a
Diffstat (limited to 'src/mlia/core/workflow.py')
-rw-r--r--src/mlia/core/workflow.py15
1 files changed, 7 insertions, 8 deletions
diff --git a/src/mlia/core/workflow.py b/src/mlia/core/workflow.py
index 03f3d1c..d862a86 100644
--- a/src/mlia/core/workflow.py
+++ b/src/mlia/core/workflow.py
@@ -5,16 +5,15 @@
This module contains implementation of the workflow
executors.
"""
+from __future__ import annotations
+
import itertools
from abc import ABC
from abc import abstractmethod
from functools import wraps
from typing import Any
from typing import Callable
-from typing import List
-from typing import Optional
from typing import Sequence
-from typing import Tuple
from mlia.core.advice_generation import Advice
from mlia.core.advice_generation import AdviceEvent
@@ -57,7 +56,7 @@ STAGE_ANALYSIS = (DataAnalysisStageStartedEvent(), DataAnalysisStageFinishedEven
STAGE_ADVICE = (AdviceStageStartedEvent(), AdviceStageFinishedEvent())
-def on_stage(stage_events: Tuple[Event, Event]) -> Callable:
+def on_stage(stage_events: tuple[Event, Event]) -> Callable:
"""Mark start/finish of the stage with appropriate events."""
def wrapper(method: Callable) -> Callable:
@@ -87,7 +86,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor):
collectors: Sequence[DataCollector],
analyzers: Sequence[DataAnalyzer],
producers: Sequence[AdviceProducer],
- startup_events: Optional[Sequence[Event]] = None,
+ startup_events: Sequence[Event] | None = None,
):
"""Init default workflow executor.
@@ -130,7 +129,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor):
self.publish(event)
@on_stage(STAGE_COLLECTION)
- def collect_data(self) -> List[DataItem]:
+ def collect_data(self) -> list[DataItem]:
"""Collect data.
Run each of data collector components and return list of
@@ -148,7 +147,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor):
return collected_data
@on_stage(STAGE_ANALYSIS)
- def analyze_data(self, collected_data: List[DataItem]) -> List[DataItem]:
+ def analyze_data(self, collected_data: list[DataItem]) -> list[DataItem]:
"""Analyze data.
Pass each collected data item into each data analyzer and
@@ -168,7 +167,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor):
return analyzed_data
@on_stage(STAGE_ADVICE)
- def produce_advice(self, analyzed_data: List[DataItem]) -> None:
+ def produce_advice(self, analyzed_data: list[DataItem]) -> None:
"""Produce advice.
Pass each analyzed data item into each advice producer and