1#
2# List of OS Specific data
3#
4# Copyright 2006-2008, 2013-2014 Red Hat, Inc.
5#
6# This work is licensed under the GNU GPLv2 or later.
7# See the COPYING file in the top-level directory.
8
9import datetime
10import os
11import re
12
13from gi.repository import Libosinfo
14
15from . import xmlutil
16from .logger import log
17
18
def _media_create_from_location(location):
    """
    Build a Libosinfo.Media object for the passed media location.

    Uses Libosinfo.Media.create_from_location_with_flags when the installed
    libosinfo exposes it, otherwise falls back to create_from_location.
    """
    if hasattr(Libosinfo.Media, "create_from_location_with_flags"):
        # We prefer this API, because by default it will not
        # reject non-bootable media, like debian s390x
        # pylint: disable=no-member
        return Libosinfo.Media.create_from_location_with_flags(
                location, None, 0)

    return Libosinfo.Media.create_from_location(  # pragma: no cover
            location, None)
28
29
30###################
31# Sorting helpers #
32###################
33
34def _sortby(osobj):
35    """
36    Combines distro+version to make a more sort friendly string. Examples
37
38    fedora25    -> fedora-0025000000000000
39    ubuntu17.04 -> ubuntu-0017000400000000
40    win2k8r2    -> win-0006000100000000
41    """
42    if osobj.is_generic():
43        # Sort generic at the end of the list
44        return "zzzzzz-000000000000"
45
46    version = osobj.version
47    try:
48        t = version.split(".")
49        t = t[:min(4, len(t))] + [0] * (4 - min(4, len(t)))
50        new_version = ""
51        for n in t:
52            new_version = new_version + ("%.4i" % int(n))
53        version = new_version
54    except Exception:
55        pass
56
57    return "%s-%s" % (osobj.distro, version)
58
59
def _sort(tosort):
    """
    Return the values of the `tosort` dictionary as a sorted list.

    Entries are grouped by distro, with the distro groups in ascending
    alphabetical order and generic entries placed last. Inside each group
    entries are ordered descending by their _sortby() string, so the
    result looks like

        debian5, debian4, fedora14, fedora13

    :param tosort: dictionary of {key: OS object}. The OS objects need
        to provide `distro`, `version` and is_generic()
    :returns: list of the OS objects from `tosort`
    """
    sortby_mappings = {}
    distro_mappings = {}

    for key, osinfo in tosort.items():
        # Libosinfo has some duplicate version numbers here, so append .1
        # if there's a collision. Test for membership rather than for the
        # truthiness of the stored key, otherwise a falsy key (like "")
        # is not detected as a collision and gets overwritten.
        sortby = _sortby(osinfo)
        while sortby in sortby_mappings:
            sortby += ".1"
        sortby_mappings[sortby] = key

        # Group by distro first, so debian is clumped together, fedora, etc.
        distro = "zzzzzz" if osinfo.is_generic() else osinfo.distro
        distro_mappings.setdefault(distro, []).append(sortby)

    # Build the final list of sorted os objects: distros ascending,
    # versions within a distro descending
    retlist = []
    for distro in sorted(distro_mappings):
        for sortby in sorted(distro_mappings[distro], reverse=True):
            retlist.append(tosort[sortby_mappings[sortby]])

    return retlist
100
101
102class _OsinfoIter:
103    """
104    Helper to turn osinfo style get_length/get_nth lists into python
105    iterables
106    """
107    def __init__(self, listobj):
108        self.current = 0
109        self.listobj = listobj
110        self.high = -1
111        if self.listobj:
112            self.high = self.listobj.get_length() - 1
113
114    def __iter__(self):
115        return self
116    def __next__(self):
117        if self.current > self.high:
118            raise StopIteration
119        ret = self.listobj.get_nth(self.current)
120        self.current += 1
121        return ret
122
123
124class _OSDB(object):
125    """
126    Entry point for the public API
127    """
128    def __init__(self):
129        self.__os_loader = None
130        self.__all_variants = None
131
132    # This is only for back compatibility with pre-libosinfo support.
133    # This should never change.
134    _aliases = {
135        "altlinux": "altlinux1.0",
136        "debianetch": "debian4",
137        "debianlenny": "debian5",
138        "debiansqueeze": "debian6",
139        "debianwheezy": "debian7",
140        "freebsd10": "freebsd10.0",
141        "freebsd6": "freebsd6.0",
142        "freebsd7": "freebsd7.0",
143        "freebsd8": "freebsd8.0",
144        "freebsd9": "freebsd9.0",
145        "mandriva2009": "mandriva2009.0",
146        "mandriva2010": "mandriva2010.0",
147        "mbs1": "mbs1.0",
148        "msdos": "msdos6.22",
149        "openbsd4": "openbsd4.2",
150        "opensolaris": "opensolaris2009.06",
151        "opensuse11": "opensuse11.4",
152        "opensuse12": "opensuse12.3",
153        "rhel4": "rhel4.0",
154        "rhel5": "rhel5.0",
155        "rhel6": "rhel6.0",
156        "rhel7": "rhel7.0",
157        "ubuntuhardy": "ubuntu8.04",
158        "ubuntuintrepid": "ubuntu8.10",
159        "ubuntujaunty": "ubuntu9.04",
160        "ubuntukarmic": "ubuntu9.10",
161        "ubuntulucid": "ubuntu10.04",
162        "ubuntumaverick": "ubuntu10.10",
163        "ubuntunatty": "ubuntu11.04",
164        "ubuntuoneiric": "ubuntu11.10",
165        "ubuntuprecise": "ubuntu12.04",
166        "ubuntuquantal": "ubuntu12.10",
167        "ubunturaring": "ubuntu13.04",
168        "ubuntusaucy": "ubuntu13.10",
169        "virtio26": "fedora10",
170        "vista": "winvista",
171        "winxp64": "winxp",
172
173        # Old --os-type values
174        "linux": "generic",
175        "windows": "winxp",
176        "solaris": "solaris10",
177        "unix": "freebsd9.0",
178        "other": "generic",
179    }
180
181
182    #################
183    # Internal APIs #
184    #################
185
186    def _make_default_variants(self, allvariants):
187        # Add our custom generic variant
188        o = Libosinfo.Os()
189        o.set_param("short-id", "generic")
190        o.set_param("name", _("Generic OS"))
191        v = _OsVariant(o)
192        allvariants[v.name] = v
193
194    @property
195    def _os_loader(self):
196        if not self.__os_loader:
197            loader = Libosinfo.Loader()
198            loader.process_default_path()
199
200            self.__os_loader = loader
201        return self.__os_loader
202
203    @property
204    def _all_variants(self):
205        if not self.__all_variants:
206            loader = self._os_loader
207            allvariants = {}
208            db = loader.get_db()
209            oslist = db.get_os_list()
210            for o in _OsinfoIter(oslist):
211                osi = _OsVariant(o)
212                for name in osi.get_short_ids():
213                    allvariants[name] = osi
214
215            self._make_default_variants(allvariants)
216            self.__all_variants = allvariants
217        return self.__all_variants
218
219
220    ###############
221    # Public APIs #
222    ###############
223
224    def lookup_os_by_full_id(self, full_id, raise_error=False):
225        for osobj in self._all_variants.values():
226            if osobj.full_id == full_id:
227                return osobj
228        if raise_error:
229            raise ValueError(_("Unknown libosinfo ID '%s'") % full_id)
230
231    def lookup_os(self, key, raise_error=False):
232        if key not in self._all_variants and key in self._aliases:
233            alias = self._aliases[key]
234            # Added 2018-10-02. Maybe remove aliases in a year
235            msg = (_("OS name '%(oldname)s' is deprecated, using '%(newname)s' "
236                    "instead. This alias will be removed in the future.") %
237                    {"oldname": key, "newname": alias})
238            log.warning(msg)
239            key = alias
240
241        ret = self._all_variants.get(key)
242        if ret is None and raise_error:
243            raise ValueError(_("Unknown OS name '%s'. "
244                    "See `osinfo-query os` for valid values.") % key)
245        return ret
246
247    def guess_os_by_iso(self, location):
248        try:
249            media = _media_create_from_location(location)
250        except Exception as e:
251            log.debug("Error creating libosinfo media object: %s", str(e))
252            return None
253
254        if not self._os_loader.get_db().identify_media(media):
255            return None  # pragma: no cover
256        return media.get_os().get_short_id(), _OsMedia(media)
257
258    def guess_os_by_tree(self, location):
259        if location.startswith("/"):
260            location = "file://" + location
261
262        if xmlutil.in_testsuite() and not location.startswith("file:"):
263            # We have mock network tests, but we don't want to pass the
264            # fake URL to libosinfo because it slows down the testcase
265            return None
266
267        try:
268            tree = Libosinfo.Tree.create_from_location(location, None)
269        except Exception as e:
270            log.debug("Error creating libosinfo tree object for "
271                "location=%s : %s", location, str(e))
272            return None
273
274        db = self._os_loader.get_db()
275        if hasattr(db, "identify_tree"):
276            # osinfo_db_identify_tree is part of libosinfo 1.6.0
277            if not db.identify_tree(tree):
278                return None  # pragma: no cover
279            return tree.get_os().get_short_id(), _OsTree(tree)
280        else:  # pragma: no cover
281            osobj, treeobj = self._os_loader.get_db().guess_os_from_tree(tree)
282            if not osobj:
283                return None  # pragma: no cover
284            return osobj.get_short_id(), _OsTree(treeobj)
285
286    def list_os(self):
287        """
288        List all OSes in the DB
289        """
290        sortmap = {}
291
292        for osobj in self._all_variants.values():
293            sortmap[osobj.name] = osobj
294
295        return _sort(sortmap)
296
297
# Shared module-level _OSDB instance. Creating it does not touch libosinfo:
# the loader and the variant table are only built on first access.
OSDB = _OSDB()
299
300
301#####################
302# OsResources class #
303#####################
304
305class _OsResources:
306    def __init__(self, minimum, recommended):
307        self._minimum = self._convert_to_dict(minimum)
308        self._recommended = self._convert_to_dict(recommended)
309
310    def _convert_to_dict(self, resources):
311        """
312        Convert an OsResources object to a dictionary for easier
313        lookups. Layout is: {arch: {strkey: value}}
314        """
315        ret = {}
316        for r in _OsinfoIter(resources):
317            vals = {}
318            vals["ram"] = r.get_ram()
319            vals["n-cpus"] = r.get_n_cpus()
320            vals["storage"] = r.get_storage()
321            ret[r.get_architecture()] = vals
322        return ret
323
324    def _get_key(self, resources, key, arch):
325        for checkarch in [arch, "all"]:
326            val = resources.get(checkarch, {}).get(key, -1)
327            if val != -1:
328                return val
329
330    def _get_minimum_key(self, key, arch):
331        val = self._get_key(self._minimum, key, arch)
332        if val and val > 0:
333            return val
334
335    def _get_recommended_key(self, key, arch):
336        val = self._get_key(self._recommended, key, arch)
337        if val and val > 0:
338            return val
339        # If we are looking for a recommended value, but the OS
340        # DB only has minimum resources tracked, double the minimum
341        # value as an approximation at a 'recommended' value
342        val = self._get_minimum_key(key, arch)
343        if val:
344            log.debug("No recommended value found for key='%s', "
345                    "using minimum=%s * 2", key, val)
346            return val * 2
347        return None
348
349    def get_minimum_ram(self, arch):
350        return self._get_minimum_key("ram", arch)
351
352    def get_recommended_ram(self, arch):
353        return self._get_recommended_key("ram", arch)
354
355    def get_recommended_ncpus(self, arch):
356        return self._get_recommended_key("n-cpus", arch)
357
358    def get_recommended_storage(self, arch):
359        return self._get_recommended_key("storage", arch)
360
361
362#####################
363# OsVariant classes #
364#####################
365
366class _OsVariant(object):
367    def __init__(self, o):
368        self._os = o
369
370        self._short_ids = [self._os.get_short_id()]
371        if hasattr(self._os, "get_short_id_list"):
372            self._short_ids = self._os.get_short_id_list()
373        self.name = self._short_ids[0]
374
375        self._family = self._os.get_family()
376        self.full_id = self._os.get_id()
377        self.label = self._os.get_name()
378        self.codename = self._os.get_codename() or ""
379        self.distro = self._os.get_distro() or ""
380        self.version = self._os.get_version()
381
382        self.eol = self._get_eol()
383
384    def __repr__(self):
385        return "<%s name=%s>" % (self.__class__.__name__, self.name)
386
387
388    ########################
389    # Internal helper APIs #
390    ########################
391
392    def _is_related_to(self, related_os_list, osobj=None,
393            check_derives=True, check_upgrades=True, check_clones=True):
394        osobj = osobj or self._os
395        if osobj.get_short_id() in related_os_list:
396            return True
397
398        check_list = []
399        def _extend(newl):
400            for obj in newl:
401                if obj not in check_list:
402                    check_list.append(obj)
403
404        if check_derives:
405            _extend(osobj.get_related(
406                Libosinfo.ProductRelationship.DERIVES_FROM).get_elements())
407        if check_clones:
408            _extend(osobj.get_related(
409                Libosinfo.ProductRelationship.CLONES).get_elements())
410        if check_upgrades:
411            _extend(osobj.get_related(
412                Libosinfo.ProductRelationship.UPGRADES).get_elements())
413
414        for checkobj in check_list:
415            if (checkobj.get_short_id() in related_os_list or
416                self._is_related_to(related_os_list, osobj=checkobj,
417                    check_upgrades=check_upgrades,
418                    check_derives=check_derives,
419                    check_clones=check_clones)):
420                return True
421
422        return False
423
424    def _get_all_devices(self):
425        return list(_OsinfoIter(self._os.get_all_devices()))
426
427    def _device_filter(self, devids=None, cls=None, extra_devs=None):
428        ret = []
429        devids = devids or []
430        for dev in self._get_all_devices():
431            if devids and dev.get_id() not in devids:
432                continue
433            if cls and not re.match(cls, dev.get_class()):
434                continue
435            ret.append(dev.get_name())
436
437        extra_devs = extra_devs or []
438        for dev in extra_devs:
439            if dev.get_id() not in devids:
440                continue
441            ret.append(dev.get_name())
442
443        return ret
444
445
446    ###############
447    # Cached APIs #
448    ###############
449
450    def _get_eol(self):
451        eol = self._os.get_eol_date()
452        rel = self._os.get_release_date()
453
454        # We can use os.get_release_status() & osinfo.ReleaseStatus.ROLLING
455        # if we require libosinfo >= 1.4.0.
456        release_status = self._os.get_param_value(
457                Libosinfo.OS_PROP_RELEASE_STATUS) or None
458
459        def _glib_to_datetime(glibdate):
460            date = "%s-%s" % (glibdate.get_year(), glibdate.get_day_of_year())
461            return datetime.datetime.strptime(date, "%Y-%j")
462
463        now = datetime.datetime.today()
464        if eol is not None:
465            return now > _glib_to_datetime(eol)
466
467        # Rolling distributions are never EOL.
468        if release_status == "rolling":
469            return False
470
471        # If no EOL is present, assume EOL if release was > 10 years ago
472        if rel is not None:
473            rel5 = _glib_to_datetime(rel) + datetime.timedelta(days=365 * 10)
474            return now > rel5
475        return False
476
477
478    ###############
479    # Public APIs #
480    ###############
481
482    def get_handle(self):
483        return self._os
484
485    def is_generic(self):
486        return self.name == "generic"
487
488    def is_windows(self):
489        return self._family in ['win9x', 'winnt', 'win16']
490
491    def get_short_ids(self):
492        return self._short_ids[:]
493
494    def broken_uefi_with_hyperv(self):
495        # Some windows versions are broken with hyperv enlightenments + UEFI
496        # https://bugzilla.redhat.com/show_bug.cgi?id=1185253
497        # https://bugs.launchpad.net/qemu/+bug/1593605
498        return self.name in ("win2k8r2", "win7")
499
500    def get_clock(self):
501        if self.is_windows() or self._family in ['solaris']:
502            return "localtime"
503        return "utc"
504
505    def supported_netmodels(self):
506        return self._device_filter(cls="net")
507
508    def supports_usbtablet(self, extra_devs=None):
509        # If no OS specified, still default to tablet
510        if self.is_generic():
511            return True
512
513        devids = ["http://usb.org/usb/80ee/0021"]
514        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
515
516    def supports_virtiodisk(self, extra_devs=None):
517        # virtio-block and virtio1.0-block
518        devids = ["http://pcisig.com/pci/1af4/1001",
519                  "http://pcisig.com/pci/1af4/1042"]
520        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
521
522    def supports_virtioscsi(self, extra_devs=None):
523        # virtio-scsi and virtio1.0-scsi
524        devids = ["http://pcisig.com/pci/1af4/1004",
525                  "http://pcisig.com/pci/1af4/1048"]
526        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
527
528    def supports_virtionet(self, extra_devs=None):
529        # virtio-net and virtio1.0-net
530        devids = ["http://pcisig.com/pci/1af4/1000",
531                  "http://pcisig.com/pci/1af4/1041"]
532        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
533
534    def supports_virtiorng(self, extra_devs=None):
535        # virtio-rng and virtio1.0-rng
536        devids = ["http://pcisig.com/pci/1af4/1005",
537                  "http://pcisig.com/pci/1af4/1044"]
538        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
539
540    def supports_virtioballoon(self, extra_devs=None):
541        # virtio-balloon and virtio1.0-balloon
542        devids = ["http://pcisig.com/pci/1af4/1002",
543                  "http://pcisig.com/pci/1af4/1045"]
544        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
545
546    def supports_virtioserial(self, extra_devs=None):
547        devids = ["http://pcisig.com/pci/1af4/1003",
548                  "http://pcisig.com/pci/1af4/1043"]
549        if self._device_filter(devids=devids, extra_devs=extra_devs):
550            return True
551        # osinfo data was wrong for RHEL/centos here until Oct 2018
552        # Remove this hack after 6 months or so
553        return self._is_related_to("rhel6.0")
554
555    def supports_virtioinput(self, extra_devs=None):
556        # virtio1.0-input
557        devids = ["http://pcisig.com/pci/1af4/1052"]
558        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
559
560    def supports_usb3(self, extra_devs=None):
561        # qemu-xhci
562        devids = ["http://pcisig.com/pci/1b36/0004"]
563        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
564
565    def supports_virtio1(self, extra_devs=None):
566        # Use virtio1.0-net device as a proxy for virtio1.0 as a whole
567        devids = ["http://pcisig.com/pci/1af4/1041"]
568        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
569
570    def supports_chipset_q35(self, extra_devs=None):
571        # For our purposes, check for the union of q35 + virtio1.0 support
572        if (self.supports_virtionet(extra_devs=extra_devs) and
573            not self.supports_virtio1(extra_devs=extra_devs)):
574            return False
575        devids = ["http://qemu.org/chipset/x86/q35"]
576        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))
577
578    def get_recommended_resources(self):
579        minimum = self._os.get_minimum_resources()
580        recommended = self._os.get_recommended_resources()
581        return _OsResources(minimum, recommended)
582
583    def get_network_install_required_ram(self, guest):
584        if hasattr(self._os, "get_network_install_resources"):
585            resources = self._os.get_network_install_resources()
586            for r in _OsinfoIter(resources):
587                arch = r.get_architecture()
588                if arch == guest.os.arch or arch == "all":
589                    return r.get_ram()
590
591    def get_kernel_url_arg(self):
592        """
593        Kernel argument name the distro's installer uses to reference
594        a network source, possibly bypassing some installer prompts
595        """
596        # SUSE distros
597        if self.distro in ["caasp", "sle", "sled", "sles", "opensuse"]:
598            return "install"
599
600        if self.distro not in ["centos", "rhel", "fedora"]:
601            return None
602
603        # Red Hat distros
604        try:
605            if re.match(r"[0-9]+-unknown", self.version):
606                version = float(self.version.split("-")[0])
607            else:
608                version = float(self.version)
609        except Exception:
610            # Can hit this for -rawhide or -unknown
611            version = 999
612
613        if self.distro in ["centos", "rhel"] and version < 7:
614            return "method"
615
616        if self.distro in ["fedora"] and version < 19:
617            return "method"
618
619        return "inst.repo"
620
621    def _get_generic_location(self, treelist, arch, profile):
622        if not hasattr(Libosinfo.Tree, "get_os_variants"):  # pragma: no cover
623            for tree in treelist:
624                if tree.get_architecture() == arch:
625                    return tree.get_url()
626            return None
627
628        fallback_tree = None
629        if profile == "jeos":
630            profile = "Server"
631        elif profile == "desktop":
632            profile = "Workstation"
633        elif not profile:
634            profile = "Everything"
635
636        for tree in treelist:
637            if tree.get_architecture() != arch:
638                continue
639
640            variant_list = tree.get_os_variants()
641            fallback_tree = tree
642            for variant in _OsinfoIter(variant_list):
643                if profile in variant.get_name():
644                    return tree.get_url()
645
646        if fallback_tree:
647            return fallback_tree.get_url()
648        return None
649
650    def get_location(self, arch, profile=None):
651        treelist = list(_OsinfoIter(self._os.get_tree_list()))
652
653        if not treelist:
654            raise RuntimeError(
655                _("OS '%s' does not have a URL location") % self.name)
656
657        # Some distros have more than one URL for a specific architecture,
658        # which is the case for Fedora and different variants (Server,
659        # Workstation). Later on, we'll have to differentiate that and return
660        # the right one. However, for now, let's just rely on returning the
661        # most generic tree possible.
662        location = self._get_generic_location(treelist, arch, profile)
663        if location:
664            return location
665
666        raise RuntimeError(
667            _("OS '%(osname)s' does not have a URL location "
668              "for the architecture '%(archname)s'") %
669            {"osname": self.name, "archname": arch})
670
671    def get_install_script_list(self):
672        return list(_OsinfoIter(self._os.get_install_script_list()))
673
674    def _get_installable_drivers(self, arch):
675        installable_drivers = []
676        device_drivers = list(_OsinfoIter(self._os.get_device_drivers()))
677        for device_driver in device_drivers:
678            if arch != "all" and device_driver.get_architecture() != arch:
679                continue
680
681            installable_drivers.append(device_driver)
682        return installable_drivers
683
684    def _get_pre_installable_drivers(self, arch):
685        installable_drivers = self._get_installable_drivers(arch)
686        pre_inst_drivers = []
687        for driver in installable_drivers:
688            if driver.get_pre_installable():
689                pre_inst_drivers.append(driver)
690        return pre_inst_drivers
691
692    def _get_drivers_location(self, drivers):
693        locations = []
694        for driver in drivers:
695            filenames = driver.get_files()
696            for filename in filenames:
697                location = os.path.join(driver.get_location(), filename)
698                locations.append(location)
699        return locations
700
701    def get_pre_installable_drivers_location(self, arch):
702        pre_inst_drivers = self._get_pre_installable_drivers(arch)
703
704        return self._get_drivers_location(pre_inst_drivers)
705
706    def get_pre_installable_devices(self, arch):
707        drivers = self._get_pre_installable_drivers(arch)
708        devices = []
709        for driver in drivers:
710            devices += list(_OsinfoIter(driver.get_devices()))
711        return devices
712
713    def supports_unattended_drivers(self, arch):
714        if self._get_pre_installable_drivers(arch):
715            return True
716        return False
717
718
719class _OsMedia(object):
720    def __init__(self, osinfo_media):
721        self._media = osinfo_media
722
723    def get_kernel_path(self):
724        return self._media.get_kernel_path()
725    def get_initrd_path(self):
726        return self._media.get_initrd_path()
727    def supports_installer_script(self):
728        return self._media.supports_installer_script()
729
730    def is_netinst(self):
731        variants = list(_OsinfoIter(self._media.get_os_variants()))
732        for variant in variants:
733            if "netinst" in variant.get_id():
734                return True
735        return False  # pragma: no cover
736
737    def get_install_script_list(self):
738        return list(_OsinfoIter(self._media.get_install_script_list()))
739
740    def get_osinfo_media(self):
741        return self._media
742
743
744class _OsTree(object):
745    def __init__(self, osinfo_tree):
746        self._tree = osinfo_tree
747
748    def get_osinfo_tree(self):
749        return self._tree
750