1# Copyright (C) 2006, 2013, 2014, 2015 Red Hat, Inc. 2# Copyright (C) 2006 Daniel P. Berrange <berrange@redhat.com> 3# 4# This work is licensed under the GNU GPLv2 or later. 5# See the COPYING file in the top-level directory. 6 7import os 8import threading 9import time 10import traceback 11 12import libvirt 13 14import virtinst 15from virtinst import log 16from virtinst import pollhelpers 17 18from .lib import connectauth 19from .lib import testmock 20from .baseclass import vmmGObject 21from .lib.libvirtenummap import LibvirtEnumMap 22from .object.domain import vmmDomain 23from .object.network import vmmNetwork 24from .object.nodedev import vmmNodeDevice 25from .object.storagepool import vmmStoragePool 26from .lib.statsmanager import vmmStatsManager 27 28 29class _ObjectList(vmmGObject): 30 """ 31 Class that wraps our internal list of libvirt objects 32 """ 33 # pylint: disable=not-context-manager 34 # pylint doesn't know that lock() has 'with' support 35 BLACKLIST_COUNT = 3 36 37 def __init__(self): 38 vmmGObject.__init__(self) 39 40 self._objects = [] 41 self._denylist = {} 42 self._lock = threading.Lock() 43 44 def _cleanup(self): 45 self._objects = [] 46 47 def _denylist_key(self, obj): 48 return str(obj.__class__) + obj.get_name() 49 50 def add_denylist(self, obj): 51 """ 52 Add an object to the denylist. Basically a list of objects we 53 choose not to poll, because they threw an error at init time 54 55 :param obj: vmmLibvirtObject to denylist 56 :returns: number of added object to list 57 """ 58 key = self._denylist_key(obj) 59 count = self._denylist.get(key, 0) 60 self._denylist[key] = count + 1 61 return self._denylist[key] 62 63 def remove_denylist(self, obj): 64 """ 65 :param obj: vmmLibvirtObject to remove from denylist 66 :returns: True if object was denylisted or False otherwise. 67 """ 68 key = self._denylist_key(obj) 69 return bool(self._denylist.pop(key, 0)) 70 71 def in_denylist(self, obj): 72 """ 73 If an object is in list only once don't consider it denylisted, 74 give it one more chance. 75 76 :param obj: vmmLibvirtObject to check 77 :returns: True if object is denylisted 78 """ 79 key = self._denylist_key(obj) 80 return self._denylist.get(key, 0) >= _ObjectList.BLACKLIST_COUNT 81 82 def remove(self, obj): 83 """ 84 Remove an object from the list. 85 86 :param obj: vmmLibvirtObject to remove 87 :returns: True if object removed, False if object was not found 88 """ 89 with self._lock: 90 # Identity check is sufficient here, since we should never be 91 # asked to remove an object that wasn't at one point in the list. 92 if obj not in self._objects: 93 return self.remove_denylist(obj) 94 95 self._objects.remove(obj) 96 return True 97 98 def add(self, obj): 99 """ 100 Add an object to the list. 101 102 :param obj: vmmLibvirtObject to add 103 :returns: True if object added, False if object already in the list 104 """ 105 with self._lock: 106 # We don't look up based on identity here, to prevent tick() 107 # races from adding the same domain twice 108 # 109 # We don't use lookup_object here since we need to hold the 110 # lock the whole time to prevent a 'time of check' issue 111 for checkobj in self._objects: 112 if (checkobj.__class__ == obj.__class__ and 113 checkobj.get_name() == obj.get_name()): 114 return False 115 116 self._objects.append(obj) 117 return True 118 119 def get_objects_for_class(self, classobj): 120 """ 121 Return all objects over the passed vmmLibvirtObject class 122 """ 123 with self._lock: 124 return [o for o in self._objects if o.__class__ is classobj] 125 126 def lookup_object(self, classobj, name): 127 """ 128 Lookup an object with the passed classobj + name 129 """ 130 # Doesn't require locking, since get_objects_for_class covers us 131 for obj in self.get_objects_for_class(classobj): 132 if obj.get_name() == name: 133 return obj 134 return None 135 136 def all_objects(self): 137 with self._lock: 138 return self._objects[:] 139 140 141class vmmConnection(vmmGObject): 142 __gsignals__ = { 143 "vm-added": (vmmGObject.RUN_FIRST, None, [object]), 144 "vm-removed": (vmmGObject.RUN_FIRST, None, [object]), 145 "net-added": (vmmGObject.RUN_FIRST, None, [object]), 146 "net-removed": (vmmGObject.RUN_FIRST, None, [object]), 147 "pool-added": (vmmGObject.RUN_FIRST, None, [object]), 148 "pool-removed": (vmmGObject.RUN_FIRST, None, [object]), 149 "nodedev-added": (vmmGObject.RUN_FIRST, None, [object]), 150 "nodedev-removed": (vmmGObject.RUN_FIRST, None, [object]), 151 "resources-sampled": (vmmGObject.RUN_FIRST, None, []), 152 "state-changed": (vmmGObject.RUN_FIRST, None, []), 153 "open-completed": (vmmGObject.RUN_FIRST, None, [object]), 154 } 155 156 (_STATE_DISCONNECTED, 157 _STATE_CONNECTING, 158 _STATE_ACTIVE) = range(1, 4) 159 160 def __init__(self, uri): 161 self._uri = uri 162 vmmGObject.__init__(self) 163 164 self._state = self._STATE_DISCONNECTED 165 self._backend = virtinst.VirtinstConnection(self._uri) 166 self._closing = False 167 168 # Error strings are stored here if open() fails 169 self.connect_error = None 170 171 self._init_object_count = None 172 self._init_object_event = None 173 174 self.using_domain_events = False 175 self._domain_cb_ids = [] 176 self.using_network_events = False 177 self._network_cb_ids = [] 178 self.using_storage_pool_events = False 179 self._storage_pool_cb_ids = [] 180 self.using_node_device_events = False 181 self._node_device_cb_ids = [] 182 183 self._xml_flags = {} 184 185 self._objects = _ObjectList() 186 self.statsmanager = vmmStatsManager() 187 188 self._stats = [] 189 self._hostinfo = None 190 191 self.add_gsettings_handle( 192 self._on_config_pretty_name_changed( 193 self._config_pretty_name_changed_cb)) 194 195 self._init_virtconn() 196 197 198 @staticmethod 199 def pretty_hv(gtype, domtype): 200 """ 201 Convert XML <domain type='foo'> and <os><type>bar</type> 202 into a more human relevant string. 203 """ 204 205 gtype = gtype.lower() 206 domtype = domtype.lower() 207 208 label = domtype 209 if domtype == "xen": 210 if gtype == "xen": 211 label = "xen (paravirt)" 212 elif gtype == "hvm": 213 label = "xen (fullvirt)" 214 elif domtype == "test": 215 if gtype == "xen": 216 label = "test (xen)" 217 elif gtype == "hvm": 218 label = "test (hvm)" 219 elif domtype == "qemu": 220 label = "QEMU TCG" 221 elif domtype == "kvm": 222 label = "KVM" 223 224 return label 225 226 def __repr__(self): 227 # pylint: disable=arguments-differ 228 return "<%s uri=%s id=%s>" % ( 229 self.__class__.__name__, self.get_uri(), hex(id(self))) 230 231 232 ################# 233 # Init routines # 234 ################# 235 236 def _wait_for_condition(self, compare_cb, timeout=3): 237 """ 238 Wait for this object to emit the specified signal. Will not 239 block the mainloop. 240 """ 241 from gi.repository import Gtk 242 is_main_thread = (threading.current_thread().name == "MainThread") 243 start_time = time.time() 244 245 while True: 246 cur_time = time.time() 247 if compare_cb(): 248 return 249 if (cur_time - start_time) >= timeout: 250 return # pragma: no cover 251 252 if is_main_thread: 253 if Gtk.events_pending(): 254 Gtk.main_iteration_do(False) 255 continue 256 257 time.sleep(.1) 258 259 def _init_virtconn(self): 260 self._backend.cb_fetch_all_domains = ( 261 lambda: [obj.get_xmlobj(refresh_if_nec=False) 262 for obj in self.list_vms()]) 263 self._backend.cb_fetch_all_pools = ( 264 lambda: [obj.get_xmlobj(refresh_if_nec=False) 265 for obj in self.list_pools()]) 266 self._backend.cb_fetch_all_nodedevs = ( 267 lambda: [obj.get_xmlobj(refresh_if_nec=False) 268 for obj in self.list_nodedevs()]) 269 270 def fetch_all_vols(): 271 ret = [] 272 for pool in self.list_pools(): 273 for vol in pool.get_volumes(): 274 try: 275 ret.append(vol.get_xmlobj(refresh_if_nec=False)) 276 except Exception as e: # pragma: no cover 277 log.debug("Fetching volume XML failed: %s", e) 278 return ret 279 self._backend.cb_fetch_all_vols = fetch_all_vols 280 281 def cache_new_pool(obj): 282 if not self.is_active(): 283 return 284 name = obj.name() 285 self.schedule_priority_tick(pollpool=True) 286 def compare_cb(): 287 return bool(self.get_pool_by_name(name)) 288 self._wait_for_condition(compare_cb) 289 self._backend.cb_cache_new_pool = cache_new_pool 290 291 292 ######################## 293 # General data getters # 294 ######################## 295 296 def get_uri(self): 297 return self._uri 298 def get_backend(self): 299 return self._backend 300 301 def invalidate_caps(self): 302 return self._backend.invalidate_caps() 303 caps = property(lambda self: getattr(self, "_backend").caps) 304 305 def host_memory_size(self): 306 if not self._backend.is_open() or self._hostinfo is None: 307 return 0 308 return self._hostinfo[1] * 1024 309 310 def host_active_processor_count(self): 311 if not self._backend.is_open() or self._hostinfo is None: 312 return 0 # pragma: no cover 313 return self._hostinfo[2] 314 315 316 ########################## 317 # URI + hostname helpers # 318 ########################## 319 320 def libvirt_gethostname(self): 321 return self._backend.getHostname() 322 323 get_uri_hostname = property(lambda s: 324 getattr(s, "_backend").get_uri_hostname) 325 get_uri_username = property(lambda s: 326 getattr(s, "_backend").get_uri_username) 327 get_uri_transport = property(lambda s: 328 getattr(s, "_backend").get_uri_transport) 329 get_uri_port = property(lambda s: getattr(s, "_backend").get_uri_port) 330 get_driver = property(lambda s: getattr(s, "_backend").get_uri_driver) 331 is_container_only = property( 332 lambda s: getattr(s, "_backend").is_container_only) 333 is_lxc = property(lambda s: getattr(s, "_backend").is_lxc) 334 is_vz = property(lambda s: getattr(s, "_backend").is_vz) 335 is_xen = property(lambda s: getattr(s, "_backend").is_xen) 336 is_remote = property(lambda s: getattr(s, "_backend").is_remote) 337 is_qemu = property(lambda s: getattr(s, "_backend").is_qemu) 338 is_qemu_privileged = property(lambda s: getattr(s, "_backend").is_qemu_privileged) 339 is_qemu_unprivileged = property(lambda s: 340 getattr(s, "_backend").is_qemu_unprivileged) 341 is_test = property(lambda s: getattr(s, "_backend").is_test) 342 is_unprivileged = property(lambda s: getattr(s, "_backend").is_unprivileged) 343 344 345 def get_cache_dir(self): 346 uri = self.get_uri().replace("/", "_") 347 ret = os.path.join(self._backend.get_app_cache_dir(), uri) 348 if not os.path.exists(ret): 349 os.makedirs(ret, 0o755) # pragma: no cover 350 return ret 351 352 def get_default_storage_format(self): 353 raw = self.config.get_default_storage_format(raw=True) 354 if raw != "default": 355 return raw # pragma: no cover 356 357 fmt = self.config.get_default_storage_format() 358 if fmt != "qcow2": 359 return fmt # pragma: no cover 360 361 if self.support.conn_default_qcow2(): 362 return fmt 363 return None # pragma: no cover 364 365 366 #################################### 367 # Connection pretty print routines # 368 #################################### 369 370 def get_pretty_desc(self): 371 """ 372 Return a pretty label for use in the manager view, and various 373 connection lists. 374 """ 375 if self._get_config_pretty_name(): 376 return self._get_config_pretty_name() 377 378 pretty_map = { 379 "esx": "ESX", 380 "lxc": "LXC", 381 "openvz": "OpenVZ", 382 "qemu": "QEMU/KVM", 383 "vbox": "Virtualbox", 384 "vmware": "VMWare", 385 "xen": "Xen", 386 } 387 388 hv = pretty_map.get(self.get_driver(), self.get_driver()) 389 hostname = self.get_uri_hostname() 390 path = self.get_backend().get_uri_path() 391 392 ret = hv 393 394 if path == "/session": 395 ret += " " + _("User session") 396 elif (path and path != "/system" and os.path.basename(path)): 397 # Used by test URIs to report what XML file they are using 398 ret += " %s" % os.path.basename(path) 399 400 if hostname: 401 ret += ": %s" % hostname 402 403 return ret 404 405 406 ####################### 407 # API support helpers # 408 ####################### 409 410 @property 411 def support(self): 412 return self._backend.support 413 414 def _get_flags_helper(self, obj, key, check_func): 415 ignore = obj 416 flags_dict = self._xml_flags.get(key) 417 418 if flags_dict is None: 419 # Flags already set 420 inact, act = check_func() 421 flags_dict = {} 422 flags_dict["active"] = act 423 flags_dict["inactive"] = inact 424 425 self._xml_flags[key] = flags_dict 426 427 active_flags = flags_dict["active"] 428 inactive_flags = flags_dict["inactive"] 429 430 return (inactive_flags, active_flags) 431 432 def get_dom_flags(self, vm): 433 key = "domain" 434 435 def check_func(): 436 act = 0 437 inact = 0 438 439 if self.support.domain_xml_inactive(vm): 440 inact = libvirt.VIR_DOMAIN_XML_INACTIVE 441 else: # pragma: no cover 442 log.debug("Domain XML inactive flag not supported.") 443 444 if self.support.domain_xml_secure(vm): 445 inact |= libvirt.VIR_DOMAIN_XML_SECURE 446 act = libvirt.VIR_DOMAIN_XML_SECURE 447 else: # pragma: no cover 448 log.debug("Domain XML secure flag not supported.") 449 450 return inact, act 451 452 return self._get_flags_helper(vm, key, check_func) 453 454 def get_default_pool(self): 455 poolxml = virtinst.StoragePool.lookup_default_pool(self.get_backend()) 456 if poolxml: 457 for p in self.list_pools(): 458 if p.get_name() == poolxml.name: 459 return p 460 return None 461 462 def get_vol_by_path(self, path): 463 for pool in self.list_pools(): 464 for vol in pool.get_volumes(): 465 try: 466 if vol.get_target_path() == path: 467 return vol 468 except Exception as e: # pragma: no cover 469 # Errors can happen if the volume disappeared, bug 1092739 470 log.debug("Error looking up volume from path=%s: %s", 471 path, e) 472 return None 473 474 475 ################################### 476 # Connection state getter/setters # 477 ################################### 478 479 def _change_state(self, newstate): 480 if self._state != newstate: 481 self._state = newstate 482 log.debug("conn=%s changed to state=%s", 483 self.get_uri(), self.get_state_text()) 484 self.emit("state-changed") 485 486 def is_active(self): 487 return self._state == self._STATE_ACTIVE 488 def is_disconnected(self): 489 return self._state == self._STATE_DISCONNECTED 490 def is_connecting(self): 491 return self._state == self._STATE_CONNECTING 492 493 def get_state_text(self): 494 if self.is_disconnected(): 495 return _("Disconnected") 496 elif self.is_connecting(): 497 return _("Connecting") 498 return _("Active") 499 500 501 ################################# 502 # Libvirt object lookup methods # 503 ################################# 504 505 def get_vm_by_name(self, name): 506 return self._objects.lookup_object(vmmDomain, name) 507 def list_vms(self): 508 return self._objects.get_objects_for_class(vmmDomain) 509 510 def get_net_by_name(self, name): 511 return self._objects.lookup_object(vmmNetwork, name) 512 def list_nets(self): 513 return self._objects.get_objects_for_class(vmmNetwork) 514 515 def get_pool_by_name(self, name): 516 return self._objects.lookup_object(vmmStoragePool, name) 517 def list_pools(self): 518 return self._objects.get_objects_for_class(vmmStoragePool) 519 520 def get_nodedev_by_name(self, name): 521 return self._objects.lookup_object(vmmNodeDevice, name) 522 def list_nodedevs(self): 523 return self._objects.get_objects_for_class(vmmNodeDevice) 524 525 526 ############################ 527 # nodedev helper functions # 528 ############################ 529 530 def filter_nodedevs(self, devtype): 531 retdevs = [] 532 for dev in self.list_nodedevs(): 533 try: 534 xmlobj = dev.get_xmlobj() 535 except libvirt.libvirtError as e: # pragma: no cover 536 # Libvirt nodedev XML fetching can be busted 537 # https://bugzilla.redhat.com/show_bug.cgi?id=1225771 538 if e.get_error_code() != libvirt.VIR_ERR_NO_NODE_DEVICE: 539 log.debug("Error fetching nodedev XML", exc_info=True) 540 continue 541 542 if devtype and xmlobj.device_type != devtype: 543 continue 544 545 retdevs.append(dev) 546 return retdevs 547 548 549 ################################### 550 # Libvirt object creation methods # 551 ################################### 552 553 def define_domain(self, xml): 554 return self._backend.defineXML(xml) 555 def define_network(self, xml): 556 return self._backend.networkDefineXML(xml) 557 def define_pool(self, xml): 558 return self._backend.storagePoolDefineXML(xml, 0) 559 560 def rename_object(self, obj, origxml, newxml): 561 if obj.is_domain(): 562 define_cb = self.define_domain 563 elif obj.is_pool(): 564 define_cb = self.define_pool 565 elif obj.is_network(): 566 define_cb = self.define_network 567 else: 568 raise virtinst.xmlutil.DevError("rename_object " 569 "helper doesn't support object class %s" % obj.__class__) 570 571 # Undefine the original object 572 obj.delete(force=False) 573 574 newobj = None 575 try: 576 # Redefine new domain 577 newobj = define_cb(newxml) 578 except Exception as renameerr: 579 try: 580 log.debug("Error defining new name %s XML", 581 obj.class_name(), exc_info=True) 582 newobj = define_cb(origxml) 583 except Exception as fixerr: # pragma: no cover 584 log.debug("Failed to redefine original %s!", 585 obj.class_name(), exc_info=True) 586 msg = _("%(object)s rename failed. Attempting to recover also " 587 "failed.\n" 588 "\n" 589 "Original error: %(origerror)s\n" 590 "\n" 591 "Recover error: %(recovererror)s") % { 592 "object": obj.class_name(), 593 "origerror": str(renameerr), 594 "recovererror": str(fixerr), 595 } 596 raise RuntimeError(msg) from None 597 raise 598 finally: 599 if newobj: 600 # Reinsert handle into new obj 601 obj.change_name_backend(newobj) 602 603 604 ######################### 605 # Domain event handling # 606 ######################### 607 608 # Our strategy here isn't the most efficient: since we need to keep the 609 # poll helpers around for compat with old libvirt, switching to a fully 610 # event driven setup is hard, so we end up doing more polling than 611 # necessary on most events. 612 613 def _domain_xml_misc_event(self, conn, domain, *args): 614 # Just trigger a domain XML refresh for hotplug type events 615 ignore = conn 616 args = list(args) 617 eventstr = args.pop(-1) 618 619 name = domain.name() 620 log.debug("domain xmlmisc event: domain=%s event=%s args=%s", 621 name, eventstr, args) 622 obj = self.get_vm_by_name(name) 623 if obj: 624 self.idle_add(obj.recache_from_event_loop) 625 626 def _domain_lifecycle_event(self, conn, domain, state, reason, userdata): 627 ignore = conn 628 ignore = userdata 629 630 name = domain.name() 631 log.debug("domain lifecycle event: domain=%s %s", name, 632 LibvirtEnumMap.domain_lifecycle_str(state, reason)) 633 634 obj = self.get_vm_by_name(name) 635 636 if obj: 637 self.idle_add(obj.recache_from_event_loop) 638 else: 639 self.schedule_priority_tick(pollvm=True, force=True) 640 641 def _domain_agent_lifecycle_event(self, conn, domain, state, reason, userdata): 642 ignore = conn 643 ignore = userdata 644 645 name = domain.name() 646 log.debug("domain agent lifecycle event: domain=%s %s", name, 647 LibvirtEnumMap.domain_agent_lifecycle_str(state, reason)) 648 649 obj = self.get_vm_by_name(name) 650 651 if obj: 652 self.idle_add(obj.recache_from_event_loop) 653 else: 654 self.schedule_priority_tick(pollvm=True, force=True) # pragma: no cover 655 656 def _network_lifecycle_event(self, conn, network, state, reason, userdata): 657 ignore = conn 658 ignore = userdata 659 660 name = network.name() 661 log.debug("network lifecycle event: network=%s %s", 662 name, LibvirtEnumMap.network_lifecycle_str(state, reason)) 663 obj = self.get_net_by_name(name) 664 665 if obj: 666 self.idle_add(obj.recache_from_event_loop) 667 else: 668 self.schedule_priority_tick(pollnet=True, force=True) 669 670 def _storage_pool_lifecycle_event(self, conn, pool, 671 state, reason, userdata): 672 ignore = conn 673 ignore = userdata 674 675 name = pool.name() 676 log.debug("storage pool lifecycle event: pool=%s %s", 677 name, LibvirtEnumMap.storage_lifecycle_str(state, reason)) 678 679 obj = self.get_pool_by_name(name) 680 681 if obj: 682 self.idle_add(obj.recache_from_event_loop) 683 else: 684 self.schedule_priority_tick(pollpool=True, force=True) 685 686 def _storage_pool_refresh_event(self, conn, pool, userdata): 687 ignore = conn 688 ignore = userdata 689 690 name = pool.name() 691 log.debug("storage pool refresh event: pool=%s", name) 692 693 obj = self.get_pool_by_name(name) 694 695 if not obj: 696 return 697 698 self.idle_add(obj.refresh_pool_cache_from_event_loop) 699 700 def _node_device_lifecycle_event(self, conn, dev, 701 state, reason, userdata): 702 ignore = conn 703 ignore = userdata 704 705 name = dev.name() 706 log.debug("node device lifecycle event: nodedev=%s %s", 707 name, LibvirtEnumMap.nodedev_lifecycle_str(state, reason)) 708 709 self.schedule_priority_tick(pollnodedev=True, force=True) 710 711 def _node_device_update_event(self, conn, dev, userdata): 712 ignore = conn 713 ignore = userdata 714 715 name = dev.name() 716 log.debug("node device update event: nodedev=%s", name) 717 718 obj = self.get_nodedev_by_name(name) 719 720 if obj: 721 self.idle_add(obj.recache_from_event_loop) 722 723 def _add_conn_events(self): 724 if not self.support.conn_working_xen_events(): 725 return # pragma: no cover 726 727 def _check_events_disabled(): 728 if self.config.CLITestOptions.no_events: 729 raise RuntimeError("events disabled via cli") 730 731 try: 732 _check_events_disabled() 733 734 self._domain_cb_ids.append( 735 self.get_backend().domainEventRegisterAny( 736 None, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, 737 self._domain_lifecycle_event, None)) 738 self.using_domain_events = True 739 log.debug("Using domain events") 740 except Exception as e: 741 self.using_domain_events = False 742 log.debug("Error registering domain events: %s", e) 743 744 def _add_domain_xml_event(eventname, eventval, cb=None): 745 if not self.using_domain_events: 746 return 747 if not cb: 748 cb = self._domain_xml_misc_event 749 try: 750 eventid = getattr(libvirt, eventname, eventval) 751 self._domain_cb_ids.append( 752 self.get_backend().domainEventRegisterAny( 753 None, eventid, cb, eventname)) 754 755 if (eventname == "VIR_DOMAIN_EVENT_ID_AGENT_LIFECYCLE" and 756 self.config.CLITestOptions.fake_agent_event): 757 testmock.schedule_fake_agent_event(self, cb) 758 except Exception as e: # pragma: no cover 759 log.debug("Error registering %s event: %s", 760 eventname, e) 761 762 _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_BALLOON_CHANGE", 13) 763 _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_TRAY_CHANGE", 10) 764 _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED", 15) 765 _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_DEVICE_ADDED", 19) 766 _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_AGENT_LIFECYCLE", 18, 767 self._domain_agent_lifecycle_event) 768 769 try: 770 _check_events_disabled() 771 772 eventid = getattr(libvirt, "VIR_NETWORK_EVENT_ID_LIFECYCLE", 0) 773 self._network_cb_ids.append( 774 self.get_backend().networkEventRegisterAny( 775 None, eventid, self._network_lifecycle_event, None)) 776 self.using_network_events = True 777 log.debug("Using network events") 778 except Exception as e: 779 self.using_network_events = False 780 log.debug("Error registering network events: %s", e) 781 782 try: 783 _check_events_disabled() 784 785 eventid = getattr(libvirt, 786 "VIR_STORAGE_POOL_EVENT_ID_LIFECYCLE", 0) 787 refreshid = getattr(libvirt, 788 "VIR_STORAGE_POOL_EVENT_ID_REFRESH", 1) 789 self._storage_pool_cb_ids.append( 790 self.get_backend().storagePoolEventRegisterAny( 791 None, eventid, self._storage_pool_lifecycle_event, None)) 792 self._storage_pool_cb_ids.append( 793 self.get_backend().storagePoolEventRegisterAny( 794 None, refreshid, self._storage_pool_refresh_event, None)) 795 self.using_storage_pool_events = True 796 log.debug("Using storage pool events") 797 except Exception as e: 798 self.using_storage_pool_events = False 799 log.debug("Error registering storage pool events: %s", e) 800 801 try: 802 _check_events_disabled() 803 804 eventid = getattr(libvirt, "VIR_NODE_DEVICE_EVENT_ID_LIFECYCLE", 0) 805 updateid = getattr(libvirt, "VIR_NODE_DEVICE_EVENT_ID_UPDATE", 1) 806 lifecycle_cb = self._node_device_lifecycle_event 807 update_cb = self._node_device_update_event 808 809 self._node_device_cb_ids.append( 810 self.get_backend().nodeDeviceEventRegisterAny( 811 None, eventid, lifecycle_cb, None)) 812 self._node_device_cb_ids.append( 813 self.get_backend().nodeDeviceEventRegisterAny( 814 None, updateid, update_cb, None)) 815 816 if self.config.CLITestOptions.fake_nodedev_event: 817 testmock.schedule_fake_nodedev_event(self, 818 lifecycle_cb, update_cb) 819 820 self.using_node_device_events = True 821 log.debug("Using node device events") 822 except Exception as e: 823 self.using_network_events = False 824 log.debug("Error registering node device events: %s", e) 825 826 827 ###################################### 828 # Connection closing/opening methods # 829 ###################################### 830 831 def _schedule_close(self): 832 self._closing = True 833 self.idle_add(self.close) 834 835 def close(self): 836 if not self.is_disconnected(): 837 log.debug("conn.close() uri=%s", self.get_uri()) 838 self._closing = True 839 840 try: 841 if self._backend.is_open(): 842 for eid in self._domain_cb_ids: 843 self._backend.domainEventDeregisterAny(eid) 844 for eid in self._network_cb_ids: 845 self._backend.networkEventDeregisterAny(eid) 846 for eid in self._storage_pool_cb_ids: 847 self._backend.storagePoolEventDeregisterAny(eid) 848 for eid in self._node_device_cb_ids: 849 self._backend.nodeDeviceEventDeregisterAny(eid) 850 except Exception: # pragma: no cover 851 log.debug("Failed to deregister events in conn cleanup", 852 exc_info=True) 853 finally: 854 self._domain_cb_ids = [] 855 self._network_cb_ids = [] 856 self._storage_pool_cb_ids = [] 857 self._node_device_cb_ids = [] 858 859 self._stats = [] 860 861 if self._init_object_event: 862 self._init_object_event.clear() # pragma: no cover 863 864 for obj in self._objects.all_objects(): 865 self._objects.remove(obj) 866 try: 867 self._remove_object_signal(obj) 868 obj.cleanup() 869 except Exception as e: # pragma: no cover 870 log.debug("Failed to cleanup %s: %s", obj, e) 871 self._objects.cleanup() 872 self._objects = _ObjectList() 873 874 closeret = self._backend.close() 875 if closeret == 1: 876 log.debug( # pragma: no cover 877 "LEAK: conn close() returned 1, " 878 "meaning refs may have leaked.") 879 880 self._change_state(self._STATE_DISCONNECTED) 881 self._closing = False 882 883 def _cleanup(self): 884 self.close() 885 886 self._objects = None 887 self._backend.cb_fetch_all_domains = None 888 self._backend.cb_fetch_all_pools = None 889 self._backend.cb_fetch_all_nodedevs = None 890 self._backend.cb_fetch_all_vols = None 891 self._backend.cb_cache_new_pool = None 892 893 self.statsmanager.cleanup() 894 self.statsmanager = None 895 896 def open(self): 897 if not self.is_disconnected(): 898 return # pragma: no cover 899 900 self._change_state(self._STATE_CONNECTING) 901 902 log.debug("Scheduling background open thread for %s", 903 self.get_uri()) 904 self._start_thread(self._open_thread, "Connect %s" % self.get_uri()) 905 906 def _do_open(self): 907 warnconsole = False 908 libvirt_error_code = None 909 libvirt_error_message = None 910 exc = None 911 912 try: 913 cb = connectauth.creds_dialog 914 data = self 915 if self.config.CLITestOptions.fake_openauth: 916 testmock.fake_openauth(self, cb, data) 917 if self.config.CLITestOptions.fake_session_error: 918 lerr = libvirt.libvirtError("fake session error") 919 lerr.err = [libvirt.VIR_ERR_AUTH_FAILED, None, 920 "fake session error not authorized"] 921 raise lerr 922 self._backend.open(cb, data) 923 return True, None 924 except Exception as e: 925 exc = e 926 tb = "".join(traceback.format_exc()) 927 if isinstance(exc, libvirt.libvirtError): 928 # pylint: disable=no-member 929 libvirt_error_code = exc.get_error_code() 930 libvirt_error_message = exc.get_error_message() 931 932 if (libvirt_error_code == 933 getattr(libvirt, "VIR_ERR_AUTH_CANCELLED", None)): # pragma: no cover 934 log.debug("User cancelled auth, not raising any error.") 935 return False, None 936 937 if (libvirt_error_code == libvirt.VIR_ERR_AUTH_FAILED and 938 "not authorized" in libvirt_error_message.lower()): 939 log.debug("Looks like we might have failed policykit " 940 "auth. Checking to see if we have a valid " 941 "console session") 942 if not self.is_remote(): 943 warnconsole = bool(not connectauth.do_we_have_session()) 944 if self.config.CLITestOptions.fake_session_error: 945 warnconsole = True 946 947 ConnectError = connectauth.connect_error( 948 self, str(exc), tb, warnconsole) 949 return False, ConnectError 950 951 def _populate_initial_state(self): 952 log.debug("libvirt version=%s", self._backend.local_libvirt_version()) 953 log.debug("daemon version=%s", self._backend.daemon_version()) 954 log.debug("conn version=%s", self._backend.conn_version()) 955 log.debug("%s capabilities:\n%s", self.get_uri(), self.caps.get_xml()) 956 957 if not self.support.conn_domain(): # pragma: no cover 958 raise RuntimeError("Connection does not support required " 959 "domain listing APIs") 960 961 if not self.support.conn_storage(): # pragma: no cover 962 log.debug("Connection doesn't seem to support storage APIs.") 963 if not self.support.conn_network(): # pragma: no cover 964 log.debug("Connection doesn't seem to support network APIs.") 965 if not self.support.conn_nodedev(): # pragma: no cover 966 log.debug("Connection doesn't seem to support nodedev APIs.") 967 968 self._add_conn_events() 969 970 try: 971 self._backend.setKeepAlive(20, 1) 972 except Exception as e: 973 log.debug("Failed to setKeepAlive: %s", str(e)) 974 975 # The initial tick will set up a threading event that will only 976 # trigger after all the polled libvirt objects are fully initialized. 977 # That way we only report the connection is open when everything is 978 # nicely setup for the rest of the app. 979 980 self._init_object_event = threading.Event() 981 self._init_object_count = 0 982 983 self.schedule_priority_tick(stats_update=True, 984 pollvm=True, pollnet=True, 985 pollpool=True, pollnodedev=True, 986 force=True, initial_poll=True) 987 988 self._init_object_event.wait() 989 self._init_object_event = None 990 self._init_object_count = None 991 992 # Try to create the default storage pool 993 # We need this after events setup so we can determine if the default 994 # pool already exists 995 try: 996 virtinst.StoragePool.build_default_pool(self.get_backend()) 997 except Exception as e: # pragma: no cover 998 log.debug("Building default pool failed: %s", str(e)) 999 1000 def _open_thread(self): 1001 try: 1002 is_active, ConnectError = self._do_open() 1003 if is_active: 1004 self._populate_initial_state() 1005 except Exception as e: # pragma: no cover 1006 is_active = False 1007 ConnectError = connectauth.connect_error(self, str(e), 1008 "".join(traceback.format_exc()), False) 1009 1010 if is_active: 1011 self.idle_add(self._change_state, self._STATE_ACTIVE) 1012 else: 1013 self._schedule_close() 1014 1015 self.idle_emit("open-completed", ConnectError) 1016 1017 1018 ####################### 1019 # Tick/Update methods # 1020 ####################### 1021 1022 def _remove_object_signal(self, obj): 1023 if obj.is_domain(): 1024 self.emit("vm-removed", obj) 1025 elif obj.is_network(): 1026 self.emit("net-removed", obj) 1027 elif obj.is_pool(): 1028 self.emit("pool-removed", obj) 1029 elif obj.is_nodedev(): 1030 self.emit("nodedev-removed", obj) 1031 1032 def _gone_object_signals(self, gone_objects): 1033 """ 1034 Responsible for signaling the UI for any updates. All possible UI 1035 updates need to go here to enable threading that doesn't block the 1036 app with long tick operations. 1037 """ 1038 if not self._backend.is_open(): 1039 return # pragma: no cover 1040 1041 for obj in gone_objects: 1042 class_name = obj.class_name() 1043 name = obj.get_name() 1044 1045 if not self._objects.remove(obj): 1046 log.debug("Requested removal of %s=%s, but it's " 1047 "not in our object list.", class_name, name) 1048 continue 1049 1050 log.debug("%s=%s removed", class_name, name) 1051 self._remove_object_signal(obj) 1052 obj.cleanup() 1053 1054 def _new_object_cb(self, obj, initialize_failed): 1055 if not self._backend.is_open(): 1056 return # pragma: no cover 1057 1058 try: 1059 class_name = obj.class_name() 1060 1061 if initialize_failed: 1062 log.debug("Blacklisting %s=%s", class_name, obj.get_name()) 1063 count = self._objects.add_denylist(obj) 1064 log.debug("Object added in denylist, count=%d", count) 1065 return 1066 1067 self._objects.remove_denylist(obj) 1068 if not self._objects.add(obj): 1069 log.debug("New %s=%s requested, but it's already tracked.", 1070 class_name, obj.get_name()) 1071 obj.cleanup() 1072 return 1073 1074 if not obj.is_nodedev(): 1075 # Skip nodedev logging since it's noisy and not interesting 1076 log.debug("%s=%s status=%s added", class_name, 1077 obj.get_name(), obj.run_status()) 1078 if obj.is_domain(): 1079 self.emit("vm-added", obj) 1080 elif obj.is_network(): 1081 self.emit("net-added", obj) 1082 elif obj.is_pool(): 1083 self.emit("pool-added", obj) 1084 elif obj.is_nodedev(): 1085 self.emit("nodedev-added", obj) 1086 finally: 1087 if self._init_object_event: 1088 self._init_object_count -= 1 1089 if self._init_object_count <= 0: 1090 self._init_object_event.set() 1091 1092 def _poll(self, initial_poll, 1093 pollvm, pollnet, pollpool, pollnodedev): 1094 """ 1095 Helper called from tick() to do necessary polling and return 1096 the relevant object lists 1097 """ 1098 gone_objects = [] 1099 preexisting_objects = [] 1100 1101 def _process_objects(ptype): 1102 if ptype == "nets": 1103 dopoll = pollnet 1104 objs = self.list_nets() 1105 cls = vmmNetwork 1106 pollcb = pollhelpers.fetch_nets 1107 elif ptype == "pools": 1108 dopoll = pollpool 1109 objs = self.list_pools() 1110 cls = vmmStoragePool 1111 pollcb = pollhelpers.fetch_pools 1112 elif ptype == "nodedevs": 1113 dopoll = pollnodedev 1114 objs = self.list_nodedevs() 1115 cls = vmmNodeDevice 1116 pollcb = pollhelpers.fetch_nodedevs 1117 else: 1118 dopoll = pollvm 1119 objs = self.list_vms() 1120 cls = vmmDomain 1121 pollcb = pollhelpers.fetch_vms 1122 1123 1124 keymap = dict((o.get_name(), o) for o in objs) 1125 def cb(obj, name): 1126 return cls(self, obj, name) 1127 if dopoll: 1128 gone, new, master = pollcb(self._backend, keymap, cb) 1129 else: 1130 gone, new, master = [], [], list(keymap.values()) 1131 1132 if initial_poll: 1133 self._init_object_count += len(new) 1134 1135 gone_objects.extend(gone) 1136 preexisting_objects.extend([o for o in master if o not in new]) 1137 new = [n for n in new if not self._objects.in_denylist(n)] 1138 return new 1139 1140 new_vms = _process_objects("vms") 1141 new_nets = _process_objects("nets") 1142 new_pools = _process_objects("pools") 1143 new_nodedevs = _process_objects("nodedevs") 1144 1145 # Kick off one thread per object type to handle the initial 1146 # XML fetching. Going any more fine grained then this probably 1147 # won't be that useful due to libvirt's locking structure. 1148 # 1149 # Would prefer to start refreshing some objects before all polling 1150 # is complete, but we need init_object_count to be fully accurate 1151 # before we start initializing objects 1152 1153 if initial_poll and self._init_object_count == 0: 1154 # If the connection doesn't have any objects, new_object_cb 1155 # is never called and the event is never set, so let's do it here 1156 self._init_object_event.set() 1157 1158 for newlist in [new_vms, new_nets, new_pools, new_nodedevs]: 1159 if not newlist: 1160 continue 1161 1162 def cb(lst): 1163 for obj in lst: 1164 obj.connect_once("initialized", self._new_object_cb) 1165 obj.init_libvirt_state() 1166 1167 self._start_thread(cb, 1168 "refreshing xml for new %s" % newlist[0].class_name(), 1169 args=(newlist,)) 1170 1171 return gone_objects, preexisting_objects 1172 1173 def _tick(self, stats_update=False, 1174 pollvm=False, pollnet=False, 1175 pollpool=False, pollnodedev=False, 1176 force=False, initial_poll=False): 1177 """ 1178 main update function: polls for new objects, updates stats, ... 1179 1180 :param force: Perform the requested polling even if async events 1181 are in use. 1182 """ 1183 if self._closing: 1184 return # pragma: no cover 1185 if self.is_disconnected(): 1186 return # pragma: no cover 1187 if self.is_connecting() and not force: 1188 return # pragma: no cover 1189 1190 # We need to set this before the event check, since stats polling 1191 # is independent of events 1192 if not pollvm: 1193 stats_update = False 1194 1195 if self.using_domain_events and not force: 1196 pollvm = False 1197 if self.using_network_events and not force: 1198 pollnet = False 1199 if self.using_storage_pool_events and not force: 1200 pollpool = False 1201 if self.using_node_device_events and not force: 1202 pollnodedev = False 1203 1204 self._hostinfo = self._backend.getInfo() 1205 if stats_update: 1206 self.statsmanager.cache_all_stats(self) 1207 1208 gone_objects, preexisting_objects = self._poll( 1209 initial_poll, pollvm, pollnet, pollpool, pollnodedev) 1210 self.idle_add(self._gone_object_signals, gone_objects) 1211 1212 # Only tick() pre-existing objects, since new objects will be 1213 # initialized asynchronously and tick() would be redundant 1214 for obj in preexisting_objects: 1215 try: 1216 if obj.reports_stats() and stats_update: 1217 pass 1218 elif obj.is_domain() and not pollvm: 1219 continue 1220 elif obj.is_network() and not pollnet: 1221 continue 1222 elif obj.is_pool() and not pollpool: 1223 continue 1224 elif obj.is_nodedev() and not pollnodedev: 1225 continue 1226 1227 if self.config.CLITestOptions.conn_crash: 1228 self._backend.close() 1229 e = libvirt.libvirtError("fake error") 1230 e.err = [libvirt.VIR_ERR_SYSTEM_ERROR] 1231 raise e 1232 1233 obj.tick(stats_update=stats_update) 1234 except Exception as e: 1235 log.exception("Tick for %s failed", obj) 1236 if (isinstance(e, libvirt.libvirtError) and 1237 (getattr(e, "get_error_code")() == 1238 libvirt.VIR_ERR_SYSTEM_ERROR)): 1239 # Try a simple getInfo call to see if conn was dropped 1240 self._backend.getInfo() 1241 log.debug( # pragma: no cover 1242 "vm tick raised system error but " 1243 "connection doesn't seem to have dropped. " 1244 "Ignoring.") 1245 1246 if stats_update: 1247 self._recalculate_stats( 1248 [o for o in preexisting_objects if o.reports_stats()]) 1249 self.idle_emit("resources-sampled") 1250 1251 def _recalculate_stats(self, vms): 1252 if not self._backend.is_open(): 1253 return # pragma: no cover 1254 1255 now = time.time() 1256 expected = self.config.get_stats_history_length() 1257 current = len(self._stats) 1258 if current > expected: 1259 del self._stats[expected:current] # pragma: no cover 1260 1261 mem = 0 1262 cpuTime = 0 1263 rdRate = 0 1264 wrRate = 0 1265 rxRate = 0 1266 txRate = 0 1267 diskMaxRate = self.disk_io_max_rate() or 10.0 1268 netMaxRate = self.network_traffic_max_rate() or 10.0 1269 1270 for vm in vms: 1271 if not vm.is_active(): 1272 continue 1273 1274 cpuTime += vm.cpu_time() 1275 mem += vm.stats_memory() 1276 rdRate += vm.disk_read_rate() 1277 wrRate += vm.disk_write_rate() 1278 rxRate += vm.network_rx_rate() 1279 txRate += vm.network_tx_rate() 1280 1281 netMaxRate = max(netMaxRate, vm.network_traffic_max_rate()) 1282 diskMaxRate = max(diskMaxRate, vm.disk_io_max_rate()) 1283 1284 pcentHostCpu = 0 1285 pcentMem = mem * 100.0 / self.host_memory_size() 1286 1287 if len(self._stats) > 0: 1288 prevTimestamp = self._stats[0]["timestamp"] 1289 host_cpus = self.host_active_processor_count() 1290 1291 pcentHostCpu = ((cpuTime) * 100.0 / 1292 ((now - prevTimestamp) * 1293 1000.0 * 1000.0 * 1000.0 * host_cpus)) 1294 1295 pcentHostCpu = max(0.0, min(100.0, pcentHostCpu)) 1296 pcentMem = max(0.0, min(100.0, pcentMem)) 1297 1298 newStats = { 1299 "timestamp": now, 1300 "memory": mem, 1301 "memoryPercent": pcentMem, 1302 "cpuTime": cpuTime, 1303 "cpuHostPercent": pcentHostCpu, 1304 "diskRdRate": rdRate, 1305 "diskWrRate": wrRate, 1306 "netRxRate": rxRate, 1307 "netTxRate": txRate, 1308 "diskMaxRate": diskMaxRate, 1309 "netMaxRate": netMaxRate, 1310 } 1311 1312 self._stats.insert(0, newStats) 1313 1314 1315 def schedule_priority_tick(self, **kwargs): 1316 from .engine import vmmEngine 1317 vmmEngine.get_instance().schedule_priority_tick(self, kwargs) 1318 1319 def tick_from_engine(self, *args, **kwargs): 1320 try: 1321 self._tick(*args, **kwargs) 1322 except Exception: 1323 self._schedule_close() 1324 raise 1325 1326 1327 ######################## 1328 # Stats getter methods # 1329 ######################## 1330 1331 def _get_record_helper(self, record_name): 1332 if len(self._stats) == 0: 1333 return 0 1334 return self._stats[0][record_name] 1335 1336 def _vector_helper(self, record_name, limit, ceil=100.0): 1337 vector = [] 1338 statslen = self.config.get_stats_history_length() + 1 1339 if limit is not None: 1340 statslen = min(statslen, limit) # pragma: no cover 1341 1342 for i in range(statslen): 1343 if i < len(self._stats): 1344 vector.append(self._stats[i][record_name] / ceil) 1345 else: 1346 vector.append(0) 1347 1348 return vector 1349 1350 def stats_memory_vector(self, limit=None): 1351 return self._vector_helper("memoryPercent", limit) 1352 def host_cpu_time_vector(self, limit=None): 1353 return self._vector_helper("cpuHostPercent", limit) 1354 1355 def stats_memory(self): 1356 return self._get_record_helper("memory") 1357 def host_cpu_time_percentage(self): 1358 return self._get_record_helper("cpuHostPercent") 1359 def guest_cpu_time_percentage(self): 1360 return self.host_cpu_time_percentage() 1361 def network_traffic_rate(self): 1362 return (self._get_record_helper("netRxRate") + 1363 self._get_record_helper("netTxRate")) 1364 def disk_io_rate(self): 1365 return (self._get_record_helper("diskRdRate") + 1366 self._get_record_helper("diskWrRate")) 1367 1368 def network_traffic_max_rate(self): 1369 return self._get_record_helper("netMaxRate") 1370 def disk_io_max_rate(self): 1371 return self._get_record_helper("diskMaxRate") 1372 1373 1374 ########################### 1375 # Per-conn config helpers # 1376 ########################### 1377 1378 def get_autoconnect(self): 1379 return self.config.get_conn_autoconnect(self.get_uri()) 1380 def set_autoconnect(self, val): 1381 self.config.set_conn_autoconnect(self.get_uri(), val) 1382 1383 def set_config_pretty_name(self, value): 1384 cfgname = self._get_config_pretty_name() 1385 if value == cfgname: 1386 return # pragma: no cover 1387 if not cfgname and value == self.get_pretty_desc(): 1388 # Don't encode the default connection value into gconf right 1389 # away, require the user to edit it first 1390 return 1391 self.config.set_perconn(self.get_uri(), "/pretty-name", value) 1392 def _get_config_pretty_name(self): 1393 return self.config.get_perconn(self.get_uri(), "/pretty-name") 1394 def _on_config_pretty_name_changed(self, *args, **kwargs): 1395 return self.config.listen_perconn(self.get_uri(), "/pretty-name", 1396 *args, **kwargs) 1397 def _config_pretty_name_changed_cb(self): 1398 self.emit("state-changed") 1399 1400 def set_details_window_size(self, w, h): 1401 self.config.set_perconn(self.get_uri(), "/window-size", (w, h)) 1402 def get_details_window_size(self): 1403 ret = self.config.get_perconn(self.get_uri(), "/window-size") 1404 return ret 1405