1""" 2An engine that listens for libvirt events and resends them to the salt event bus. 3 4The minimal configuration is the following and will listen to all events on the 5local hypervisor and send them with a tag starting with ``salt/engines/libvirt_events``: 6 7.. code-block:: yaml 8 9 engines: 10 - libvirt_events 11 12Note that the automatically-picked libvirt connection will depend on the value 13of ``uri_default`` in ``/etc/libvirt/libvirt.conf``. To force using another 14connection like the local LXC libvirt driver, set the ``uri`` property as in the 15following example configuration. 16 17.. code-block:: yaml 18 19 engines: 20 - libvirt_events: 21 uri: lxc:/// 22 tag_prefix: libvirt 23 filters: 24 - domain/lifecycle 25 - domain/reboot 26 - pool 27 28Filters is a list of event types to relay to the event bus. Items in this list 29can be either one of the main types (``domain``, ``network``, ``pool``, 30``nodedev``, ``secret``), ``all`` or a more precise filter. These can be done 31with values like <main_type>/<subtype>. The possible values are in the 32CALLBACK_DEFS constant. If the filters list contains ``all``, all 33events will be relayed. 34 35Be aware that the list of events increases with libvirt versions, for example 36network events have been added in libvirt 1.2.1 and storage events in 2.0.0. 37 38Running the engine on non-root 39------------------------------ 40 41Running this engine as non-root requires a special attention, which is surely 42the case for the master running as user `salt`. The engine is likely to fail 43to connect to libvirt with an error like this one: 44 45 [ERROR ] authentication unavailable: no polkit agent available to authenticate action 'org.libvirt.unix.monitor' 46 47 48To fix this, the user running the engine, for example the salt-master, needs 49to have the rights to connect to libvirt in the machine polkit config. 50A polkit rule like the following one will allow `salt` user to connect to libvirt: 51 52.. code-block:: javascript 53 54 polkit.addRule(function(action, subject) { 55 if (action.id.indexOf("org.libvirt") == 0 && 56 subject.user == "salt") { 57 return polkit.Result.YES; 58 } 59 }); 60 61:depends: libvirt 1.0.0+ python binding 62 63.. versionadded:: 2019.2.0 64""" 65 66import logging 67import urllib.parse 68 69import salt.utils.event 70 71log = logging.getLogger(__name__) 72 73 74try: 75 import libvirt 76except ImportError: 77 libvirt = None # pylint: disable=invalid-name 78 79 80def __virtual__(): 81 """ 82 Only load if libvirt python binding is present 83 """ 84 if libvirt is None: 85 msg = "libvirt module not found" 86 elif libvirt.getVersion() < 1000000: 87 msg = "libvirt >= 1.0.0 required" 88 else: 89 msg = "" 90 return not bool(msg), msg 91 92 93REGISTER_FUNCTIONS = { 94 "domain": "domainEventRegisterAny", 95 "network": "networkEventRegisterAny", 96 "pool": "storagePoolEventRegisterAny", 97 "nodedev": "nodeDeviceEventRegisterAny", 98 "secret": "secretEventRegisterAny", 99} 100 101# Handle either BLOCK_JOB or BLOCK_JOB_2, but prefer the latter 102if hasattr(libvirt, "VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2"): 103 BLOCK_JOB_ID = "VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2" 104else: 105 BLOCK_JOB_ID = "VIR_DOMAIN_EVENT_ID_BLOCK_JOB" 106 107CALLBACK_DEFS = { 108 "domain": ( 109 ("lifecycle", None), 110 ("reboot", None), 111 ("rtc_change", None), 112 ("watchdog", None), 113 ("graphics", None), 114 ("io_error", "VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON"), 115 ("control_error", None), 116 ("disk_change", None), 117 ("tray_change", None), 118 ("pmwakeup", None), 119 ("pmsuspend", None), 120 ("balloon_change", None), 121 ("pmsuspend_disk", None), 122 ("device_removed", None), 123 ("block_job", BLOCK_JOB_ID), 124 ("tunable", None), 125 ("agent_lifecycle", None), 126 ("device_added", None), 127 ("migration_iteration", None), 128 ("job_completed", None), 129 ("device_removal_failed", None), 130 ("metadata_change", None), 131 ("block_threshold", None), 132 ), 133 "network": (("lifecycle", None),), 134 "pool": ( 135 ("lifecycle", "VIR_STORAGE_POOL_EVENT_ID_LIFECYCLE"), 136 ("refresh", "VIR_STORAGE_POOL_EVENT_ID_REFRESH"), 137 ), 138 "nodedev": ( 139 ("lifecycle", "VIR_NODE_DEVICE_EVENT_ID_LIFECYCLE"), 140 ("update", "VIR_NODE_DEVICE_EVENT_ID_UPDATE"), 141 ), 142 "secret": (("lifecycle", None), ("value_changed", None)), 143} 144 145 146def _compute_subprefix(attr): 147 """ 148 Get the part before the first '_' or the end of attr including 149 the potential '_' 150 """ 151 return "".join((attr.split("_")[0], "_" if len(attr.split("_")) > 1 else "")) 152 153 154def _get_libvirt_enum_string(prefix, value): 155 """ 156 Convert the libvirt enum integer value into a human readable string. 157 158 :param prefix: start of the libvirt attribute to look for. 159 :param value: integer to convert to string 160 """ 161 attributes = [ 162 attr[len(prefix) :] for attr in libvirt.__dict__ if attr.startswith(prefix) 163 ] 164 165 # Filter out the values starting with a common base as they match another enum 166 prefixes = [_compute_subprefix(p) for p in attributes] 167 counts = {p: prefixes.count(p) for p in prefixes} 168 sub_prefixes = [ 169 p 170 for p, count in counts.items() 171 if count > 1 or (p.endswith("_") and p[:-1] in prefixes) 172 ] 173 filtered = [ 174 attr for attr in attributes if _compute_subprefix(attr) not in sub_prefixes 175 ] 176 177 for candidate in filtered: 178 if value == getattr(libvirt, "".join((prefix, candidate))): 179 name = candidate.lower().replace("_", " ") 180 return name 181 return "unknown" 182 183 184def _get_domain_event_detail(event, detail): 185 """ 186 Convert event and detail numeric values into a tuple of human readable strings 187 """ 188 event_name = _get_libvirt_enum_string("VIR_DOMAIN_EVENT_", event) 189 if event_name == "unknown": 190 return event_name, "unknown" 191 192 prefix = "VIR_DOMAIN_EVENT_{}_".format(event_name.upper()) 193 detail_name = _get_libvirt_enum_string(prefix, detail) 194 195 return event_name, detail_name 196 197 198def _salt_send_event(opaque, conn, data): 199 """ 200 Convenience function adding common data to the event and sending it 201 on the salt event bus. 202 203 :param opaque: the opaque data that is passed to the callback. 204 This is a dict with 'prefix', 'object' and 'event' keys. 205 :param conn: libvirt connection 206 :param data: additional event data dict to send 207 """ 208 tag_prefix = opaque["prefix"] 209 object_type = opaque["object"] 210 event_type = opaque["event"] 211 212 # Prepare the connection URI to fit in the tag 213 # qemu+ssh://user@host:1234/system -> qemu+ssh/user@host:1234/system 214 uri = urllib.parse.urlparse(conn.getURI()) 215 uri_tag = [uri.scheme] 216 if uri.netloc: 217 uri_tag.append(uri.netloc) 218 path = uri.path.strip("/") 219 if path: 220 uri_tag.append(path) 221 uri_str = "/".join(uri_tag) 222 223 # Append some common data 224 all_data = {"uri": conn.getURI()} 225 all_data.update(data) 226 227 tag = "/".join((tag_prefix, uri_str, object_type, event_type)) 228 229 # Actually send the event in salt 230 if __opts__.get("__role") == "master": 231 salt.utils.event.get_master_event(__opts__, __opts__["sock_dir"]).fire_event( 232 all_data, tag 233 ) 234 else: 235 __salt__["event.send"](tag, all_data) 236 237 238def _salt_send_domain_event(opaque, conn, domain, event, event_data): 239 """ 240 Helper function send a salt event for a libvirt domain. 241 242 :param opaque: the opaque data that is passed to the callback. 243 This is a dict with 'prefix', 'object' and 'event' keys. 244 :param conn: libvirt connection 245 :param domain: name of the domain related to the event 246 :param event: name of the event 247 :param event_data: additional event data dict to send 248 """ 249 data = { 250 "domain": { 251 "name": domain.name(), 252 "id": domain.ID(), 253 "uuid": domain.UUIDString(), 254 }, 255 "event": event, 256 } 257 data.update(event_data) 258 _salt_send_event(opaque, conn, data) 259 260 261def _domain_event_lifecycle_cb(conn, domain, event, detail, opaque): 262 """ 263 Domain lifecycle events handler 264 """ 265 event_str, detail_str = _get_domain_event_detail(event, detail) 266 267 _salt_send_domain_event( 268 opaque, 269 conn, 270 domain, 271 opaque["event"], 272 {"event": event_str, "detail": detail_str}, 273 ) 274 275 276def _domain_event_reboot_cb(conn, domain, opaque): 277 """ 278 Domain reboot events handler 279 """ 280 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {}) 281 282 283def _domain_event_rtc_change_cb(conn, domain, utcoffset, opaque): 284 """ 285 Domain RTC change events handler 286 """ 287 _salt_send_domain_event( 288 opaque, conn, domain, opaque["event"], {"utcoffset": utcoffset} 289 ) 290 291 292def _domain_event_watchdog_cb(conn, domain, action, opaque): 293 """ 294 Domain watchdog events handler 295 """ 296 _salt_send_domain_event( 297 opaque, 298 conn, 299 domain, 300 opaque["event"], 301 {"action": _get_libvirt_enum_string("VIR_DOMAIN_EVENT_WATCHDOG_", action)}, 302 ) 303 304 305def _domain_event_io_error_cb(conn, domain, srcpath, devalias, action, reason, opaque): 306 """ 307 Domain I/O Error events handler 308 """ 309 _salt_send_domain_event( 310 opaque, 311 conn, 312 domain, 313 opaque["event"], 314 { 315 "srcPath": srcpath, 316 "dev": devalias, 317 "action": _get_libvirt_enum_string("VIR_DOMAIN_EVENT_IO_ERROR_", action), 318 "reason": reason, 319 }, 320 ) 321 322 323def _domain_event_graphics_cb( 324 conn, domain, phase, local, remote, auth, subject, opaque 325): 326 """ 327 Domain graphics events handler 328 """ 329 prefix = "VIR_DOMAIN_EVENT_GRAPHICS_" 330 331 def get_address(addr): 332 """ 333 transform address structure into event data piece 334 """ 335 return { 336 "family": _get_libvirt_enum_string( 337 "{}_ADDRESS_".format(prefix), addr["family"] 338 ), 339 "node": addr["node"], 340 "service": addr["service"], 341 } 342 343 _salt_send_domain_event( 344 opaque, 345 conn, 346 domain, 347 opaque["event"], 348 { 349 "phase": _get_libvirt_enum_string(prefix, phase), 350 "local": get_address(local), 351 "remote": get_address(remote), 352 "authScheme": auth, 353 "subject": [{"type": item[0], "name": item[1]} for item in subject], 354 }, 355 ) 356 357 358def _domain_event_control_error_cb(conn, domain, opaque): 359 """ 360 Domain control error events handler 361 """ 362 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {}) 363 364 365def _domain_event_disk_change_cb(conn, domain, old_src, new_src, dev, reason, opaque): 366 """ 367 Domain disk change events handler 368 """ 369 _salt_send_domain_event( 370 opaque, 371 conn, 372 domain, 373 opaque["event"], 374 { 375 "oldSrcPath": old_src, 376 "newSrcPath": new_src, 377 "dev": dev, 378 "reason": _get_libvirt_enum_string("VIR_DOMAIN_EVENT_DISK_", reason), 379 }, 380 ) 381 382 383def _domain_event_tray_change_cb(conn, domain, dev, reason, opaque): 384 """ 385 Domain tray change events handler 386 """ 387 _salt_send_domain_event( 388 opaque, 389 conn, 390 domain, 391 opaque["event"], 392 { 393 "dev": dev, 394 "reason": _get_libvirt_enum_string("VIR_DOMAIN_EVENT_TRAY_CHANGE_", reason), 395 }, 396 ) 397 398 399def _domain_event_pmwakeup_cb(conn, domain, reason, opaque): 400 """ 401 Domain wakeup events handler 402 """ 403 _salt_send_domain_event( 404 opaque, conn, domain, opaque["event"], {"reason": "unknown"} # currently unused 405 ) 406 407 408def _domain_event_pmsuspend_cb(conn, domain, reason, opaque): 409 """ 410 Domain suspend events handler 411 """ 412 _salt_send_domain_event( 413 opaque, conn, domain, opaque["event"], {"reason": "unknown"} # currently unused 414 ) 415 416 417def _domain_event_balloon_change_cb(conn, domain, actual, opaque): 418 """ 419 Domain balloon change events handler 420 """ 421 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"actual": actual}) 422 423 424def _domain_event_pmsuspend_disk_cb(conn, domain, reason, opaque): 425 """ 426 Domain disk suspend events handler 427 """ 428 _salt_send_domain_event( 429 opaque, conn, domain, opaque["event"], {"reason": "unknown"} # currently unused 430 ) 431 432 433def _domain_event_block_job_cb(conn, domain, disk, job_type, status, opaque): 434 """ 435 Domain block job events handler 436 """ 437 _salt_send_domain_event( 438 opaque, 439 conn, 440 domain, 441 opaque["event"], 442 { 443 "disk": disk, 444 "type": _get_libvirt_enum_string("VIR_DOMAIN_BLOCK_JOB_TYPE_", job_type), 445 "status": _get_libvirt_enum_string("VIR_DOMAIN_BLOCK_JOB_", status), 446 }, 447 ) 448 449 450def _domain_event_device_removed_cb(conn, domain, dev, opaque): 451 """ 452 Domain device removal events handler 453 """ 454 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"dev": dev}) 455 456 457def _domain_event_tunable_cb(conn, domain, params, opaque): 458 """ 459 Domain tunable events handler 460 """ 461 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"params": params}) 462 463 464# pylint: disable=invalid-name 465def _domain_event_agent_lifecycle_cb(conn, domain, state, reason, opaque): 466 """ 467 Domain agent lifecycle events handler 468 """ 469 _salt_send_domain_event( 470 opaque, 471 conn, 472 domain, 473 opaque["event"], 474 { 475 "state": _get_libvirt_enum_string( 476 "VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_STATE_", state 477 ), 478 "reason": _get_libvirt_enum_string( 479 "VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_REASON_", reason 480 ), 481 }, 482 ) 483 484 485def _domain_event_device_added_cb(conn, domain, dev, opaque): 486 """ 487 Domain device addition events handler 488 """ 489 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"dev": dev}) 490 491 492# pylint: disable=invalid-name 493def _domain_event_migration_iteration_cb(conn, domain, iteration, opaque): 494 """ 495 Domain migration iteration events handler 496 """ 497 _salt_send_domain_event( 498 opaque, conn, domain, opaque["event"], {"iteration": iteration} 499 ) 500 501 502def _domain_event_job_completed_cb(conn, domain, params, opaque): 503 """ 504 Domain job completion events handler 505 """ 506 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"params": params}) 507 508 509def _domain_event_device_removal_failed_cb(conn, domain, dev, opaque): 510 """ 511 Domain device removal failure events handler 512 """ 513 _salt_send_domain_event(opaque, conn, domain, opaque["event"], {"dev": dev}) 514 515 516def _domain_event_metadata_change_cb(conn, domain, mtype, nsuri, opaque): 517 """ 518 Domain metadata change events handler 519 """ 520 _salt_send_domain_event( 521 opaque, 522 conn, 523 domain, 524 opaque["event"], 525 { 526 "type": _get_libvirt_enum_string("VIR_DOMAIN_METADATA_", mtype), 527 "nsuri": nsuri, 528 }, 529 ) 530 531 532def _domain_event_block_threshold_cb( 533 conn, domain, dev, path, threshold, excess, opaque 534): 535 """ 536 Domain block threshold events handler 537 """ 538 _salt_send_domain_event( 539 opaque, 540 conn, 541 domain, 542 opaque["event"], 543 {"dev": dev, "path": path, "threshold": threshold, "excess": excess}, 544 ) 545 546 547def _network_event_lifecycle_cb(conn, net, event, detail, opaque): 548 """ 549 Network lifecycle events handler 550 """ 551 552 _salt_send_event( 553 opaque, 554 conn, 555 { 556 "network": {"name": net.name(), "uuid": net.UUIDString()}, 557 "event": _get_libvirt_enum_string("VIR_NETWORK_EVENT_", event), 558 "detail": "unknown", # currently unused 559 }, 560 ) 561 562 563def _pool_event_lifecycle_cb(conn, pool, event, detail, opaque): 564 """ 565 Storage pool lifecycle events handler 566 """ 567 _salt_send_event( 568 opaque, 569 conn, 570 { 571 "pool": {"name": pool.name(), "uuid": pool.UUIDString()}, 572 "event": _get_libvirt_enum_string("VIR_STORAGE_POOL_EVENT_", event), 573 "detail": "unknown", # currently unused 574 }, 575 ) 576 577 578def _pool_event_refresh_cb(conn, pool, opaque): 579 """ 580 Storage pool refresh events handler 581 """ 582 _salt_send_event( 583 opaque, 584 conn, 585 { 586 "pool": {"name": pool.name(), "uuid": pool.UUIDString()}, 587 "event": opaque["event"], 588 }, 589 ) 590 591 592def _nodedev_event_lifecycle_cb(conn, dev, event, detail, opaque): 593 """ 594 Node device lifecycle events handler 595 """ 596 _salt_send_event( 597 opaque, 598 conn, 599 { 600 "nodedev": {"name": dev.name()}, 601 "event": _get_libvirt_enum_string("VIR_NODE_DEVICE_EVENT_", event), 602 "detail": "unknown", # currently unused 603 }, 604 ) 605 606 607def _nodedev_event_update_cb(conn, dev, opaque): 608 """ 609 Node device update events handler 610 """ 611 _salt_send_event( 612 opaque, conn, {"nodedev": {"name": dev.name()}, "event": opaque["event"]} 613 ) 614 615 616def _secret_event_lifecycle_cb(conn, secret, event, detail, opaque): 617 """ 618 Secret lifecycle events handler 619 """ 620 _salt_send_event( 621 opaque, 622 conn, 623 { 624 "secret": {"uuid": secret.UUIDString()}, 625 "event": _get_libvirt_enum_string("VIR_SECRET_EVENT_", event), 626 "detail": "unknown", # currently unused 627 }, 628 ) 629 630 631def _secret_event_value_changed_cb(conn, secret, opaque): 632 """ 633 Secret value change events handler 634 """ 635 _salt_send_event( 636 opaque, 637 conn, 638 {"secret": {"uuid": secret.UUIDString()}, "event": opaque["event"]}, 639 ) 640 641 642def _cleanup(cnx): 643 """ 644 Close the libvirt connection 645 646 :param cnx: libvirt connection 647 """ 648 log.debug("Closing libvirt connection: %s", cnx.getURI()) 649 cnx.close() 650 651 652def _callbacks_cleanup(cnx, callback_ids): 653 """ 654 Unregister all the registered callbacks 655 656 :param cnx: libvirt connection 657 :param callback_ids: dictionary mapping a libvirt object type to an ID list 658 of callbacks to deregister 659 """ 660 for obj, ids in callback_ids.items(): 661 register_name = REGISTER_FUNCTIONS[obj] 662 deregister_name = register_name.replace("Reg", "Dereg") 663 deregister = getattr(cnx, deregister_name) 664 for callback_id in ids: 665 deregister(callback_id) 666 667 668def _register_callback(cnx, tag_prefix, obj, event, real_id): 669 """ 670 Helper function registering a callback 671 672 :param cnx: libvirt connection 673 :param tag_prefix: salt event tag prefix to use 674 :param obj: the libvirt object name for the event. Needs to 675 be one of the REGISTER_FUNCTIONS keys. 676 :param event: the event type name. 677 :param real_id: the libvirt name of an alternative event id to use or None 678 679 :rtype integer value needed to deregister the callback 680 """ 681 libvirt_name = real_id 682 if real_id is None: 683 libvirt_name = "VIR_{}_EVENT_ID_{}".format(obj, event).upper() 684 685 if not hasattr(libvirt, libvirt_name): 686 log.warning('Skipping "%s/%s" events: libvirt too old', obj, event) 687 return None 688 689 libvirt_id = getattr(libvirt, libvirt_name) 690 callback_name = "_{}_event_{}_cb".format(obj, event) 691 callback = globals().get(callback_name, None) 692 if callback is None: 693 log.error("Missing function %s in engine", callback_name) 694 return None 695 696 register = getattr(cnx, REGISTER_FUNCTIONS[obj]) 697 return register( 698 None, 699 libvirt_id, 700 callback, 701 {"prefix": tag_prefix, "object": obj, "event": event}, 702 ) 703 704 705def _append_callback_id(ids, obj, callback_id): 706 """ 707 Helper function adding a callback ID to the IDs dict. 708 The callback ids dict maps an object to event callback ids. 709 710 :param ids: dict of callback IDs to update 711 :param obj: one of the keys of REGISTER_FUNCTIONS 712 :param callback_id: the result of _register_callback 713 """ 714 if obj not in ids: 715 ids[obj] = [] 716 ids[obj].append(callback_id) 717 718 719def start(uri=None, tag_prefix="salt/engines/libvirt_events", filters=None): 720 """ 721 Listen to libvirt events and forward them to salt. 722 723 :param uri: libvirt URI to listen on. 724 Defaults to None to pick the first available local hypervisor 725 :param tag_prefix: the beginning of the salt event tag to use. 726 Defaults to 'salt/engines/libvirt_events' 727 :param filters: the list of event of listen on. Defaults to 'all' 728 """ 729 if filters is None: 730 filters = ["all"] 731 try: 732 libvirt.virEventRegisterDefaultImpl() 733 734 cnx = libvirt.openReadOnly(uri) 735 log.debug("Opened libvirt uri: %s", cnx.getURI()) 736 737 callback_ids = {} 738 all_filters = "all" in filters 739 740 for obj, event_defs in CALLBACK_DEFS.items(): 741 for event, real_id in event_defs: 742 event_filter = "/".join((obj, event)) 743 if ( 744 event_filter not in filters 745 and obj not in filters 746 and not all_filters 747 ): 748 continue 749 registered_id = _register_callback(cnx, tag_prefix, obj, event, real_id) 750 if registered_id: 751 _append_callback_id(callback_ids, obj, registered_id) 752 753 exit_loop = False 754 while not exit_loop: 755 exit_loop = libvirt.virEventRunDefaultImpl() < 0 756 757 except Exception as err: # pylint: disable=broad-except 758 log.exception(err) 759 finally: 760 _callbacks_cleanup(cnx, callback_ids) 761 _cleanup(cnx) 762