1"""
2An engine that listens for libvirt events and resends them to the salt event bus.
3
4The minimal configuration is the following and will listen to all events on the
5local hypervisor and send them with a tag starting with ``salt/engines/libvirt_events``:
6
7.. code-block:: yaml
8
9    engines:
10        - libvirt_events
11
12Note that the automatically-picked libvirt connection will depend on the value
13of ``uri_default`` in ``/etc/libvirt/libvirt.conf``. To force using another
14connection like the local LXC libvirt driver, set the ``uri`` property as in the
15following example configuration.
16
17.. code-block:: yaml
18
19    engines:
20        - libvirt_events:
21            uri: lxc:///
22            tag_prefix: libvirt
23            filters:
24                - domain/lifecycle
25                - domain/reboot
26                - pool
27
28Filters is a list of event types to relay to the event bus. Items in this list
29can be either one of the main types (``domain``, ``network``, ``pool``,
30``nodedev``, ``secret``), ``all`` or a more precise filter. These can be done
31with values like <main_type>/<subtype>. The possible values are in the
32CALLBACK_DEFS constant. If the filters list contains ``all``, all
33events will be relayed.
34
35Be aware that the list of events increases with libvirt versions, for example
36network events have been added in libvirt 1.2.1 and storage events in 2.0.0.
37
38Running the engine on non-root
39------------------------------
40
Running this engine as a non-root user requires special attention. This is
typically the case when the master runs as the `salt` user. The engine is
likely to fail to connect to libvirt with an error like this one:
44
45    [ERROR   ] authentication unavailable: no polkit agent available to authenticate action 'org.libvirt.unix.monitor'
46
47
48To fix this, the user running the engine, for example the salt-master, needs
49to have the rights to connect to libvirt in the machine polkit config.
50A polkit rule like the following one will allow `salt` user to connect to libvirt:
51
52.. code-block:: javascript
53
54    polkit.addRule(function(action, subject) {
55        if (action.id.indexOf("org.libvirt") == 0 &&
56            subject.user == "salt") {
57            return polkit.Result.YES;
58        }
59    });
60
61:depends: libvirt 1.0.0+ python binding
62
63.. versionadded:: 2019.2.0
64"""
65
66import logging
67import urllib.parse
68
69import salt.utils.event
70
71log = logging.getLogger(__name__)
72
73
74try:
75    import libvirt
76except ImportError:
77    libvirt = None  # pylint: disable=invalid-name
78
79
def __virtual__():
    """
    Tell the salt loader whether this engine can be used.

    :return: a ``(loadable, reason)`` tuple. ``reason`` is an empty string
             when the engine is loadable, otherwise it explains why not.
    """
    # The import at the top of the module failed: no python binding available
    if libvirt is None:
        return False, "libvirt module not found"

    # getVersion() is compared against 1000000, the value matching 1.0.0
    if libvirt.getVersion() < 1000000:
        return False, "libvirt >= 1.0.0 required"

    return True, ""
91
92
# Name of the libvirt connection method registering event callbacks for each
# supported object type. _callbacks_cleanup() derives the matching
# deregistration method name from these values.
REGISTER_FUNCTIONS = dict(
    domain="domainEventRegisterAny",
    network="networkEventRegisterAny",
    pool="storagePoolEventRegisterAny",
    nodedev="nodeDeviceEventRegisterAny",
    secret="secretEventRegisterAny",
)
100
# Both BLOCK_JOB and BLOCK_JOB_2 event ids can be handled: use BLOCK_JOB_2
# whenever the loaded libvirt binding provides it, BLOCK_JOB otherwise.
BLOCK_JOB_ID = (
    "VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2"
    if hasattr(libvirt, "VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2")
    else "VIR_DOMAIN_EVENT_ID_BLOCK_JOB"
)
106
def _with_default_ids(event_names, overrides=None):
    """
    Pair each event name with the libvirt constant name to use for it.

    :param event_names: ordered event type names
    :param overrides: dict mapping an event name to an explicit libvirt
                      constant name. Events missing from it are paired with None.
    :return: tuple of ``(event name, constant name or None)`` tuples
    """
    explicit_ids = overrides or {}
    return tuple((name, explicit_ids.get(name)) for name in event_names)


# Events that can be relayed, per libvirt object type. Each entry is an
# (event name, libvirt constant name) pair; when the constant name is None,
# _register_callback() computes it as VIR_<OBJECT>_EVENT_ID_<EVENT>.
CALLBACK_DEFS = {
    "domain": _with_default_ids(
        (
            "lifecycle",
            "reboot",
            "rtc_change",
            "watchdog",
            "graphics",
            "io_error",
            "control_error",
            "disk_change",
            "tray_change",
            "pmwakeup",
            "pmsuspend",
            "balloon_change",
            "pmsuspend_disk",
            "device_removed",
            "block_job",
            "tunable",
            "agent_lifecycle",
            "device_added",
            "migration_iteration",
            "job_completed",
            "device_removal_failed",
            "metadata_change",
            "block_threshold",
        ),
        {
            "io_error": "VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON",
            "block_job": BLOCK_JOB_ID,
        },
    ),
    "network": _with_default_ids(("lifecycle",)),
    "pool": (
        ("lifecycle", "VIR_STORAGE_POOL_EVENT_ID_LIFECYCLE"),
        ("refresh", "VIR_STORAGE_POOL_EVENT_ID_REFRESH"),
    ),
    "nodedev": (
        ("lifecycle", "VIR_NODE_DEVICE_EVENT_ID_LIFECYCLE"),
        ("update", "VIR_NODE_DEVICE_EVENT_ID_UPDATE"),
    ),
    "secret": _with_default_ids(("lifecycle", "value_changed")),
}
144
145
146def _compute_subprefix(attr):
147    """
148    Get the part before the first '_' or the end of attr including
149    the potential '_'
150    """
151    return "".join((attr.split("_")[0], "_" if len(attr.split("_")) > 1 else ""))
152
153
def _get_libvirt_enum_string(prefix, value):
    """
    Find the readable name of a libvirt enum value.

    The libvirt module attributes starting with ``prefix`` are searched for one
    equal to ``value``. The matching attribute name, stripped from the prefix,
    lower-cased and with '_' replaced by spaces, is returned.

    :param prefix: start of the libvirt attribute to look for.
    :param value: integer to convert to string
    :return: the readable name or 'unknown' if no attribute matches
    """
    offset = len(prefix)
    suffixes = [name[offset:] for name in libvirt.__dict__ if name.startswith(prefix)]

    # Suffixes sharing their first word with others belong to a more specific
    # enum: collect those first words to skip them during the lookup.
    heads = [_compute_subprefix(suffix) for suffix in suffixes]
    shared_heads = []
    for head in dict.fromkeys(heads):
        if heads.count(head) > 1 or (head.endswith("_") and head[:-1] in heads):
            shared_heads.append(head)

    for suffix in suffixes:
        if _compute_subprefix(suffix) in shared_heads:
            continue
        if value == getattr(libvirt, "".join((prefix, suffix))):
            return suffix.lower().replace("_", " ")

    return "unknown"
182
183
def _get_domain_event_detail(event, detail):
    """
    Translate a domain event and its detail into readable names.

    :param event: numeric domain event value
    :param detail: numeric detail value qualifying the event
    :return: an ``(event name, detail name)`` tuple. Both are 'unknown' when
             the event itself can not be translated.
    """
    event_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_", event)
    if event_name != "unknown":
        # The detail constants are named after the event they belong to
        detail_prefix = "VIR_DOMAIN_EVENT_{}_".format(event_name.upper())
        return event_name, _get_libvirt_enum_string(detail_prefix, detail)

    return event_name, "unknown"
196
197
def _salt_send_event(opaque, conn, data):
    """
    Send an event on the salt event bus, completed with the connection URI.

    The event tag is made of the configured prefix, the connection URI, the
    object type and the event type, all joined with '/'.

    :param opaque: the opaque data that is passed to the callback.
                   This is a dict with 'prefix', 'object' and 'event' keys.
    :param conn: libvirt connection
    :param data: additional event data dict to send
    """
    prefix_part = opaque["prefix"]
    object_part = opaque["object"]
    event_part = opaque["event"]

    # Turn the connection URI into tag segments:
    # qemu+ssh://user@host:1234/system -> qemu+ssh/user@host:1234/system
    parsed = urllib.parse.urlparse(conn.getURI())
    optional_segments = (parsed.netloc, parsed.path.strip("/"))
    segments = [parsed.scheme] + [item for item in optional_segments if item]
    uri_part = "/".join(segments)

    # The URI is always part of the data, the caller's values come on top
    payload = {"uri": conn.getURI()}
    payload.update(data)

    tag = "/".join((prefix_part, uri_part, object_part, event_part))

    # Outside of the master, the event is sent using the event.send function
    if __opts__.get("__role") != "master":
        __salt__["event.send"](tag, payload)
        return

    bus = salt.utils.event.get_master_event(__opts__, __opts__["sock_dir"])
    bus.fire_event(payload, tag)
236
237
def _salt_send_domain_event(opaque, conn, domain, event, event_data):
    """
    Send a salt event describing a libvirt domain event.

    The domain name, ID and UUID are added to the event data. Values from
    ``event_data`` are applied last and override existing keys, 'event' included.

    :param opaque: the opaque data that is passed to the callback.
                   This is a dict with 'prefix', 'object' and 'event' keys.
    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param event: name of the event
    :param event_data: additional event data dict to send
    """
    domain_info = {
        "name": domain.name(),
        "id": domain.ID(),
        "uuid": domain.UUIDString(),
    }
    payload = {"domain": domain_info, "event": event}
    payload.update(event_data)
    _salt_send_event(opaque, conn, payload)
259
260
def _domain_event_lifecycle_cb(conn, domain, event, detail, opaque):
    """
    Relay a domain lifecycle event to salt.

    The translated event name replaces the generic event type in the 'event'
    field of the data sent, and the translated detail is added as 'detail'.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param event: numeric lifecycle event value
    :param detail: numeric detail value qualifying the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_name, detail_name = _get_domain_event_detail(event, detail)
    event_type = opaque["event"]
    payload = {"event": event_name, "detail": detail_name}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
274
275
def _domain_event_reboot_cb(conn, domain, opaque):
    """
    Relay a domain reboot event to salt, without additional data.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    _salt_send_domain_event(opaque, conn, domain, event_type, {})
281
282
def _domain_event_rtc_change_cb(conn, domain, utcoffset, opaque):
    """
    Relay a domain RTC change event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param utcoffset: value forwarded unchanged as 'utcoffset'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"utcoffset": utcoffset}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
290
291
def _domain_event_watchdog_cb(conn, domain, action, opaque):
    """
    Relay a domain watchdog event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param action: numeric watchdog action, sent as a readable name
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    action_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_WATCHDOG_", action)
    _salt_send_domain_event(opaque, conn, domain, event_type, {"action": action_name})
303
304
def _domain_event_io_error_cb(conn, domain, srcpath, devalias, action, reason, opaque):
    """
    Relay a domain I/O error event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param srcpath: value forwarded unchanged as 'srcPath'
    :param devalias: value forwarded unchanged as 'dev'
    :param action: numeric I/O error action, sent as a readable name
    :param reason: value forwarded unchanged as 'reason'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    action_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_IO_ERROR_", action)
    payload = {
        "srcPath": srcpath,
        "dev": devalias,
        "action": action_name,
        "reason": reason,
    }
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
321
322
def _domain_event_graphics_cb(
    conn, domain, phase, local, remote, auth, subject, opaque
):
    """
    Relay a domain graphics event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param phase: numeric graphics phase, sent as a readable name
    :param local: local address; a mapping with 'family', 'node' and
                  'service' keys
    :param remote: remote address, with the same keys as ``local``
    :param auth: value forwarded unchanged as 'authScheme'
    :param subject: iterable of items whose first two elements are sent as
                    'type' and 'name'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    prefix = "VIR_DOMAIN_EVENT_GRAPHICS_"
    # ``prefix`` already ends with '_': adding another one before ADDRESS_
    # would produce a name no libvirt constant starts with, turning every
    # address family into 'unknown'.
    address_prefix = "".join((prefix, "ADDRESS_"))

    def get_address(addr):
        """
        transform address structure into event data piece
        """
        return {
            "family": _get_libvirt_enum_string(address_prefix, addr["family"]),
            "node": addr["node"],
            "service": addr["service"],
        }

    _salt_send_domain_event(
        opaque,
        conn,
        domain,
        opaque["event"],
        {
            "phase": _get_libvirt_enum_string(prefix, phase),
            "local": get_address(local),
            "remote": get_address(remote),
            "authScheme": auth,
            "subject": [{"type": item[0], "name": item[1]} for item in subject],
        },
    )
356
357
def _domain_event_control_error_cb(conn, domain, opaque):
    """
    Relay a domain control error event to salt, without additional data.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    _salt_send_domain_event(opaque, conn, domain, event_type, {})
363
364
def _domain_event_disk_change_cb(conn, domain, old_src, new_src, dev, reason, opaque):
    """
    Relay a domain disk change event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param old_src: value forwarded unchanged as 'oldSrcPath'
    :param new_src: value forwarded unchanged as 'newSrcPath'
    :param dev: value forwarded unchanged as 'dev'
    :param reason: numeric disk change reason, sent as a readable name
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    reason_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_DISK_", reason)
    payload = {
        "oldSrcPath": old_src,
        "newSrcPath": new_src,
        "dev": dev,
        "reason": reason_name,
    }
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
381
382
def _domain_event_tray_change_cb(conn, domain, dev, reason, opaque):
    """
    Relay a domain tray change event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param dev: value forwarded unchanged as 'dev'
    :param reason: numeric tray change reason, sent as a readable name
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    reason_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_TRAY_CHANGE_", reason)
    payload = {"dev": dev, "reason": reason_name}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
397
398
def _domain_event_pmwakeup_cb(conn, domain, reason, opaque):
    """
    Relay a domain wakeup event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param reason: currently unused, 'unknown' is always sent as reason
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"reason": "unknown"}  # currently unused
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
406
407
def _domain_event_pmsuspend_cb(conn, domain, reason, opaque):
    """
    Relay a domain suspend event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param reason: currently unused, 'unknown' is always sent as reason
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"reason": "unknown"}  # currently unused
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
415
416
def _domain_event_balloon_change_cb(conn, domain, actual, opaque):
    """
    Relay a domain balloon change event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param actual: value forwarded unchanged as 'actual'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"actual": actual}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
422
423
def _domain_event_pmsuspend_disk_cb(conn, domain, reason, opaque):
    """
    Relay a domain disk suspend event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param reason: currently unused, 'unknown' is always sent as reason
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"reason": "unknown"}  # currently unused
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
431
432
def _domain_event_block_job_cb(conn, domain, disk, job_type, status, opaque):
    """
    Relay a domain block job event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param disk: value forwarded unchanged as 'disk'
    :param job_type: numeric block job type, sent as a readable name
    :param status: numeric block job status, sent as a readable name
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    type_name = _get_libvirt_enum_string("VIR_DOMAIN_BLOCK_JOB_TYPE_", job_type)
    status_name = _get_libvirt_enum_string("VIR_DOMAIN_BLOCK_JOB_", status)
    payload = {"disk": disk, "type": type_name, "status": status_name}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
448
449
def _domain_event_device_removed_cb(conn, domain, dev, opaque):
    """
    Relay a domain device removal event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param dev: value forwarded unchanged as 'dev'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"dev": dev}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
455
456
def _domain_event_tunable_cb(conn, domain, params, opaque):
    """
    Relay a domain tunable event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param params: value forwarded unchanged as 'params'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"params": params}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
462
463
464# pylint: disable=invalid-name
def _domain_event_agent_lifecycle_cb(conn, domain, state, reason, opaque):
    """
    Relay a domain agent lifecycle event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param state: numeric agent state, sent as a readable name
    :param reason: numeric state change reason, sent as a readable name
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    state_name = _get_libvirt_enum_string(
        "VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_STATE_", state
    )
    reason_name = _get_libvirt_enum_string(
        "VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_REASON_", reason
    )
    payload = {"state": state_name, "reason": reason_name}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
483
484
def _domain_event_device_added_cb(conn, domain, dev, opaque):
    """
    Relay a domain device addition event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param dev: value forwarded unchanged as 'dev'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"dev": dev}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
490
491
492# pylint: disable=invalid-name
def _domain_event_migration_iteration_cb(conn, domain, iteration, opaque):
    """
    Relay a domain migration iteration event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param iteration: value forwarded unchanged as 'iteration'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"iteration": iteration}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
500
501
def _domain_event_job_completed_cb(conn, domain, params, opaque):
    """
    Relay a domain job completion event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param params: value forwarded unchanged as 'params'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"params": params}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
507
508
def _domain_event_device_removal_failed_cb(conn, domain, dev, opaque):
    """
    Relay a domain device removal failure event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param dev: value forwarded unchanged as 'dev'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {"dev": dev}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
514
515
def _domain_event_metadata_change_cb(conn, domain, mtype, nsuri, opaque):
    """
    Relay a domain metadata change event to salt.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param mtype: numeric metadata type, sent as a readable name
    :param nsuri: value forwarded unchanged as 'nsuri'
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    type_name = _get_libvirt_enum_string("VIR_DOMAIN_METADATA_", mtype)
    payload = {"type": type_name, "nsuri": nsuri}
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
530
531
def _domain_event_block_threshold_cb(
    conn, domain, dev, path, threshold, excess, opaque
):
    """
    Relay a domain block threshold event to salt.

    ``dev``, ``path``, ``threshold`` and ``excess`` are forwarded unchanged
    under keys of the same names.

    :param conn: libvirt connection
    :param domain: libvirt domain object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    event_type = opaque["event"]
    payload = {
        "dev": dev,
        "path": path,
        "threshold": threshold,
        "excess": excess,
    }
    _salt_send_domain_event(opaque, conn, domain, event_type, payload)
545
546
def _network_event_lifecycle_cb(conn, net, event, detail, opaque):
    """
    Relay a network lifecycle event to salt.

    :param conn: libvirt connection
    :param net: libvirt network object related to the event
    :param event: numeric lifecycle event, sent as a readable name
    :param detail: currently unused, 'unknown' is always sent as detail
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    network_info = {"name": net.name(), "uuid": net.UUIDString()}
    event_name = _get_libvirt_enum_string("VIR_NETWORK_EVENT_", event)
    payload = {
        "network": network_info,
        "event": event_name,
        "detail": "unknown",  # currently unused
    }
    _salt_send_event(opaque, conn, payload)
561
562
def _pool_event_lifecycle_cb(conn, pool, event, detail, opaque):
    """
    Relay a storage pool lifecycle event to salt.

    :param conn: libvirt connection
    :param pool: libvirt storage pool object related to the event
    :param event: numeric lifecycle event, sent as a readable name
    :param detail: currently unused, 'unknown' is always sent as detail
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    pool_info = {"name": pool.name(), "uuid": pool.UUIDString()}
    event_name = _get_libvirt_enum_string("VIR_STORAGE_POOL_EVENT_", event)
    payload = {
        "pool": pool_info,
        "event": event_name,
        "detail": "unknown",  # currently unused
    }
    _salt_send_event(opaque, conn, payload)
576
577
def _pool_event_refresh_cb(conn, pool, opaque):
    """
    Relay a storage pool refresh event to salt.

    :param conn: libvirt connection
    :param pool: libvirt storage pool object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    pool_info = {"name": pool.name(), "uuid": pool.UUIDString()}
    payload = {"pool": pool_info, "event": opaque["event"]}
    _salt_send_event(opaque, conn, payload)
590
591
def _nodedev_event_lifecycle_cb(conn, dev, event, detail, opaque):
    """
    Relay a node device lifecycle event to salt.

    :param conn: libvirt connection
    :param dev: libvirt node device object related to the event
    :param event: numeric lifecycle event, sent as a readable name
    :param detail: currently unused, 'unknown' is always sent as detail
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    device_info = {"name": dev.name()}
    event_name = _get_libvirt_enum_string("VIR_NODE_DEVICE_EVENT_", event)
    payload = {
        "nodedev": device_info,
        "event": event_name,
        "detail": "unknown",  # currently unused
    }
    _salt_send_event(opaque, conn, payload)
605
606
def _nodedev_event_update_cb(conn, dev, opaque):
    """
    Relay a node device update event to salt.

    :param conn: libvirt connection
    :param dev: libvirt node device object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    device_info = {"name": dev.name()}
    payload = {"nodedev": device_info, "event": opaque["event"]}
    _salt_send_event(opaque, conn, payload)
614
615
def _secret_event_lifecycle_cb(conn, secret, event, detail, opaque):
    """
    Relay a secret lifecycle event to salt.

    :param conn: libvirt connection
    :param secret: libvirt secret object related to the event
    :param event: numeric lifecycle event, sent as a readable name
    :param detail: currently unused, 'unknown' is always sent as detail
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    secret_info = {"uuid": secret.UUIDString()}
    event_name = _get_libvirt_enum_string("VIR_SECRET_EVENT_", event)
    payload = {
        "secret": secret_info,
        "event": event_name,
        "detail": "unknown",  # currently unused
    }
    _salt_send_event(opaque, conn, payload)
629
630
def _secret_event_value_changed_cb(conn, secret, opaque):
    """
    Relay a secret value change event to salt.

    :param conn: libvirt connection
    :param secret: libvirt secret object related to the event
    :param opaque: dict with 'prefix', 'object' and 'event' keys
    """
    secret_info = {"uuid": secret.UUIDString()}
    payload = {"secret": secret_info, "event": opaque["event"]}
    _salt_send_event(opaque, conn, payload)
640
641
def _cleanup(cnx):
    """
    Log the URI of the libvirt connection, then close it.

    :param cnx: libvirt connection
    """
    uri = cnx.getURI()
    log.debug("Closing libvirt connection: %s", uri)
    cnx.close()
650
651
def _callbacks_cleanup(cnx, callback_ids):
    """
    Deregister every callback listed in callback_ids.

    :param cnx: libvirt connection
    :param callback_ids: dictionary mapping a libvirt object type to an ID list
                         of callbacks to deregister
    """
    for obj_type in callback_ids:
        # The deregistration method is named like the registration one,
        # with 'Reg' replaced by 'Dereg'
        method_name = REGISTER_FUNCTIONS[obj_type].replace("Reg", "Dereg")
        deregister = getattr(cnx, method_name)
        for callback_id in callback_ids[obj_type]:
            deregister(callback_id)
666
667
def _register_callback(cnx, tag_prefix, obj, event, real_id):
    """
    Register the engine callback matching a libvirt object and event type.

    The callback is the module function named ``_<obj>_event_<event>_cb``.

    :param cnx: libvirt connection
    :param tag_prefix: salt event tag prefix to use
    :param obj: the libvirt object name for the event. Needs to
                be one of the REGISTER_FUNCTIONS keys.
    :param event: the event type name.
    :param real_id: the libvirt name of an alternative event id to use or None

    :return: the value needed to deregister the callback, or None if the event
             is not known by the libvirt module or has no callback function
    """
    if real_id is None:
        # No explicit constant: compute it from the object and event names
        constant_name = "VIR_{}_EVENT_ID_{}".format(obj, event).upper()
    else:
        constant_name = real_id

    if not hasattr(libvirt, constant_name):
        log.warning('Skipping "%s/%s" events: libvirt too old', obj, event)
        return None
    event_id = getattr(libvirt, constant_name)

    handler_name = "_{}_event_{}_cb".format(obj, event)
    handler = globals().get(handler_name, None)
    if handler is None:
        log.error("Missing function %s in engine", handler_name)
        return None

    register = getattr(cnx, REGISTER_FUNCTIONS[obj])
    # Handed back by libvirt to the callback on each event
    opaque = {"prefix": tag_prefix, "object": obj, "event": event}
    return register(None, event_id, handler, opaque)
703
704
705def _append_callback_id(ids, obj, callback_id):
706    """
707    Helper function adding a callback ID to the IDs dict.
708    The callback ids dict maps an object to event callback ids.
709
710    :param ids: dict of callback IDs to update
711    :param obj: one of the keys of REGISTER_FUNCTIONS
712    :param callback_id: the result of _register_callback
713    """
714    if obj not in ids:
715        ids[obj] = []
716    ids[obj].append(callback_id)
717
718
def start(uri=None, tag_prefix="salt/engines/libvirt_events", filters=None):
    """
    Listen to libvirt events and forward them to salt.

    Callbacks are registered for the events selected by ``filters``, then the
    libvirt default event loop is run until it reports a failure. Errors are
    logged rather than raised. The registered callbacks are removed and the
    connection is closed before returning.

    :param uri: libvirt URI to listen on.
                Defaults to None to pick the first available local hypervisor
    :param tag_prefix: the beginning of the salt event tag to use.
                       Defaults to 'salt/engines/libvirt_events'
    :param filters: the list of event types to listen to. Defaults to ['all']
    """
    if filters is None:
        filters = ["all"]

    # Bound before the try block so that the finally clause can rely on them
    # even when the connection could not be opened.
    cnx = None
    callback_ids = {}
    try:
        libvirt.virEventRegisterDefaultImpl()

        cnx = libvirt.openReadOnly(uri)
        log.debug("Opened libvirt uri: %s", cnx.getURI())

        all_filters = "all" in filters

        for obj, event_defs in CALLBACK_DEFS.items():
            for event, real_id in event_defs:
                event_filter = "/".join((obj, event))
                if (
                    event_filter not in filters
                    and obj not in filters
                    and not all_filters
                ):
                    continue
                registered_id = _register_callback(cnx, tag_prefix, obj, event, real_id)
                # _register_callback returns None when nothing was registered.
                # Any other value, 0 included, is an ID to deregister later.
                if registered_id is not None:
                    _append_callback_id(callback_ids, obj, registered_id)

        exit_loop = False
        while not exit_loop:
            exit_loop = libvirt.virEventRunDefaultImpl() < 0

    except Exception as err:  # pylint: disable=broad-except
        log.exception(err)
    finally:
        # Nothing to clean up if the connection was never opened
        if cnx is not None:
            try:
                _callbacks_cleanup(cnx, callback_ids)
            finally:
                # Close the connection even if deregistering a callback failed
                _cleanup(cnx)
762