xref: /qemu/python/qemu/qmp/events.py (revision b2a3cbb8)
1"""
2QMP Events and EventListeners
3
4Asynchronous QMP uses `EventListener` objects to listen for events. An
5`EventListener` is a FIFO event queue that can be pre-filtered to listen
6for only specific events. Each `EventListener` instance receives its own
7copy of events that it hears, so events may be consumed without fear or
8worry for depriving other listeners of events they need to hear.
9
10
11EventListener Tutorial
12----------------------
13
14In all of the following examples, we assume that we have a `QMPClient`
15instantiated named ``qmp`` that is already connected.
16
17
18`listener()` context blocks with one name
19~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
20
21The most basic usage is by using the `listener()` context manager to
22construct them:
23
24.. code:: python
25
26   with qmp.listener('STOP') as listener:
27       await qmp.execute('stop')
28       await listener.get()
29
30The listener is active only for the duration of the ‘with’ block. This
31instance listens only for ‘STOP’ events.
32
33
34`listener()` context blocks with two or more names
35~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
36
37Multiple events can be selected for by providing any ``Iterable[str]``:
38
39.. code:: python
40
41   with qmp.listener(('STOP', 'RESUME')) as listener:
42       await qmp.execute('stop')
43       event = await listener.get()
44       assert event['event'] == 'STOP'
45
46       await qmp.execute('cont')
47       event = await listener.get()
48       assert event['event'] == 'RESUME'
49
50
51`listener()` context blocks with no names
52~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
53
54By omitting names entirely, you can listen to ALL events.
55
56.. code:: python
57
58   with qmp.listener() as listener:
59       await qmp.execute('stop')
60       event = await listener.get()
61       assert event['event'] == 'STOP'
62
63This isn’t a very good use case for this feature: In a non-trivial
64running system, we may not know what event will arrive next. Grabbing
65the top of a FIFO queue returning multiple kinds of events may be prone
66to error.
67
68
69Using async iterators to retrieve events
70~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
71
72If you’d like to simply watch what events happen to arrive, you can use
73the listener as an async iterator:
74
75.. code:: python
76
77   with qmp.listener() as listener:
78       async for event in listener:
79           print(f"Event arrived: {event['event']}")
80
81This is analogous to the following code:
82
83.. code:: python
84
85   with qmp.listener() as listener:
86       while True:
87           event = listener.get()
88           print(f"Event arrived: {event['event']}")
89
90This event stream will never end, so these blocks will never terminate.
91
92
93Using asyncio.Task to concurrently retrieve events
94~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
95
96Since a listener’s event stream will never terminate, it is not likely
97useful to use that form in a script. For longer-running clients, we can
98create event handlers by using `asyncio.Task` to create concurrent
99coroutines:
100
101.. code:: python
102
103   async def print_events(listener):
104       try:
105           async for event in listener:
106               print(f"Event arrived: {event['event']}")
107       except asyncio.CancelledError:
108           return
109
110   with qmp.listener() as listener:
111       task = asyncio.Task(print_events(listener))
112       await qmp.execute('stop')
113       await qmp.execute('cont')
114       task.cancel()
115       await task
116
117However, there is no guarantee that these events will be received by the
118time we leave this context block. Once the context block is exited, the
119listener will cease to hear any new events, and becomes inert.
120
121Be mindful of the timing: the above example will *probably*– but does
122not *guarantee*– that both STOP/RESUMED events will be printed. The
123example below outlines how to use listeners outside of a context block.
124
125
126Using `register_listener()` and `remove_listener()`
127~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
128
129To create a listener with a longer lifetime, beyond the scope of a
130single block, create a listener and then call `register_listener()`:
131
132.. code:: python
133
134   class MyClient:
135       def __init__(self, qmp):
136           self.qmp = qmp
137           self.listener = EventListener()
138
139       async def print_events(self):
140           try:
141               async for event in self.listener:
142                   print(f"Event arrived: {event['event']}")
143           except asyncio.CancelledError:
144               return
145
146       async def run(self):
147           self.task = asyncio.Task(self.print_events)
148           self.qmp.register_listener(self.listener)
149           await qmp.execute('stop')
150           await qmp.execute('cont')
151
152       async def stop(self):
153           self.task.cancel()
154           await self.task
155           self.qmp.remove_listener(self.listener)
156
157The listener can be deactivated by using `remove_listener()`. When it is
158removed, any possible pending events are cleared and it can be
159re-registered at a later time.
160
161
162Using the built-in all events listener
163~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
164
165The `QMPClient` object creates its own default listener named
166:py:obj:`~Events.events` that can be used for the same purpose without
167having to create your own:
168
169.. code:: python
170
171   async def print_events(listener):
172       try:
173           async for event in listener:
174               print(f"Event arrived: {event['event']}")
175       except asyncio.CancelledError:
176           return
177
178   task = asyncio.Task(print_events(qmp.events))
179
180   await qmp.execute('stop')
181   await qmp.execute('cont')
182
183   task.cancel()
184   await task
185
186
187Using both .get() and async iterators
188~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
189
190The async iterator and `get()` methods pull events from the same FIFO
191queue. If you mix the usage of both, be aware: Events are emitted
192precisely once per listener.
193
194If multiple contexts try to pull events from the same listener instance,
195events are still emitted only precisely once.
196
197This restriction can be lifted by creating additional listeners.
198
199
200Creating multiple listeners
201~~~~~~~~~~~~~~~~~~~~~~~~~~~
202
203Additional `EventListener` objects can be created at-will. Each one
204receives its own copy of events, with separate FIFO event queues.
205
206.. code:: python
207
208   my_listener = EventListener()
209   qmp.register_listener(my_listener)
210
211   await qmp.execute('stop')
212   copy1 = await my_listener.get()
213   copy2 = await qmp.events.get()
214
215   assert copy1 == copy2
216
217In this example, we await an event from both a user-created
218`EventListener` and the built-in events listener. Both receive the same
219event.
220
221
222Clearing listeners
223~~~~~~~~~~~~~~~~~~
224
225`EventListener` objects can be cleared, clearing all events seen thus far:
226
227.. code:: python
228
229   await qmp.execute('stop')
230   qmp.events.clear()
231   await qmp.execute('cont')
232   event = await qmp.events.get()
233   assert event['event'] == 'RESUME'
234
235`EventListener` objects are FIFO queues. If events are not consumed,
236they will remain in the queue until they are witnessed or discarded via
237`clear()`. FIFO queues will be drained automatically upon leaving a
238context block, or when calling `remove_listener()`.
239
240
241Accessing listener history
242~~~~~~~~~~~~~~~~~~~~~~~~~~
243
244`EventListener` objects record their history. Even after being cleared,
245you can obtain a record of all events seen so far:
246
247.. code:: python
248
249   await qmp.execute('stop')
250   await qmp.execute('cont')
251   qmp.events.clear()
252
253   assert len(qmp.events.history) == 2
254   assert qmp.events.history[0]['event'] == 'STOP'
255   assert qmp.events.history[1]['event'] == 'RESUME'
256
257The history is updated immediately and does not require the event to be
258witnessed first.
259
260
261Using event filters
262~~~~~~~~~~~~~~~~~~~
263
264`EventListener` objects can be given complex filtering criteria if names
265are not sufficient:
266
267.. code:: python
268
269   def job1_filter(event) -> bool:
270       event_data = event.get('data', {})
271       event_job_id = event_data.get('id')
272       return event_job_id == "job1"
273
274   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
275       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
276       async for event in listener:
277           if event['data']['status'] == 'concluded':
278               break
279
280These filters might be most useful when parameterized. `EventListener`
281objects expect a function that takes only a single argument (the raw
282event, as a `Message`) and returns a bool; True if the event should be
283accepted into the stream. You can create a function that adapts this
284signature to accept configuration parameters:
285
286.. code:: python
287
288   def job_filter(job_id: str) -> EventFilter:
289       def filter(event: Message) -> bool:
290           return event['data']['id'] == job_id
291       return filter
292
293   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
294       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
295       async for event in listener:
296           if event['data']['status'] == 'concluded':
297               break
298
299
300Activating an existing listener with `listen()`
301~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
302
303Listeners with complex, long configurations can also be created manually
304and activated temporarily by using `listen()` instead of `listener()`:
305
306.. code:: python
307
308   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
309                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
310                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
311
312   with qmp.listen(listener):
313       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
314       async for event in listener:
315           print(event)
316           if event['event'] == 'BLOCK_JOB_COMPLETED':
317               break
318
319Any events that are not witnessed by the time the block is left will be
320cleared from the queue; entering the block is an implicit
321`register_listener()` and leaving the block is an implicit
322`remove_listener()`.
323
324
325Activating multiple existing listeners with `listen()`
326~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
327
328While `listener()` is only capable of creating a single listener,
329`listen()` is capable of activating multiple listeners simultaneously:
330
331.. code:: python
332
333   def job_filter(job_id: str) -> EventFilter:
334       def filter(event: Message) -> bool:
335           return event['data']['id'] == job_id
336       return filter
337
338   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
339   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
340
341   with qmp.listen(jobA, jobB):
342       qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
343       qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
344
345       async for event in jobA.get():
346           if event['data']['status'] == 'concluded':
347               break
348       async for event in jobB.get():
349           if event['data']['status'] == 'concluded':
350               break
351
352
353Extending the `EventListener` class
354~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
355
356In the case that a more specialized `EventListener` is desired to
357provide either more functionality or more compact syntax for specialized
358cases, it can be extended.
359
360One of the key methods to extend or override is
361:py:meth:`~EventListener.accept()`. The default implementation checks an
362incoming message for:
363
3641. A qualifying name, if any :py:obj:`~EventListener.names` were
365   specified at initialization time
3662. That :py:obj:`~EventListener.event_filter()` returns True.
367
368This can be modified however you see fit to change the criteria for
369inclusion in the stream.
370
371For convenience, a ``JobListener`` class could be created that simply
372bakes in configuration so it does not need to be repeated:
373
374.. code:: python
375
376   class JobListener(EventListener):
377       def __init__(self, job_id: str):
378           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
379                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
380                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
381           self.job_id = job_id
382
383       def accept(self, event) -> bool:
384           if not super().accept(event):
385               return False
386           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
387               return event['data']['id'] == job_id
388           return event['data']['device'] == job_id
389
390From here on out, you can conjure up a custom-purpose listener that
391listens only for job-related events for a specific job-id easily:
392
393.. code:: python
394
395   listener = JobListener('job4')
396   with qmp.listener(listener):
397       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
398       async for event in listener:
399           print(event)
400           if event['event'] == 'BLOCK_JOB_COMPLETED':
401               break
402
403
404Experimental Interfaces & Design Issues
405---------------------------------------
406
407These interfaces are not ones I am sure I will keep or otherwise modify
408heavily.
409
410qmp.listener()’s type signature
411~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
412
413`listener()` does not return anything, because it was assumed the caller
414already had a handle to the listener. However, for
415``qmp.listener(EventListener())`` forms, the caller will not have saved
416a handle to the listener.
417
418Because this function can accept *many* listeners, I found it hard to
419accurately type in a way where it could be used in both “one” or “many”
420forms conveniently and in a statically type-safe manner.
421
422Ultimately, I removed the return altogether, but perhaps with more time
423I can work out a way to re-add it.
424
425
426API Reference
427-------------
428
429"""
430
431import asyncio
432from contextlib import contextmanager
433import logging
434from typing import (
435    AsyncIterator,
436    Callable,
437    Iterable,
438    Iterator,
439    List,
440    Optional,
441    Set,
442    Tuple,
443    Union,
444)
445
446from .error import QMPError
447from .message import Message
448
449
450EventNames = Union[str, Iterable[str], None]
451EventFilter = Callable[[Message], bool]
452
453
454class ListenerError(QMPError):
455    """
456    Generic error class for `EventListener`-related problems.
457    """
458
459
460class EventListener:
461    """
462    Selectively listens for events with runtime configurable filtering.
463
464    This class is designed to be directly usable for the most common cases,
465    but it can be extended to provide more rigorous control.
466
467    :param names:
468        One or more names of events to listen for.
469        When not provided, listen for ALL events.
470    :param event_filter:
471        An optional event filtering function.
472        When names are also provided, this acts as a secondary filter.
473
474    When ``names`` and ``event_filter`` are both provided, the names
475    will be filtered first, and then the filter function will be called
476    second. The event filter function can assume that the format of the
477    event is a known format.
478    """
479    def __init__(
480        self,
481        names: EventNames = None,
482        event_filter: Optional[EventFilter] = None,
483    ):
484        # Queue of 'heard' events yet to be witnessed by a caller.
485        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
486
487        # Intended as a historical record, NOT a processing queue or backlog.
488        self._history: List[Message] = []
489
490        #: Primary event filter, based on one or more event names.
491        self.names: Set[str] = set()
492        if isinstance(names, str):
493            self.names.add(names)
494        elif names is not None:
495            self.names.update(names)
496
497        #: Optional, secondary event filter.
498        self.event_filter: Optional[EventFilter] = event_filter
499
500    @property
501    def history(self) -> Tuple[Message, ...]:
502        """
503        A read-only history of all events seen so far.
504
505        This represents *every* event, including those not yet witnessed
506        via `get()` or ``async for``. It persists between `clear()`
507        calls and is immutable.
508        """
509        return tuple(self._history)
510
511    def accept(self, event: Message) -> bool:
512        """
513        Determine if this listener accepts this event.
514
515        This method determines which events will appear in the stream.
516        The default implementation simply checks the event against the
517        list of names and the event_filter to decide if this
518        `EventListener` accepts a given event. It can be
519        overridden/extended to provide custom listener behavior.
520
521        User code is not expected to need to invoke this method.
522
523        :param event: The event under consideration.
524        :return: `True`, if this listener accepts this event.
525        """
526        name_ok = (not self.names) or (event['event'] in self.names)
527        return name_ok and (
528            (not self.event_filter) or self.event_filter(event)
529        )
530
531    async def put(self, event: Message) -> None:
532        """
533        Conditionally put a new event into the FIFO queue.
534
535        This method is not designed to be invoked from user code, and it
536        should not need to be overridden. It is a public interface so
537        that `QMPClient` has an interface by which it can inform
538        registered listeners of new events.
539
540        The event will be put into the queue if
541        :py:meth:`~EventListener.accept()` returns `True`.
542
543        :param event: The new event to put into the FIFO queue.
544        """
545        if not self.accept(event):
546            return
547
548        self._history.append(event)
549        await self._queue.put(event)
550
551    async def get(self) -> Message:
552        """
553        Wait for the very next event in this stream.
554
555        If one is already available, return that one.
556        """
557        return await self._queue.get()
558
559    def empty(self) -> bool:
560        """
561        Return `True` if there are no pending events.
562        """
563        return self._queue.empty()
564
565    def clear(self) -> List[Message]:
566        """
567        Clear this listener of all pending events.
568
569        Called when an `EventListener` is being unregistered, this clears the
570        pending FIFO queue synchronously. It can be also be used to
571        manually clear any pending events, if desired.
572
573        :return: The cleared events, if any.
574
575        .. warning::
576            Take care when discarding events. Cleared events will be
577            silently tossed on the floor. All events that were ever
578            accepted by this listener are visible in `history()`.
579        """
580        events = []
581        while True:
582            try:
583                events.append(self._queue.get_nowait())
584            except asyncio.QueueEmpty:
585                break
586
587        return events
588
589    def __aiter__(self) -> AsyncIterator[Message]:
590        return self
591
592    async def __anext__(self) -> Message:
593        """
594        Enables the `EventListener` to function as an async iterator.
595
596        It may be used like this:
597
598        .. code:: python
599
600            async for event in listener:
601                print(event)
602
603        These iterators will never terminate of their own accord; you
604        must provide break conditions or otherwise prepare to run them
605        in an `asyncio.Task` that can be cancelled.
606        """
607        return await self.get()
608
609
610class Events:
611    """
612    Events is a mix-in class that adds event functionality to the QMP class.
613
614    It's designed specifically as a mix-in for `QMPClient`, and it
615    relies upon the class it is being mixed into having a 'logger'
616    property.
617    """
618    def __init__(self) -> None:
619        self._listeners: List[EventListener] = []
620
621        #: Default, all-events `EventListener`.
622        self.events: EventListener = EventListener()
623        self.register_listener(self.events)
624
625        # Parent class needs to have a logger
626        self.logger: logging.Logger
627
628    async def _event_dispatch(self, msg: Message) -> None:
629        """
630        Given a new event, propagate it to all of the active listeners.
631
632        :param msg: The event to propagate.
633        """
634        for listener in self._listeners:
635            await listener.put(msg)
636
637    def register_listener(self, listener: EventListener) -> None:
638        """
639        Register and activate an `EventListener`.
640
641        :param listener: The listener to activate.
642        :raise ListenerError: If the given listener is already registered.
643        """
644        if listener in self._listeners:
645            raise ListenerError("Attempted to re-register existing listener")
646        self.logger.debug("Registering %s.", str(listener))
647        self._listeners.append(listener)
648
649    def remove_listener(self, listener: EventListener) -> None:
650        """
651        Unregister and deactivate an `EventListener`.
652
653        The removed listener will have its pending events cleared via
654        `clear()`. The listener can be re-registered later when
655        desired.
656
657        :param listener: The listener to deactivate.
658        :raise ListenerError: If the given listener is not registered.
659        """
660        if listener == self.events:
661            raise ListenerError("Cannot remove the default listener.")
662        self.logger.debug("Removing %s.", str(listener))
663        listener.clear()
664        self._listeners.remove(listener)
665
666    @contextmanager
667    def listen(self, *listeners: EventListener) -> Iterator[None]:
668        r"""
669        Context manager: Temporarily listen with an `EventListener`.
670
671        Accepts one or more `EventListener` objects and registers them,
672        activating them for the duration of the context block.
673
674        `EventListener` objects will have any pending events in their
675        FIFO queue cleared upon exiting the context block, when they are
676        deactivated.
677
678        :param \*listeners: One or more EventListeners to activate.
679        :raise ListenerError: If the given listener(s) are already active.
680        """
681        _added = []
682
683        try:
684            for listener in listeners:
685                self.register_listener(listener)
686                _added.append(listener)
687
688            yield
689
690        finally:
691            for listener in _added:
692                self.remove_listener(listener)
693
694    @contextmanager
695    def listener(
696        self,
697        names: EventNames = (),
698        event_filter: Optional[EventFilter] = None
699    ) -> Iterator[EventListener]:
700        """
701        Context manager: Temporarily listen with a new `EventListener`.
702
703        Creates an `EventListener` object and registers it, activating
704        it for the duration of the context block.
705
706        :param names:
707            One or more names of events to listen for.
708            When not provided, listen for ALL events.
709        :param event_filter:
710            An optional event filtering function.
711            When names are also provided, this acts as a secondary filter.
712
713        :return: The newly created and active `EventListener`.
714        """
715        listener = EventListener(names, event_filter)
716        with self.listen(listener):
717            yield listener
718