# Copyright (C) 2006, 2013, 2014, 2015 Red Hat, Inc.
# Copyright (C) 2006 Daniel P. Berrange <berrange@redhat.com>
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.

import os
import threading
import time
import traceback

import libvirt

import virtinst
from virtinst import log
from virtinst import pollhelpers

from .lib import connectauth
from .lib import testmock
from .baseclass import vmmGObject
from .lib.libvirtenummap import LibvirtEnumMap
from .object.domain import vmmDomain
from .object.network import vmmNetwork
from .object.nodedev import vmmNodeDevice
from .object.storagepool import vmmStoragePool
from .lib.statsmanager import vmmStatsManager


class _ObjectList(vmmGObject):
    """
    Class that wraps our internal list of libvirt objects
    """
    # pylint: disable=not-context-manager
    # pylint doesn't know that lock() has 'with' support
    DENYLIST_COUNT = 3
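    # An object is only treated as denylisted once it has failed init
    # DENYLIST_COUNT times; see in_denylist() below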

    def __init__(self):
        vmmGObject.__init__(self)

        self._objects = []
        self._denylist = {}
        self._lock = threading.Lock()

    def _cleanup(self):
        self._objects = []

    def _denylist_key(self, obj):
        return str(obj.__class__) + obj.get_name()

    def add_denylist(self, obj):
        """
        Add an object to the denylist: a list of objects we choose
        not to poll, because they threw an error at init time.

        :param obj: vmmLibvirtObject to denylist
        :returns: number of times the object has been denylisted
        """
        key = self._denylist_key(obj)
        count = self._denylist.get(key, 0)
        self._denylist[key] = count + 1
        return self._denylist[key]

    def remove_denylist(self, obj):
        """
        :param obj: vmmLibvirtObject to remove from denylist
        :returns: True if the object was in the denylist, False otherwise
        """
        key = self._denylist_key(obj)
        return bool(self._denylist.pop(key, 0))

    def in_denylist(self, obj):
        """
        An object is not considered denylisted until it has failed init
        DENYLIST_COUNT times; until then it gets another chance.

        :param obj: vmmLibvirtObject to check
        :returns: True if object is denylisted
        """
        key = self._denylist_key(obj)
        return self._denylist.get(key, 0) >= _ObjectList.DENYLIST_COUNT

    def remove(self, obj):
        """
        Remove an object from the list.

        :param obj: vmmLibvirtObject to remove
        :returns: True if object removed, False if object was not found
        """
        with self._lock:
            # Identity check is sufficient here, since we should never be
            # asked to remove an object that wasn't at one point in the list.
            if obj not in self._objects:
                return self.remove_denylist(obj)

            self._objects.remove(obj)
            return True

    def add(self, obj):
        """
        Add an object to the list.

        :param obj: vmmLibvirtObject to add
        :returns: True if object added, False if object already in the list
        """
        with self._lock:
            # We don't look up based on identity here, to prevent tick()
            # races from adding the same domain twice
            #
            # We don't use lookup_object here since we need to hold the
            # lock the whole time to prevent a 'time of check' issue
            for checkobj in self._objects:
                if (checkobj.__class__ == obj.__class__ and
                    checkobj.get_name() == obj.get_name()):
                    return False

            self._objects.append(obj)
            return True

    def get_objects_for_class(self, classobj):
        """
        Return all objects of the passed vmmLibvirtObject class
        """
        with self._lock:
            return [o for o in self._objects if o.__class__ is classobj]

    def lookup_object(self, classobj, name):
        """
        Lookup an object with the passed classobj + name
        """
        # Doesn't require locking, since get_objects_for_class covers us
        for obj in self.get_objects_for_class(classobj):
            if obj.get_name() == name:
                return obj
        return None

    def all_objects(self):
        with self._lock:
            return self._objects[:]


class vmmConnection(vmmGObject):
    __gsignals__ = {
        "vm-added": (vmmGObject.RUN_FIRST, None, [object]),
        "vm-removed": (vmmGObject.RUN_FIRST, None, [object]),
        "net-added": (vmmGObject.RUN_FIRST, None, [object]),
        "net-removed": (vmmGObject.RUN_FIRST, None, [object]),
        "pool-added": (vmmGObject.RUN_FIRST, None, [object]),
        "pool-removed": (vmmGObject.RUN_FIRST, None, [object]),
        "nodedev-added": (vmmGObject.RUN_FIRST, None, [object]),
        "nodedev-removed": (vmmGObject.RUN_FIRST, None, [object]),
        "resources-sampled": (vmmGObject.RUN_FIRST, None, []),
        "state-changed": (vmmGObject.RUN_FIRST, None, []),
        "open-completed": (vmmGObject.RUN_FIRST, None, [object]),
    }

    (_STATE_DISCONNECTED,
     _STATE_CONNECTING,
     _STATE_ACTIVE) = range(1, 4)

    def __init__(self, uri):
        self._uri = uri
        vmmGObject.__init__(self)

        self._state = self._STATE_DISCONNECTED
        self._backend = virtinst.VirtinstConnection(self._uri)
        self._closing = False

        # Error strings are stored here if open() fails
        self.connect_error = None

        self._init_object_count = None
        self._init_object_event = None

        self.using_domain_events = False
        self._domain_cb_ids = []
        self.using_network_events = False
        self._network_cb_ids = []
        self.using_storage_pool_events = False
        self._storage_pool_cb_ids = []
        self.using_node_device_events = False
        self._node_device_cb_ids = []

        self._xml_flags = {}

        self._objects = _ObjectList()
        self.statsmanager = vmmStatsManager()

        self._stats = []
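        # self._stats is a most-recent-first list of sampled host stats
        # dicts, populated by _recalculate_stats()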
        self._hostinfo = None

        self.add_gsettings_handle(
            self._on_config_pretty_name_changed(
                self._config_pretty_name_changed_cb))

        self._init_virtconn()


    @staticmethod
    def pretty_hv(gtype, domtype):
        """
        Convert XML <domain type='foo'> and <os><type>bar</type>
        into a more human-relevant string.
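
        For example, gtype="hvm" with domtype="xen" maps to
        "xen (fullvirt)".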
203        """
204
205        gtype = gtype.lower()
206        domtype = domtype.lower()
207
208        label = domtype
209        if domtype == "xen":
210            if gtype == "xen":
211                label = "xen (paravirt)"
212            elif gtype == "hvm":
213                label = "xen (fullvirt)"
214        elif domtype == "test":
215            if gtype == "xen":
216                label = "test (xen)"
217            elif gtype == "hvm":
218                label = "test (hvm)"
219        elif domtype == "qemu":
220            label = "QEMU TCG"
221        elif domtype == "kvm":
222            label = "KVM"
223
224        return label
225
226    def __repr__(self):
227        # pylint: disable=arguments-differ
228        return "<%s uri=%s id=%s>" % (
229                self.__class__.__name__, self.get_uri(), hex(id(self)))


    #################
    # Init routines #
    #################

    def _wait_for_condition(self, compare_cb, timeout=3):
        """
        Wait until compare_cb() returns True, or until `timeout` seconds
        have elapsed. Will not block the mainloop.
        """
        from gi.repository import Gtk
        is_main_thread = (threading.current_thread().name == "MainThread")
        start_time = time.time()

        while True:
            cur_time = time.time()
            if compare_cb():
                return
            if (cur_time - start_time) >= timeout:
                return  # pragma: no cover

            if is_main_thread:
                if Gtk.events_pending():
                    Gtk.main_iteration_do(False)
                    continue

            time.sleep(.1)

    def _init_virtconn(self):
        self._backend.cb_fetch_all_domains = (
            lambda: [obj.get_xmlobj(refresh_if_nec=False)
                     for obj in self.list_vms()])
        self._backend.cb_fetch_all_pools = (
            lambda: [obj.get_xmlobj(refresh_if_nec=False)
                     for obj in self.list_pools()])
        self._backend.cb_fetch_all_nodedevs = (
            lambda: [obj.get_xmlobj(refresh_if_nec=False)
                     for obj in self.list_nodedevs()])

        def fetch_all_vols():
            ret = []
            for pool in self.list_pools():
                for vol in pool.get_volumes():
                    try:
                        ret.append(vol.get_xmlobj(refresh_if_nec=False))
                    except Exception as e:  # pragma: no cover
                        log.debug("Fetching volume XML failed: %s", e)
            return ret
        self._backend.cb_fetch_all_vols = fetch_all_vols

        def cache_new_pool(obj):
            if not self.is_active():
                return
            name = obj.name()
            self.schedule_priority_tick(pollpool=True)

            def compare_cb():
                return bool(self.get_pool_by_name(name))
            self._wait_for_condition(compare_cb)
        self._backend.cb_cache_new_pool = cache_new_pool


    ########################
    # General data getters #
    ########################

    def get_uri(self):
        return self._uri
    def get_backend(self):
        return self._backend

    def invalidate_caps(self):
        return self._backend.invalidate_caps()
    caps = property(lambda self: getattr(self, "_backend").caps)

    def host_memory_size(self):
        if not self._backend.is_open() or self._hostinfo is None:
            return 0
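        # hostinfo[1] is the host memory size reported by libvirt's
        # getInfo(), in MiB; scale it to the KiB the app expects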
        return self._hostinfo[1] * 1024

    def host_active_processor_count(self):
        if not self._backend.is_open() or self._hostinfo is None:
            return 0  # pragma: no cover
        return self._hostinfo[2]


    ##########################
    # URI + hostname helpers #
    ##########################

    def libvirt_gethostname(self):
        return self._backend.getHostname()

    get_uri_hostname = property(lambda s:
        getattr(s, "_backend").get_uri_hostname)
    get_uri_username = property(lambda s:
        getattr(s, "_backend").get_uri_username)
    get_uri_transport = property(lambda s:
        getattr(s, "_backend").get_uri_transport)
    get_uri_port = property(lambda s: getattr(s, "_backend").get_uri_port)
    get_driver = property(lambda s: getattr(s, "_backend").get_uri_driver)
    is_container_only = property(
            lambda s: getattr(s, "_backend").is_container_only)
    is_lxc = property(lambda s: getattr(s, "_backend").is_lxc)
    is_vz = property(lambda s: getattr(s, "_backend").is_vz)
    is_xen = property(lambda s: getattr(s, "_backend").is_xen)
    is_remote = property(lambda s: getattr(s, "_backend").is_remote)
    is_qemu = property(lambda s: getattr(s, "_backend").is_qemu)
    is_qemu_privileged = property(lambda s: getattr(s, "_backend").is_qemu_privileged)
    is_qemu_unprivileged = property(lambda s:
                               getattr(s, "_backend").is_qemu_unprivileged)
    is_test = property(lambda s: getattr(s, "_backend").is_test)
    is_unprivileged = property(lambda s: getattr(s, "_backend").is_unprivileged)


    def get_cache_dir(self):
        uri = self.get_uri().replace("/", "_")
        ret = os.path.join(self._backend.get_app_cache_dir(), uri)
        if not os.path.exists(ret):
            os.makedirs(ret, 0o755)  # pragma: no cover
        return ret

    def get_default_storage_format(self):
        raw = self.config.get_default_storage_format(raw=True)
        if raw != "default":
            return raw  # pragma: no cover

        fmt = self.config.get_default_storage_format()
        if fmt != "qcow2":
            return fmt  # pragma: no cover

        if self.support.conn_default_qcow2():
            return fmt
        return None  # pragma: no cover


    ####################################
    # Connection pretty print routines #
    ####################################

    def get_pretty_desc(self):
        """
        Return a pretty label for use in the manager view, and various
        connection lists.
        """
        if self._get_config_pretty_name():
            return self._get_config_pretty_name()

        pretty_map = {
            "esx":        "ESX",
            "lxc":        "LXC",
            "openvz":     "OpenVZ",
            "qemu":       "QEMU/KVM",
            "vbox":       "VirtualBox",
            "vmware":     "VMware",
            "xen":        "Xen",
        }

        hv = pretty_map.get(self.get_driver(), self.get_driver())
        hostname = self.get_uri_hostname()
        path = self.get_backend().get_uri_path()

        ret = hv

        if path == "/session":
            ret += " " + _("User session")
        elif (path and path != "/system" and os.path.basename(path)):
            # Used by test URIs to report what XML file they are using
            ret += " %s" % os.path.basename(path)

        if hostname:
            ret += ": %s" % hostname

        return ret


    #######################
    # API support helpers #
    #######################

    @property
    def support(self):
        return self._backend.support

    def _get_flags_helper(self, obj, key, check_func):
        ignore = obj
        flags_dict = self._xml_flags.get(key)

        if flags_dict is None:
            # Flags not cached yet: query and store them
            inact, act = check_func()
            flags_dict = {}
            flags_dict["active"] = act
            flags_dict["inactive"] = inact

            self._xml_flags[key] = flags_dict

        active_flags = flags_dict["active"]
        inactive_flags = flags_dict["inactive"]

        return (inactive_flags, active_flags)

    def get_dom_flags(self, vm):
        key = "domain"

        def check_func():
            act = 0
            inact = 0

            if self.support.domain_xml_inactive(vm):
                inact = libvirt.VIR_DOMAIN_XML_INACTIVE
            else:  # pragma: no cover
                log.debug("Domain XML inactive flag not supported.")

            if self.support.domain_xml_secure(vm):
                inact |= libvirt.VIR_DOMAIN_XML_SECURE
                act = libvirt.VIR_DOMAIN_XML_SECURE
            else:  # pragma: no cover
                log.debug("Domain XML secure flag not supported.")

            return inact, act

        return self._get_flags_helper(vm, key, check_func)

    def get_default_pool(self):
        poolxml = virtinst.StoragePool.lookup_default_pool(self.get_backend())
        if poolxml:
            for p in self.list_pools():
                if p.get_name() == poolxml.name:
                    return p
        return None

    def get_vol_by_path(self, path):
        for pool in self.list_pools():
            for vol in pool.get_volumes():
                try:
                    if vol.get_target_path() == path:
                        return vol
                except Exception as e:  # pragma: no cover
                    # Errors can happen if the volume disappeared, bug 1092739
                    log.debug("Error looking up volume from path=%s: %s",
                        path, e)
        return None


    ###################################
    # Connection state getter/setters #
    ###################################

    def _change_state(self, newstate):
        if self._state != newstate:
            self._state = newstate
            log.debug("conn=%s changed to state=%s",
                self.get_uri(), self.get_state_text())
            self.emit("state-changed")

    def is_active(self):
        return self._state == self._STATE_ACTIVE
    def is_disconnected(self):
        return self._state == self._STATE_DISCONNECTED
    def is_connecting(self):
        return self._state == self._STATE_CONNECTING

    def get_state_text(self):
        if self.is_disconnected():
            return _("Disconnected")
        elif self.is_connecting():
            return _("Connecting")
        return _("Active")


    #################################
    # Libvirt object lookup methods #
    #################################

    def get_vm_by_name(self, name):
        return self._objects.lookup_object(vmmDomain, name)
    def list_vms(self):
        return self._objects.get_objects_for_class(vmmDomain)

    def get_net_by_name(self, name):
        return self._objects.lookup_object(vmmNetwork, name)
    def list_nets(self):
        return self._objects.get_objects_for_class(vmmNetwork)

    def get_pool_by_name(self, name):
        return self._objects.lookup_object(vmmStoragePool, name)
    def list_pools(self):
        return self._objects.get_objects_for_class(vmmStoragePool)

    def get_nodedev_by_name(self, name):
        return self._objects.lookup_object(vmmNodeDevice, name)
    def list_nodedevs(self):
        return self._objects.get_objects_for_class(vmmNodeDevice)


    ############################
    # nodedev helper functions #
    ############################

    def filter_nodedevs(self, devtype):
        retdevs = []
        for dev in self.list_nodedevs():
            try:
                xmlobj = dev.get_xmlobj()
            except libvirt.libvirtError as e:  # pragma: no cover
                # Libvirt nodedev XML fetching can be busted
                # https://bugzilla.redhat.com/show_bug.cgi?id=1225771
                if e.get_error_code() != libvirt.VIR_ERR_NO_NODE_DEVICE:
                    log.debug("Error fetching nodedev XML", exc_info=True)
                continue

            if devtype and xmlobj.device_type != devtype:
                continue

            retdevs.append(dev)
        return retdevs


    ###################################
    # Libvirt object creation methods #
    ###################################

    def define_domain(self, xml):
        return self._backend.defineXML(xml)
    def define_network(self, xml):
        return self._backend.networkDefineXML(xml)
    def define_pool(self, xml):
        return self._backend.storagePoolDefineXML(xml, 0)

    def rename_object(self, obj, origxml, newxml):
        if obj.is_domain():
            define_cb = self.define_domain
        elif obj.is_pool():
            define_cb = self.define_pool
        elif obj.is_network():
            define_cb = self.define_network
        else:
            raise virtinst.xmlutil.DevError("rename_object "
                "helper doesn't support object class %s" % obj.__class__)

        # Undefine the original object
        obj.delete(force=False)

        newobj = None
        try:
            # Define the new, renamed object
            newobj = define_cb(newxml)
        except Exception as renameerr:
            try:
                log.debug("Error defining new %s XML during rename",
                    obj.class_name(), exc_info=True)
                newobj = define_cb(origxml)
            except Exception as fixerr:  # pragma: no cover
                log.debug("Failed to redefine original %s!",
                    obj.class_name(), exc_info=True)
                msg = _("%(object)s rename failed. Attempting to recover also "
                        "failed.\n"
                        "\n"
                        "Original error: %(origerror)s\n"
                        "\n"
                        "Recover error: %(recovererror)s") % {
                            "object": obj.class_name(),
                            "origerror": str(renameerr),
                            "recovererror": str(fixerr),
                        }
                raise RuntimeError(msg) from None
            raise
        finally:
            if newobj:
                # Reinsert handle into new obj
                obj.change_name_backend(newobj)


    #########################
    # Domain event handling #
    #########################

    # Our strategy here isn't the most efficient: since we need to keep the
    # poll helpers around for compat with old libvirt, switching to a fully
    # event-driven setup is hard, so we end up doing more polling than
    # necessary on most events.

    def _domain_xml_misc_event(self, conn, domain, *args):
        # Just trigger a domain XML refresh for hotplug type events
        ignore = conn
        args = list(args)
        eventstr = args.pop(-1)
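        # The trailing argument is the eventname string that was passed
        # as the opaque userdata when the callback was registered in
        # _add_domain_xml_event() below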

        name = domain.name()
        log.debug("domain xmlmisc event: domain=%s event=%s args=%s",
                name, eventstr, args)
        obj = self.get_vm_by_name(name)
        if obj:
            self.idle_add(obj.recache_from_event_loop)

    def _domain_lifecycle_event(self, conn, domain, state, reason, userdata):
        ignore = conn
        ignore = userdata

        name = domain.name()
        log.debug("domain lifecycle event: domain=%s %s", name,
                LibvirtEnumMap.domain_lifecycle_str(state, reason))

        obj = self.get_vm_by_name(name)

        if obj:
            self.idle_add(obj.recache_from_event_loop)
        else:
            self.schedule_priority_tick(pollvm=True, force=True)

    def _domain_agent_lifecycle_event(self, conn, domain, state, reason, userdata):
        ignore = conn
        ignore = userdata

        name = domain.name()
        log.debug("domain agent lifecycle event: domain=%s %s", name,
                LibvirtEnumMap.domain_agent_lifecycle_str(state, reason))

        obj = self.get_vm_by_name(name)

        if obj:
            self.idle_add(obj.recache_from_event_loop)
        else:
            self.schedule_priority_tick(pollvm=True, force=True)  # pragma: no cover

    def _network_lifecycle_event(self, conn, network, state, reason, userdata):
        ignore = conn
        ignore = userdata

        name = network.name()
        log.debug("network lifecycle event: network=%s %s",
                name, LibvirtEnumMap.network_lifecycle_str(state, reason))
        obj = self.get_net_by_name(name)

        if obj:
            self.idle_add(obj.recache_from_event_loop)
        else:
            self.schedule_priority_tick(pollnet=True, force=True)

    def _storage_pool_lifecycle_event(self, conn, pool,
                                      state, reason, userdata):
        ignore = conn
        ignore = userdata

        name = pool.name()
        log.debug("storage pool lifecycle event: pool=%s %s",
            name, LibvirtEnumMap.storage_lifecycle_str(state, reason))

        obj = self.get_pool_by_name(name)

        if obj:
            self.idle_add(obj.recache_from_event_loop)
        else:
            self.schedule_priority_tick(pollpool=True, force=True)

    def _storage_pool_refresh_event(self, conn, pool, userdata):
        ignore = conn
        ignore = userdata

        name = pool.name()
        log.debug("storage pool refresh event: pool=%s", name)

        obj = self.get_pool_by_name(name)

        if not obj:
            return

        self.idle_add(obj.refresh_pool_cache_from_event_loop)

    def _node_device_lifecycle_event(self, conn, dev,
                                     state, reason, userdata):
        ignore = conn
        ignore = userdata

        name = dev.name()
        log.debug("node device lifecycle event: nodedev=%s %s",
            name, LibvirtEnumMap.nodedev_lifecycle_str(state, reason))

        self.schedule_priority_tick(pollnodedev=True, force=True)

    def _node_device_update_event(self, conn, dev, userdata):
        ignore = conn
        ignore = userdata

        name = dev.name()
        log.debug("node device update event: nodedev=%s", name)

        obj = self.get_nodedev_by_name(name)

        if obj:
            self.idle_add(obj.recache_from_event_loop)

    def _add_conn_events(self):
        if not self.support.conn_working_xen_events():
            return  # pragma: no cover

        def _check_events_disabled():
            if self.config.CLITestOptions.no_events:
                raise RuntimeError("events disabled via cli")

        try:
            _check_events_disabled()

            self._domain_cb_ids.append(
                self.get_backend().domainEventRegisterAny(
                None, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
                self._domain_lifecycle_event, None))
            self.using_domain_events = True
            log.debug("Using domain events")
        except Exception as e:
            self.using_domain_events = False
            log.debug("Error registering domain events: %s", e)

        def _add_domain_xml_event(eventname, eventval, cb=None):
            if not self.using_domain_events:
                return
            if not cb:
                cb = self._domain_xml_misc_event
            try:
                eventid = getattr(libvirt, eventname, eventval)
                self._domain_cb_ids.append(
                    self.get_backend().domainEventRegisterAny(
                    None, eventid, cb, eventname))

                if (eventname == "VIR_DOMAIN_EVENT_ID_AGENT_LIFECYCLE" and
                    self.config.CLITestOptions.fake_agent_event):
                    testmock.schedule_fake_agent_event(self, cb)
            except Exception as e:  # pragma: no cover
                log.debug("Error registering %s event: %s",
                    eventname, e)

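        # The numeric values are fallbacks for python-libvirt builds too
        # old to define the named event ID constant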
        _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_BALLOON_CHANGE", 13)
        _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_TRAY_CHANGE", 10)
        _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED", 15)
        _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_DEVICE_ADDED", 19)
        _add_domain_xml_event("VIR_DOMAIN_EVENT_ID_AGENT_LIFECYCLE", 18,
                              self._domain_agent_lifecycle_event)

        try:
            _check_events_disabled()

            eventid = getattr(libvirt, "VIR_NETWORK_EVENT_ID_LIFECYCLE", 0)
            self._network_cb_ids.append(
                self.get_backend().networkEventRegisterAny(
                None, eventid, self._network_lifecycle_event, None))
            self.using_network_events = True
            log.debug("Using network events")
        except Exception as e:
            self.using_network_events = False
            log.debug("Error registering network events: %s", e)

        try:
            _check_events_disabled()

            eventid = getattr(libvirt,
                              "VIR_STORAGE_POOL_EVENT_ID_LIFECYCLE", 0)
            refreshid = getattr(libvirt,
                              "VIR_STORAGE_POOL_EVENT_ID_REFRESH", 1)
            self._storage_pool_cb_ids.append(
                self.get_backend().storagePoolEventRegisterAny(
                None, eventid, self._storage_pool_lifecycle_event, None))
            self._storage_pool_cb_ids.append(
                self.get_backend().storagePoolEventRegisterAny(
                None, refreshid, self._storage_pool_refresh_event, None))
            self.using_storage_pool_events = True
            log.debug("Using storage pool events")
        except Exception as e:
            self.using_storage_pool_events = False
            log.debug("Error registering storage pool events: %s", e)

        try:
            _check_events_disabled()

            eventid = getattr(libvirt, "VIR_NODE_DEVICE_EVENT_ID_LIFECYCLE", 0)
            updateid = getattr(libvirt, "VIR_NODE_DEVICE_EVENT_ID_UPDATE", 1)
            lifecycle_cb = self._node_device_lifecycle_event
            update_cb = self._node_device_update_event

            self._node_device_cb_ids.append(
                self.get_backend().nodeDeviceEventRegisterAny(
                None, eventid, lifecycle_cb, None))
            self._node_device_cb_ids.append(
                self.get_backend().nodeDeviceEventRegisterAny(
                None, updateid, update_cb, None))

            if self.config.CLITestOptions.fake_nodedev_event:
                testmock.schedule_fake_nodedev_event(self,
                        lifecycle_cb, update_cb)

            self.using_node_device_events = True
            log.debug("Using node device events")
        except Exception as e:
            self.using_node_device_events = False
            log.debug("Error registering node device events: %s", e)


    ######################################
    # Connection closing/opening methods #
    ######################################

    def _schedule_close(self):
        self._closing = True
        self.idle_add(self.close)

    def close(self):
        if not self.is_disconnected():
            log.debug("conn.close() uri=%s", self.get_uri())
        self._closing = True

        try:
            if self._backend.is_open():
                for eid in self._domain_cb_ids:
                    self._backend.domainEventDeregisterAny(eid)
                for eid in self._network_cb_ids:
                    self._backend.networkEventDeregisterAny(eid)
                for eid in self._storage_pool_cb_ids:
                    self._backend.storagePoolEventDeregisterAny(eid)
                for eid in self._node_device_cb_ids:
                    self._backend.nodeDeviceEventDeregisterAny(eid)
        except Exception:  # pragma: no cover
            log.debug("Failed to deregister events in conn cleanup",
                exc_info=True)
        finally:
            self._domain_cb_ids = []
            self._network_cb_ids = []
            self._storage_pool_cb_ids = []
            self._node_device_cb_ids = []

        self._stats = []

        if self._init_object_event:
            self._init_object_event.clear()  # pragma: no cover

        for obj in self._objects.all_objects():
            self._objects.remove(obj)
            try:
                self._remove_object_signal(obj)
                obj.cleanup()
            except Exception as e:  # pragma: no cover
                log.debug("Failed to cleanup %s: %s", obj, e)
        self._objects.cleanup()
        self._objects = _ObjectList()

        closeret = self._backend.close()
        if closeret == 1:
            log.debug(  # pragma: no cover
                    "LEAK: conn close() returned 1, "
                    "meaning refs may have leaked.")

        self._change_state(self._STATE_DISCONNECTED)
        self._closing = False

    def _cleanup(self):
        self.close()

        self._objects = None
        self._backend.cb_fetch_all_domains = None
        self._backend.cb_fetch_all_pools = None
        self._backend.cb_fetch_all_nodedevs = None
        self._backend.cb_fetch_all_vols = None
        self._backend.cb_cache_new_pool = None

        self.statsmanager.cleanup()
        self.statsmanager = None

    def open(self):
        if not self.is_disconnected():
            return  # pragma: no cover

        self._change_state(self._STATE_CONNECTING)

        log.debug("Scheduling background open thread for %s",
                      self.get_uri())
        self._start_thread(self._open_thread, "Connect %s" % self.get_uri())

    def _do_open(self):
        warnconsole = False
        libvirt_error_code = None
        libvirt_error_message = None
        exc = None

        try:
            cb = connectauth.creds_dialog
            data = self
            if self.config.CLITestOptions.fake_openauth:
                testmock.fake_openauth(self, cb, data)
            if self.config.CLITestOptions.fake_session_error:
                lerr = libvirt.libvirtError("fake session error")
                lerr.err = [libvirt.VIR_ERR_AUTH_FAILED, None,
                            "fake session error not authorized"]
                raise lerr
            self._backend.open(cb, data)
            return True, None
        except Exception as e:
            exc = e
            tb = "".join(traceback.format_exc())
            if isinstance(exc, libvirt.libvirtError):
                # pylint: disable=no-member
                libvirt_error_code = exc.get_error_code()
                libvirt_error_message = exc.get_error_message()

        if (libvirt_error_code ==
            getattr(libvirt, "VIR_ERR_AUTH_CANCELLED", None)):  # pragma: no cover
            log.debug("User cancelled auth, not raising any error.")
            return False, None

        if (libvirt_error_code == libvirt.VIR_ERR_AUTH_FAILED and
            "not authorized" in libvirt_error_message.lower()):
            log.debug("Looks like we might have failed PolicyKit "
                          "auth. Checking to see if we have a valid "
                          "console session")
            if not self.is_remote():
                warnconsole = not connectauth.do_we_have_session()
            if self.config.CLITestOptions.fake_session_error:
                warnconsole = True

        ConnectError = connectauth.connect_error(
                self, str(exc), tb, warnconsole)
        return False, ConnectError

    def _populate_initial_state(self):
        log.debug("libvirt version=%s", self._backend.local_libvirt_version())
        log.debug("daemon version=%s", self._backend.daemon_version())
        log.debug("conn version=%s", self._backend.conn_version())
        log.debug("%s capabilities:\n%s", self.get_uri(), self.caps.get_xml())

        if not self.support.conn_domain():  # pragma: no cover
            raise RuntimeError("Connection does not support required "
                    "domain listing APIs")

        if not self.support.conn_storage():  # pragma: no cover
            log.debug("Connection doesn't seem to support storage APIs.")
        if not self.support.conn_network():  # pragma: no cover
            log.debug("Connection doesn't seem to support network APIs.")
        if not self.support.conn_nodedev():  # pragma: no cover
            log.debug("Connection doesn't seem to support nodedev APIs.")

        self._add_conn_events()

        try:
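            # Send a keepalive after 20 idle seconds; the connection is
            # declared dead once 1 message goes unanswered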
            self._backend.setKeepAlive(20, 1)
        except Exception as e:
            log.debug("Failed to setKeepAlive: %s", str(e))

        # The initial tick will set up a threading event that will only
        # trigger after all the polled libvirt objects are fully initialized.
        # That way we only report the connection is open when everything is
        # nicely set up for the rest of the app.

        self._init_object_event = threading.Event()
        self._init_object_count = 0

        self.schedule_priority_tick(stats_update=True,
            pollvm=True, pollnet=True,
            pollpool=True, pollnodedev=True,
            force=True, initial_poll=True)

        self._init_object_event.wait()
        self._init_object_event = None
        self._init_object_count = None

        # Try to create the default storage pool
        # We need this after events setup so we can determine if the default
        # pool already exists
        try:
            virtinst.StoragePool.build_default_pool(self.get_backend())
        except Exception as e:  # pragma: no cover
            log.debug("Building default pool failed: %s", str(e))

    def _open_thread(self):
        try:
            is_active, ConnectError = self._do_open()
            if is_active:
                self._populate_initial_state()
        except Exception as e:  # pragma: no cover
            is_active = False
            ConnectError = connectauth.connect_error(self, str(e),
                    "".join(traceback.format_exc()), False)

        if is_active:
            self.idle_add(self._change_state, self._STATE_ACTIVE)
        else:
            self._schedule_close()

        self.idle_emit("open-completed", ConnectError)


    #######################
    # Tick/Update methods #
    #######################

    def _remove_object_signal(self, obj):
        if obj.is_domain():
            self.emit("vm-removed", obj)
        elif obj.is_network():
            self.emit("net-removed", obj)
        elif obj.is_pool():
            self.emit("pool-removed", obj)
        elif obj.is_nodedev():
            self.emit("nodedev-removed", obj)

    def _gone_object_signals(self, gone_objects):
        """
        Signal the UI about objects that have gone away. UI updates need
        to happen via the main loop (this is invoked with idle_add), so
        long tick operations in worker threads don't block the app.
        """
        if not self._backend.is_open():
            return  # pragma: no cover

        for obj in gone_objects:
            class_name = obj.class_name()
            name = obj.get_name()

            if not self._objects.remove(obj):
                log.debug("Requested removal of %s=%s, but it's "
                    "not in our object list.", class_name, name)
                continue

            log.debug("%s=%s removed", class_name, name)
            self._remove_object_signal(obj)
            obj.cleanup()

    def _new_object_cb(self, obj, initialize_failed):
        if not self._backend.is_open():
            return  # pragma: no cover

        try:
            class_name = obj.class_name()

            if initialize_failed:
                log.debug("Denylisting %s=%s", class_name, obj.get_name())
                count = self._objects.add_denylist(obj)
                log.debug("Object added to denylist, count=%d", count)
                return

            self._objects.remove_denylist(obj)
            if not self._objects.add(obj):
                log.debug("New %s=%s requested, but it's already tracked.",
                    class_name, obj.get_name())
                obj.cleanup()
                return

            if not obj.is_nodedev():
                # Skip nodedev logging since it's noisy and not interesting
                log.debug("%s=%s status=%s added", class_name,
                    obj.get_name(), obj.run_status())
            if obj.is_domain():
                self.emit("vm-added", obj)
            elif obj.is_network():
                self.emit("net-added", obj)
            elif obj.is_pool():
                self.emit("pool-added", obj)
            elif obj.is_nodedev():
                self.emit("nodedev-added", obj)
        finally:
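            # Success or failure, each object that finished init counts
            # down toward the initial-poll gate created in
            # _populate_initial_state()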
            if self._init_object_event:
                self._init_object_count -= 1
                if self._init_object_count <= 0:
                    self._init_object_event.set()

    def _poll(self, initial_poll,
            pollvm, pollnet, pollpool, pollnodedev):
        """
        Helper called from tick() to do necessary polling and return
        the relevant object lists
        """
        gone_objects = []
        preexisting_objects = []

        def _process_objects(ptype):
            if ptype == "nets":
                dopoll = pollnet
                objs = self.list_nets()
                cls = vmmNetwork
                pollcb = pollhelpers.fetch_nets
            elif ptype == "pools":
                dopoll = pollpool
                objs = self.list_pools()
                cls = vmmStoragePool
                pollcb = pollhelpers.fetch_pools
            elif ptype == "nodedevs":
                dopoll = pollnodedev
                objs = self.list_nodedevs()
                cls = vmmNodeDevice
                pollcb = pollhelpers.fetch_nodedevs
            else:
                dopoll = pollvm
                objs = self.list_vms()
                cls = vmmDomain
                pollcb = pollhelpers.fetch_vms

            keymap = dict((o.get_name(), o) for o in objs)

            def cb(obj, name):
                return cls(self, obj, name)

            if dopoll:
                gone, new, master = pollcb(self._backend, keymap, cb)
            else:
                gone, new, master = [], [], list(keymap.values())

            if initial_poll:
                self._init_object_count += len(new)

            gone_objects.extend(gone)
            preexisting_objects.extend([o for o in master if o not in new])
            new = [n for n in new if not self._objects.in_denylist(n)]
            return new

        new_vms = _process_objects("vms")
        new_nets = _process_objects("nets")
        new_pools = _process_objects("pools")
        new_nodedevs = _process_objects("nodedevs")

        # Kick off one thread per object type to handle the initial
        # XML fetching. Going any finer-grained than this probably
        # won't be that useful due to libvirt's locking structure.
        #
        # Would prefer to start refreshing some objects before all polling
        # is complete, but we need init_object_count to be fully accurate
        # before we start initializing objects

        if initial_poll and self._init_object_count == 0:
            # If the connection doesn't have any objects, new_object_cb
            # is never called and the event is never set, so let's do it here
            self._init_object_event.set()

        for newlist in [new_vms, new_nets, new_pools, new_nodedevs]:
            if not newlist:
                continue

            def cb(lst):
                for obj in lst:
                    obj.connect_once("initialized", self._new_object_cb)
                    obj.init_libvirt_state()

            self._start_thread(cb,
                "refreshing xml for new %s" % newlist[0].class_name(),
                args=(newlist,))

        return gone_objects, preexisting_objects

    def _tick(self, stats_update=False,
             pollvm=False, pollnet=False,
             pollpool=False, pollnodedev=False,
             force=False, initial_poll=False):
        """
        Main update function: polls for new objects, updates stats, ...

        :param force: Perform the requested polling even if async events
            are in use.
        """
        if self._closing:
            return  # pragma: no cover
        if self.is_disconnected():
            return  # pragma: no cover
        if self.is_connecting() and not force:
            return  # pragma: no cover

        # We need to set this before the event check, since stats polling
        # is independent of events
        if not pollvm:
            stats_update = False

        if self.using_domain_events and not force:
            pollvm = False
        if self.using_network_events and not force:
            pollnet = False
        if self.using_storage_pool_events and not force:
            pollpool = False
        if self.using_node_device_events and not force:
            pollnodedev = False

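        # Refresh the cached host info each tick; host_memory_size() and
        # host_active_processor_count() read from this cache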
        self._hostinfo = self._backend.getInfo()
        if stats_update:
            self.statsmanager.cache_all_stats(self)

        gone_objects, preexisting_objects = self._poll(
            initial_poll, pollvm, pollnet, pollpool, pollnodedev)
        self.idle_add(self._gone_object_signals, gone_objects)

        # Only tick() pre-existing objects, since new objects will be
        # initialized asynchronously and tick() would be redundant
        for obj in preexisting_objects:
            try:
                if obj.reports_stats() and stats_update:
                    pass
                elif obj.is_domain() and not pollvm:
                    continue
                elif obj.is_network() and not pollnet:
                    continue
                elif obj.is_pool() and not pollpool:
                    continue
                elif obj.is_nodedev() and not pollnodedev:
                    continue

                if self.config.CLITestOptions.conn_crash:
                    self._backend.close()
                    e = libvirt.libvirtError("fake error")
                    e.err = [libvirt.VIR_ERR_SYSTEM_ERROR]
                    raise e

                obj.tick(stats_update=stats_update)
            except Exception as e:
                log.exception("Tick for %s failed", obj)
                if (isinstance(e, libvirt.libvirtError) and
                    (getattr(e, "get_error_code")() ==
                     libvirt.VIR_ERR_SYSTEM_ERROR)):
                    # Try a simple getInfo call to see if conn was dropped
                    self._backend.getInfo()
                    log.debug(  # pragma: no cover
                            "vm tick raised system error but "
                            "connection doesn't seem to have dropped. "
                            "Ignoring.")

        if stats_update:
            self._recalculate_stats(
                [o for o in preexisting_objects if o.reports_stats()])
            self.idle_emit("resources-sampled")

    def _recalculate_stats(self, vms):
        if not self._backend.is_open():
            return  # pragma: no cover

        now = time.time()
        expected = self.config.get_stats_history_length()
        current = len(self._stats)
        if current > expected:
            del self._stats[expected:current]  # pragma: no cover

        mem = 0
        cpuTime = 0
        rdRate = 0
        wrRate = 0
        rxRate = 0
        txRate = 0
        diskMaxRate = self.disk_io_max_rate() or 10.0
        netMaxRate = self.network_traffic_max_rate() or 10.0

        for vm in vms:
            if not vm.is_active():
                continue

            cpuTime += vm.cpu_time()
            mem += vm.stats_memory()
            rdRate += vm.disk_read_rate()
            wrRate += vm.disk_write_rate()
            rxRate += vm.network_rx_rate()
            txRate += vm.network_tx_rate()

            netMaxRate = max(netMaxRate, vm.network_traffic_max_rate())
            diskMaxRate = max(diskMaxRate, vm.disk_io_max_rate())

        pcentHostCpu = 0
        pcentMem = mem * 100.0 / self.host_memory_size()

        if len(self._stats) > 0:
            prevTimestamp = self._stats[0]["timestamp"]
            host_cpus = self.host_active_processor_count()

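            # cpuTime sums each active VM's CPU time used over the sample
            # interval, in nanoseconds; normalize by the wall-clock elapsed
            # time and host CPU count to get a host-wide percentage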
            pcentHostCpu = (cpuTime * 100.0 /
                            ((now - prevTimestamp) *
                             1000.0 * 1000.0 * 1000.0 * host_cpus))

        pcentHostCpu = max(0.0, min(100.0, pcentHostCpu))
        pcentMem = max(0.0, min(100.0, pcentMem))

        newStats = {
            "timestamp": now,
            "memory": mem,
            "memoryPercent": pcentMem,
            "cpuTime": cpuTime,
            "cpuHostPercent": pcentHostCpu,
            "diskRdRate": rdRate,
            "diskWrRate": wrRate,
            "netRxRate": rxRate,
            "netTxRate": txRate,
            "diskMaxRate": diskMaxRate,
            "netMaxRate": netMaxRate,
        }

        self._stats.insert(0, newStats)


    def schedule_priority_tick(self, **kwargs):
        from .engine import vmmEngine
        vmmEngine.get_instance().schedule_priority_tick(self, kwargs)

    def tick_from_engine(self, *args, **kwargs):
        try:
            self._tick(*args, **kwargs)
        except Exception:
            self._schedule_close()
            raise


    ########################
    # Stats getter methods #
    ########################

    def _get_record_helper(self, record_name):
        if len(self._stats) == 0:
            return 0
        return self._stats[0][record_name]

    def _vector_helper(self, record_name, limit, ceil=100.0):
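        # Return newest-first samples scaled down by `ceil`, zero-padded
        # out to the requested history length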
        vector = []
        statslen = self.config.get_stats_history_length() + 1
        if limit is not None:
            statslen = min(statslen, limit)  # pragma: no cover

        for i in range(statslen):
            if i < len(self._stats):
                vector.append(self._stats[i][record_name] / ceil)
            else:
                vector.append(0)

        return vector

    def stats_memory_vector(self, limit=None):
        return self._vector_helper("memoryPercent", limit)
    def host_cpu_time_vector(self, limit=None):
        return self._vector_helper("cpuHostPercent", limit)

    def stats_memory(self):
        return self._get_record_helper("memory")
    def host_cpu_time_percentage(self):
        return self._get_record_helper("cpuHostPercent")
    def guest_cpu_time_percentage(self):
        return self.host_cpu_time_percentage()
    def network_traffic_rate(self):
        return (self._get_record_helper("netRxRate") +
                self._get_record_helper("netTxRate"))
    def disk_io_rate(self):
        return (self._get_record_helper("diskRdRate") +
                self._get_record_helper("diskWrRate"))

    def network_traffic_max_rate(self):
        return self._get_record_helper("netMaxRate")
    def disk_io_max_rate(self):
        return self._get_record_helper("diskMaxRate")


    ###########################
    # Per-conn config helpers #
    ###########################

    def get_autoconnect(self):
        return self.config.get_conn_autoconnect(self.get_uri())
    def set_autoconnect(self, val):
        self.config.set_conn_autoconnect(self.get_uri(), val)

    def set_config_pretty_name(self, value):
        cfgname = self._get_config_pretty_name()
        if value == cfgname:
            return  # pragma: no cover
        if not cfgname and value == self.get_pretty_desc():
            # Don't encode the default connection value into GSettings right
            # away, require the user to edit it first
            return
        self.config.set_perconn(self.get_uri(), "/pretty-name", value)
    def _get_config_pretty_name(self):
        return self.config.get_perconn(self.get_uri(), "/pretty-name")
    def _on_config_pretty_name_changed(self, *args, **kwargs):
        return self.config.listen_perconn(self.get_uri(), "/pretty-name",
            *args, **kwargs)
    def _config_pretty_name_changed_cb(self):
        self.emit("state-changed")

    def set_details_window_size(self, w, h):
        self.config.set_perconn(self.get_uri(), "/window-size", (w, h))
    def get_details_window_size(self):
        return self.config.get_perconn(self.get_uri(), "/window-size")