aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/core/events.py
blob: 10aec868c8df5b71b13fbcfc28339d4d898eb298 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
"""Module for the events and related functionality.

This module represents one of the main components of the workflow -
events publishing and provides a way for delivering results to the
calling application.

Each component of the workflow can generate events of specific type.
Application can subscribe and react to those events.
"""
import traceback
import uuid
from abc import ABC
from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import asdict
from dataclasses import dataclass
from dataclasses import field
from functools import singledispatchmethod
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple

from mlia.core.common import DataItem


@dataclass
class Event:
    """Base class for the events.

    This class is used as a root node of the events class hierarchy.
    Every instance receives a freshly generated ``event_id``; the field
    is excluded from the generated ``__init__``.
    """

    # Unique identifier of the event, assigned in __post_init__.
    event_id: str = field(init=False)

    def __post_init__(self) -> None:
        """Generate unique ID for the event."""
        self.event_id = str(uuid.uuid4())

    def compare_without_id(self, other: "Event") -> bool:
        """Compare two events without event_id field.

        :param other: object to compare with
        :return: True if ``other`` is an event of exactly the same class
                 and all fields except ``event_id`` are equal
        """
        # Imported locally because the module level only imports
        # asdict/dataclass/field from dataclasses.
        from dataclasses import fields

        if not isinstance(other, Event) or self.__class__ != other.__class__:
            return False

        compared_names = [
            item.name for item in fields(self) if item.name != "event_id"
        ]

        # Compare the stored values directly first. ``asdict`` deep-copies
        # leaf values, so objects with identity-based equality (for example
        # a shared exception instance) never compare equal after conversion.
        if all(
            getattr(self, name) is getattr(other, name)
            or getattr(self, name) == getattr(other, name)
            for name in compared_names
        ):
            return True

        # Fall back to the structural comparison so that values which are
        # only equal after conversion to dictionaries still match.
        self_as_dict = asdict(self)
        self_as_dict.pop("event_id")

        other_as_dict = asdict(other)
        other_as_dict.pop("event_id")

        return self_as_dict == other_as_dict


@dataclass
class ChildEvent(Event):
    """Child event.

    This class could be used to link an event with its parent event.
    """

    # ID of the parent event this event is linked to.
    parent_event_id: str


@dataclass
class ActionStartedEvent(Event):
    """Action started event.

    This event is published when some action has been started.
    """

    # Type of the action that has been started.
    action_type: str
    # Optional parameters of the action; None when nothing was provided.
    params: Optional[Dict] = None


@dataclass
class SubActionEvent(ChildEvent):
    """SubAction event.

    This event could be used to represent some action that happens
    during the parent action.
    """

    # Type of the sub action.
    action_type: str
    # Optional parameters of the sub action; None when nothing was provided.
    params: Optional[Dict] = None


@dataclass
class ActionFinishedEvent(ChildEvent):
    """Action finished event.

    This event is published when some action has been finished.
    The inherited ``parent_event_id`` field links it to another event;
    the ``action`` context manager in this module passes the ID of the
    corresponding ActionStartedEvent.
    """


@dataclass
class SystemEvent(Event):
    """System event.

    System event class represents events that are published by components
    of the core module. The most common example is a workflow executor
    that publishes a number of system events for the start/completion
    of different stages of the workflow.

    Events that are published by components outside of the core module
    should not use this class as a base class.
    """


@dataclass
class ExecutionStartedEvent(SystemEvent):
    """Execution started event.

    This event is published when workflow execution has started.
    """


@dataclass
class ExecutionFinishedEvent(SystemEvent):
    """Execution finished event.

    This event is published when workflow execution has finished.
    """


@dataclass
class ExecutionFailedEvent(SystemEvent):
    """Execution failed event.

    Carries the exception associated with the failure in the ``err`` field.
    """

    # Exception carried by the event (presumably the one that caused the
    # failure - confirm with the publisher of this event).
    err: Exception


@dataclass
class DataCollectionStageStartedEvent(SystemEvent):
    """Data collection stage started.

    This event is published when the data collection stage has started.
    """


@dataclass
class DataCollectorSkippedEvent(SystemEvent):
    """Data collector skipped event.

    This event is published when a particular data collector can
    not provide data for the provided parameters.
    """

    # Skipped data collector, given as a string (presumably its name -
    # confirm with the publisher of this event).
    data_collector: str
    # Reason why the data collector was skipped.
    reason: str


@dataclass
class DataCollectionStageFinishedEvent(SystemEvent):
    """Data collection stage finished.

    This event is published when the data collection stage has finished.
    """


@dataclass
class DataAnalysisStageStartedEvent(SystemEvent):
    """Data analysis stage started.

    This event is published when the data analysis stage has started.
    """


@dataclass
class DataAnalysisStageFinishedEvent(SystemEvent):
    """Data analysis stage finished.

    This event is published when the data analysis stage has finished.
    """


@dataclass
class AdviceStageStartedEvent(SystemEvent):
    """Advice producing stage started.

    This event is published when the advice generation stage has started.
    """


@dataclass
class AdviceStageFinishedEvent(SystemEvent):
    """Advice producing stage finished.

    This event is published when the advice generation stage has finished.
    """


@dataclass
class CollectedDataEvent(SystemEvent):
    """Collected data event.

    This event is published for every collected data item.

    :param data_item: collected data item
    """

    # The collected data item carried by this event.
    data_item: DataItem


@dataclass
class AnalyzedDataEvent(SystemEvent):
    """Analyzed data event.

    This event is published for every analyzed data item.

    :param data_item: analyzed data item
    """

    # The analyzed data item carried by this event.
    data_item: DataItem


class EventHandler:
    """Base class for the event handlers.

    Each event handler should derive from this base class.
    """

    def handle_event(self, event: Event) -> None:
        """Handle the event.

        By default all published events are being passed to each
        registered event handler. It is the handler's responsibility
        to filter the events that it is interested in.

        The base implementation does nothing.

        :param event: published event instance
        """


class DebugEventHandler(EventHandler):
    """Event handler intended for debugging.

    Prints every event it receives to the standard output and can
    optionally print the current call stack as well.
    """

    def __init__(self, with_stacktrace: bool = False) -> None:
        """Init event handler.

        :param with_stacktrace: when True, the stack trace of the place
               where the event was handled is printed after the event.
        """
        self.with_stacktrace = with_stacktrace

    def handle_event(self, event: Event) -> None:
        """Print the event and, if enabled, the current stack trace.

        :param event: published event instance
        """
        print(f"Got event {event}")

        # Nothing more to do unless stack traces were requested.
        if not self.with_stacktrace:
            return

        traceback.print_stack()


class EventDispatcherMetaclass(type):
    """Metaclass that wires prefixed methods into one dispatcher.

    Checking the type of the published event inside every event
    handler is tedious. Instead, a naming convention is used: a method
    whose name starts with a given prefix ("on_" by default) is treated
    as the handler for the event type named in its type annotation.

    On class creation this metaclass registers every such method with a
    common ``singledispatchmethod`` and installs that dispatcher as the
    class's ``handle_event`` method.
    """

    def __new__(
        cls,
        clsname: str,
        bases: Tuple,
        namespace: Dict[str, Any],
        event_handler_method_prefix: str = "on_",
    ) -> Any:
        """Create the class and link its event handlers to a dispatcher.

        :param clsname: name of the class being created
        :param bases: base classes of the class being created
        :param namespace: namespace of the class being created
        :param event_handler_method_prefix: name prefix that marks a
               method as an event handler
        :return: newly created class with ``handle_event`` replaced by
                 the dispatcher
        """
        new_class = super().__new__(cls, clsname, bases, namespace)

        # Fallback used when no registered handler matches: does nothing.
        @singledispatchmethod
        def dispatcher(_self: Any, _event: Event) -> Any:
            """Event dispatcher."""

        # Walk over all class attributes (inherited ones included) and
        # register every callable whose name starts with the prefix.
        for attr_name in dir(new_class):
            attr = getattr(new_class, attr_name)
            if not callable(attr):
                continue
            if not attr_name.startswith(event_handler_method_prefix):
                continue

            setattr(new_class, attr_name, dispatcher.register(attr))

        # The dispatcher takes the place of the default handle_event method.
        setattr(new_class, "handle_event", dispatcher)

        return new_class


class EventDispatcher(EventHandler, metaclass=EventDispatcherMetaclass):
    """Event dispatcher.

    Event handler whose ``handle_event`` method is replaced by
    EventDispatcherMetaclass with a dispatcher that routes each event
    to the registered "on_"-prefixed method.
    """


class EventPublisher(ABC):
    """Abstract base class for the event publishers.

    An event publisher is an intermediate component between the event
    emitter and the event consumers.
    """

    @abstractmethod
    def register_event_handler(self, event_handler: EventHandler) -> None:
        """Register a single event handler.

        :param event_handler: instance of the event handler
        """

    def register_event_handlers(
        self, event_handlers: Optional[List[EventHandler]]
    ) -> None:
        """Register several event handlers at once.

        Nothing is registered when the argument is None or empty.

        :param event_handlers: list of the event handler instances
        """
        # A falsy argument (None or an empty list) yields no iterations.
        for event_handler in event_handlers or ():
            self.register_event_handler(event_handler)

    @abstractmethod
    def publish_event(self, event: Event) -> None:
        """Publish the event.

        Implementations deliver the event to all registered event handlers.

        :param event: event instance
        """


class DefaultEventPublisher(EventPublisher):
    """Default implementation of the event publisher.

    Keeps the registered event handlers in a plain list and passes
    every published event to each of them in registration order.
    """

    def __init__(self) -> None:
        """Create a publisher without any registered handlers."""
        self.handlers: List[EventHandler] = []

    def register_event_handler(self, event_handler: EventHandler) -> None:
        """Add the event handler to the list of registered handlers.

        :param event_handler: instance of the event handler
        """
        self.handlers.append(event_handler)

    def publish_event(self, event: Event) -> None:
        """Pass the event to every registered handler.

        Exceptions raised by an event handler are not caught here, so
        they propagate to the caller.

        :param event: event instance
        """
        for registered_handler in self.handlers:
            registered_handler.handle_event(event)


@contextmanager
def stage(
    publisher: EventPublisher, events: Tuple[Event, Event]
) -> Generator[None, None, None]:
    """Publish a pair of events around a stage of the workflow.

    The first event is published on entering the context and the second
    one after the managed block completes. If the managed block raises,
    the second event is not published.

    :param publisher: publisher used for delivering the events
    :param events: pair of (started, finished) events
    """
    # Unpack up front so a malformed pair fails before anything is published.
    start_event, finish_event = events

    publisher.publish_event(start_event)
    yield
    publisher.publish_event(finish_event)


@contextmanager
def action(
    publisher: EventPublisher, action_type: str, params: Optional[Dict] = None
) -> Generator[None, None, None]:
    """Publish ActionStarted/ActionFinished events around an action.

    The finished event refers to the started event via its
    ``parent_event_id``. If the managed block raises, the finished
    event is not published.

    :param publisher: publisher used for delivering the events
    :param action_type: type of the action
    :param params: optional parameters of the action
    """
    # Both events are created up front; the finished one is linked to the
    # started one through the ID of the latter.
    started_event = ActionStartedEvent(action_type=action_type, params=params)
    finished_event = ActionFinishedEvent(parent_event_id=started_event.event_id)

    publisher.publish_event(started_event)
    yield
    publisher.publish_event(finished_event)


class SystemEventsHandler(EventDispatcher):
    """System events handler.

    Provides a do-nothing "on_" method for each event type handled here.
    As the class derives from EventDispatcher, every "on_" method is
    registered with the dispatcher, which selects the handler by the
    type annotation of its ``event`` parameter. Subclasses override
    only the methods they are interested in.
    """

    def on_execution_started(self, event: ExecutionStartedEvent) -> None:
        """Handle ExecutionStarted event."""

    def on_execution_finished(self, event: ExecutionFinishedEvent) -> None:
        """Handle ExecutionFinished event."""

    def on_execution_failed(self, event: ExecutionFailedEvent) -> None:
        """Handle ExecutionFailed event."""

    def on_data_collection_stage_started(
        self, event: DataCollectionStageStartedEvent
    ) -> None:
        """Handle DataCollectionStageStarted event."""

    def on_data_collection_stage_finished(
        self, event: DataCollectionStageFinishedEvent
    ) -> None:
        """Handle DataCollectionStageFinished event."""

    def on_data_collector_skipped(self, event: DataCollectorSkippedEvent) -> None:
        """Handle DataCollectorSkipped event."""

    def on_data_analysis_stage_started(
        self, event: DataAnalysisStageStartedEvent
    ) -> None:
        """Handle DataAnalysisStageStarted event."""

    def on_data_analysis_stage_finished(
        self, event: DataAnalysisStageFinishedEvent
    ) -> None:
        """Handle DataAnalysisStageFinished event."""

    def on_advice_stage_started(self, event: AdviceStageStartedEvent) -> None:
        """Handle AdviceStageStarted event."""

    def on_advice_stage_finished(self, event: AdviceStageFinishedEvent) -> None:
        """Handle AdviceStageFinished event."""

    def on_collected_data(self, event: CollectedDataEvent) -> None:
        """Handle CollectedData event."""

    def on_analyzed_data(self, event: AnalyzedDataEvent) -> None:
        """Handle AnalyzedData event."""

    def on_action_started(self, event: ActionStartedEvent) -> None:
        """Handle ActionStarted event."""

    def on_action_finished(self, event: ActionFinishedEvent) -> None:
        """Handle ActionFinished event."""