diff options
Diffstat (limited to 'src/mlia/core/workflow.py')
-rw-r--r-- | src/mlia/core/workflow.py | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/src/mlia/core/workflow.py b/src/mlia/core/workflow.py index 03f3d1c..d862a86 100644 --- a/src/mlia/core/workflow.py +++ b/src/mlia/core/workflow.py @@ -5,16 +5,15 @@ This module contains implementation of the workflow executors. """ +from __future__ import annotations + import itertools from abc import ABC from abc import abstractmethod from functools import wraps from typing import Any from typing import Callable -from typing import List -from typing import Optional from typing import Sequence -from typing import Tuple from mlia.core.advice_generation import Advice from mlia.core.advice_generation import AdviceEvent @@ -57,7 +56,7 @@ STAGE_ANALYSIS = (DataAnalysisStageStartedEvent(), DataAnalysisStageFinishedEven STAGE_ADVICE = (AdviceStageStartedEvent(), AdviceStageFinishedEvent()) -def on_stage(stage_events: Tuple[Event, Event]) -> Callable: +def on_stage(stage_events: tuple[Event, Event]) -> Callable: """Mark start/finish of the stage with appropriate events.""" def wrapper(method: Callable) -> Callable: @@ -87,7 +86,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor): collectors: Sequence[DataCollector], analyzers: Sequence[DataAnalyzer], producers: Sequence[AdviceProducer], - startup_events: Optional[Sequence[Event]] = None, + startup_events: Sequence[Event] | None = None, ): """Init default workflow executor. @@ -130,7 +129,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor): self.publish(event) @on_stage(STAGE_COLLECTION) - def collect_data(self) -> List[DataItem]: + def collect_data(self) -> list[DataItem]: """Collect data. Run each of data collector components and return list of @@ -148,7 +147,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor): return collected_data @on_stage(STAGE_ANALYSIS) - def analyze_data(self, collected_data: List[DataItem]) -> List[DataItem]: + def analyze_data(self, collected_data: list[DataItem]) -> list[DataItem]: """Analyze data. Pass each collected data item into each data analyzer and @@ -168,7 +167,7 @@ class DefaultWorkflowExecutor(WorkflowExecutor): return analyzed_data @on_stage(STAGE_ADVICE) - def produce_advice(self, analyzed_data: List[DataItem]) -> None: + def produce_advice(self, analyzed_data: list[DataItem]) -> None: """Produce advice. Pass each analyzed data item into each advice producer and |