#
# List of OS Specific data
#
# Copyright 2006-2008, 2013-2014 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.

import datetime
import os
import re

from gi.repository import Libosinfo

from . import xmlutil
from .logger import log


def _media_create_from_location(location):
    """
    Build a Libosinfo.Media object for the passed location, using
    create_from_location_with_flags when the installed libosinfo has it.
    """
    if not hasattr(Libosinfo.Media, "create_from_location_with_flags"):
        return Libosinfo.Media.create_from_location(  # pragma: no cover
            location, None)

    # We prefer this API, because by default it will not
    # reject non-bootable media, like debian s390x
    # pylint: disable=no-member
    return Libosinfo.Media.create_from_location_with_flags(location, None, 0)


###################
# Sorting helpers #
###################

def _sortby(osobj):
    """
    Combines distro+version to make a more sort friendly string. Examples

    fedora25    -> fedora-0025000000000000
    ubuntu17.04 -> ubuntu-0017000400000000
    win2k8r2    -> win-0006000100000000
    """
    if osobj.is_generic():
        # Sort generic at the end of the list
        return "zzzzzz-000000000000"

    version = osobj.version
    try:
        # Keep at most 4 dotted components, zero-padding missing ones,
        # and render each as a fixed width 4 digit field
        parts = version.split(".")[:4]
        parts = parts + [0] * (4 - len(parts))
        version = "".join("%.4i" % int(n) for n in parts)
    except Exception:
        # Deliberate best effort: a missing or non-numeric version
        # (ex: 'rawhide') is used unmodified in the sort string
        pass

    return "%s-%s" % (osobj.distro, version)


def _sort(tosort):
    """
    Return the values of the passed {key: _OsVariant} dict as a list,
    grouped by distro name ascending, with each distro's entries ordered
    descending by their _sortby() string. The generic OS is grouped last.
    """
    sortby_mappings = {}
    distro_mappings = {}
    retlist = []

    for key, osinfo in tosort.items():
        # Libosinfo has some duplicate version numbers here, so append .1
        # if there's a collision. Test for membership rather than value
        # truthiness, so an entry stored under a falsy key is not
        # silently overwritten
        sortby = _sortby(osinfo)
        while sortby in sortby_mappings:
            sortby = sortby + ".1"
        sortby_mappings[sortby] = key

        # Group by distro first, so debian is clumped together, fedora, etc.
        distro = osinfo.distro
        if osinfo.is_generic():
            distro = "zzzzzz"
        distro_mappings.setdefault(distro, []).append(sortby)

    # We want returned lists to be sorted descending by 'distro', so we get
    # debian5, debian4, fedora14, fedora13
    # rather than
    # debian4, debian5, fedora13, fedora14
    for distro_list in distro_mappings.values():
        distro_list.sort(reverse=True)

    # Build the final list of sorted os objects
    for distro in sorted(distro_mappings):
        for sortby in distro_mappings[distro]:
            orig_key = sortby_mappings[sortby]
            retlist.append(tosort[orig_key])

    return retlist


class _OsinfoIter:
    """
    Helper to turn osinfo style get_length/get_nth lists into python
    iterables
    """
    def __init__(self, listobj):
        self.current = 0
        self.listobj = listobj
        # high == -1 makes a falsy listobj iterate as empty
        self.high = -1
        if self.listobj:
            self.high = self.listobj.get_length() - 1

    def __iter__(self):
        return self

    def __next__(self):
        if self.current > self.high:
            raise StopIteration
        ret = self.listobj.get_nth(self.current)
        self.current += 1
        return ret


class _OSDB(object):
    """
    Entry point for the public API
    """
    def __init__(self):
        # Both are populated lazily on first property access
        self.__os_loader = None
        self.__all_variants = None

    # This is only for back compatibility with pre-libosinfo support.
    # This should never change.
    _aliases = {
        "altlinux": "altlinux1.0",
        "debianetch": "debian4",
        "debianlenny": "debian5",
        "debiansqueeze": "debian6",
        "debianwheezy": "debian7",
        "freebsd10": "freebsd10.0",
        "freebsd6": "freebsd6.0",
        "freebsd7": "freebsd7.0",
        "freebsd8": "freebsd8.0",
        "freebsd9": "freebsd9.0",
        "mandriva2009": "mandriva2009.0",
        "mandriva2010": "mandriva2010.0",
        "mbs1": "mbs1.0",
        "msdos": "msdos6.22",
        "openbsd4": "openbsd4.2",
        "opensolaris": "opensolaris2009.06",
        "opensuse11": "opensuse11.4",
        "opensuse12": "opensuse12.3",
        "rhel4": "rhel4.0",
        "rhel5": "rhel5.0",
        "rhel6": "rhel6.0",
        "rhel7": "rhel7.0",
        "ubuntuhardy": "ubuntu8.04",
        "ubuntuintrepid": "ubuntu8.10",
        "ubuntujaunty": "ubuntu9.04",
        "ubuntukarmic": "ubuntu9.10",
        "ubuntulucid": "ubuntu10.04",
        "ubuntumaverick": "ubuntu10.10",
        "ubuntunatty": "ubuntu11.04",
        "ubuntuoneiric": "ubuntu11.10",
        "ubuntuprecise": "ubuntu12.04",
        "ubuntuquantal": "ubuntu12.10",
        "ubunturaring": "ubuntu13.04",
        "ubuntusaucy": "ubuntu13.10",
        "virtio26": "fedora10",
        "vista": "winvista",
        "winxp64": "winxp",

        # Old --os-type values
        "linux": "generic",
        "windows": "winxp",
        "solaris": "solaris10",
        "unix": "freebsd9.0",
        "other": "generic",
    }


    #################
    # Internal APIs #
    #################

    def _make_default_variants(self, allvariants):
        """
        Insert our custom 'generic' _OsVariant into the passed dict
        """
        # Add our custom generic variant
        o = Libosinfo.Os()
        o.set_param("short-id", "generic")
        o.set_param("name", _("Generic OS"))
        v = _OsVariant(o)
        allvariants[v.name] = v

    @property
    def _os_loader(self):
        """
        Lazily created Libosinfo.Loader, with the default path processed
        """
        if not self.__os_loader:
            loader = Libosinfo.Loader()
            loader.process_default_path()

            self.__os_loader = loader
        return self.__os_loader

    @property
    def _all_variants(self):
        """
        Lazily built dict mapping every short ID to its _OsVariant. An OS
        with multiple short IDs appears once per ID.
        """
        if not self.__all_variants:
            loader = self._os_loader
            allvariants = {}
            db = loader.get_db()
            oslist = db.get_os_list()
            for o in _OsinfoIter(oslist):
                osi = _OsVariant(o)
                for name in osi.get_short_ids():
                    allvariants[name] = osi

            self._make_default_variants(allvariants)
            self.__all_variants = allvariants
        return self.__all_variants


    ###############
    # Public APIs #
    ###############

    def lookup_os_by_full_id(self, full_id, raise_error=False):
        """
        Return the _OsVariant whose full_id matches the passed value.
        If there's no match, raise ValueError when raise_error is set,
        otherwise return None.
        """
        for osobj in self._all_variants.values():
            if osobj.full_id == full_id:
                return osobj
        if raise_error:
            raise ValueError(_("Unknown libosinfo ID '%s'") % full_id)
        return None

    def lookup_os(self, key, raise_error=False):
        """
        Return the _OsVariant for the passed short ID. Names from the
        legacy _aliases table are translated, logging a deprecation
        warning. If there's no match, raise ValueError when raise_error
        is set, otherwise return None.
        """
        if key not in self._all_variants and key in self._aliases:
            alias = self._aliases[key]
            # Added 2018-10-02. Maybe remove aliases in a year
            msg = (_("OS name '%(oldname)s' is deprecated, using '%(newname)s' "
                     "instead. This alias will be removed in the future.") %
                   {"oldname": key, "newname": alias})
            log.warning(msg)
            key = alias

        ret = self._all_variants.get(key)
        if ret is None and raise_error:
            raise ValueError(_("Unknown OS name '%s'. "
                    "See `osinfo-query os` for valid values.") % key)
        return ret

    def guess_os_by_iso(self, location):
        """
        Ask libosinfo to identify the media at the passed location.
        Returns a (short ID, _OsMedia) tuple, or None if the media object
        can't be created or isn't identified.
        """
        try:
            media = _media_create_from_location(location)
        except Exception as e:
            log.debug("Error creating libosinfo media object: %s", str(e))
            return None

        if not self._os_loader.get_db().identify_media(media):
            return None  # pragma: no cover
        return media.get_os().get_short_id(), _OsMedia(media)

    def guess_os_by_tree(self, location):
        """
        Ask libosinfo to identify the install tree at the passed location.
        Absolute paths are converted to file:// URLs. Returns a
        (short ID, _OsTree) tuple, or None if the tree object can't be
        created or isn't identified.
        """
        if location.startswith("/"):
            location = "file://" + location

        if xmlutil.in_testsuite() and not location.startswith("file:"):
            # We have mock network tests, but we don't want to pass the
            # fake URL to libosinfo because it slows down the testcase
            return None

        try:
            tree = Libosinfo.Tree.create_from_location(location, None)
        except Exception as e:
            log.debug("Error creating libosinfo tree object for "
                "location=%s : %s", location, str(e))
            return None

        db = self._os_loader.get_db()
        if hasattr(db, "identify_tree"):
            # osinfo_db_identify_tree is part of libosinfo 1.6.0
            if not db.identify_tree(tree):
                return None  # pragma: no cover
            return tree.get_os().get_short_id(), _OsTree(tree)
        else:  # pragma: no cover
            osobj, treeobj = db.guess_os_from_tree(tree)
            if not osobj:
                return None  # pragma: no cover
            return osobj.get_short_id(), _OsTree(treeobj)

    def list_os(self):
        """
        List all OSes in the DB
        """
        sortmap = {}

        # Keying on the primary name collapses the duplicate entries
        # that _all_variants holds for OSes with multiple short IDs
        for osobj in self._all_variants.values():
            sortmap[osobj.name] = osobj

        return _sort(sortmap)


OSDB = _OSDB()


#####################
# OsResources class #
#####################

class _OsResources:
    """
    Lookup helper for the minimum and recommended resource lists of an OS
    """
    def __init__(self, minimum, recommended):
        self._minimum = self._convert_to_dict(minimum)
        self._recommended = self._convert_to_dict(recommended)

    def _convert_to_dict(self, resources):
        """
        Convert an OsResources object to a dictionary for easier
        lookups. Layout is: {arch: {strkey: value}}
        """
        ret = {}
        for r in _OsinfoIter(resources):
            vals = {}
            vals["ram"] = r.get_ram()
            vals["n-cpus"] = r.get_n_cpus()
            vals["storage"] = r.get_storage()
            ret[r.get_architecture()] = vals
        return ret

    def _get_key(self, resources, key, arch):
        """
        Return the value of 'key' for the passed arch, falling back to
        the 'all' arch. -1 is treated as unset. Returns None if neither
        arch has a value.
        """
        for checkarch in [arch, "all"]:
            val = resources.get(checkarch, {}).get(key, -1)
            if val != -1:
                return val
        return None

    def _get_minimum_key(self, key, arch):
        """
        Return the minimum value for 'key', or None if it isn't positive
        """
        val = self._get_key(self._minimum, key, arch)
        if val and val > 0:
            return val
        return None

    def _get_recommended_key(self, key, arch):
        """
        Return the recommended value for 'key', approximated from the
        minimum value if needed. Returns None if neither is available.
        """
        val = self._get_key(self._recommended, key, arch)
        if val and val > 0:
            return val
        # If we are looking for a recommended value, but the OS
        # DB only has minimum resources tracked, double the minimum
        # value as an approximation at a 'recommended' value
        val = self._get_minimum_key(key, arch)
        if val:
            log.debug("No recommended value found for key='%s', "
                      "using minimum=%s * 2", key, val)
            return val * 2
        return None

    def get_minimum_ram(self, arch):
        return self._get_minimum_key("ram", arch)

    def get_recommended_ram(self, arch):
        return self._get_recommended_key("ram", arch)

    def get_recommended_ncpus(self, arch):
        return self._get_recommended_key("n-cpus", arch)

    def get_recommended_storage(self, arch):
        return self._get_recommended_key("storage", arch)


#####################
# OsVariant classes #
#####################

class _OsVariant(object):
    """
    Wrapper around a Libosinfo.Os object, caching commonly used
    properties and providing device/feature support checks
    """
    def __init__(self, o):
        self._os = o

        # Older libosinfo only exposes a single short ID
        self._short_ids = [self._os.get_short_id()]
        if hasattr(self._os, "get_short_id_list"):
            self._short_ids = self._os.get_short_id_list()
        self.name = self._short_ids[0]

        self._family = self._os.get_family()
        self.full_id = self._os.get_id()
        self.label = self._os.get_name()
        self.codename = self._os.get_codename() or ""
        self.distro = self._os.get_distro() or ""
        self.version = self._os.get_version()

        self.eol = self._get_eol()

    def __repr__(self):
        return "<%s name=%s>" % (self.__class__.__name__, self.name)


    ########################
    # Internal helper APIs #
    ########################

    def _is_related_to(self, related_os_list, osobj=None,
            check_derives=True, check_upgrades=True, check_clones=True):
        """
        Return True if osobj (default: our OS), or any OS it is
        recursively related to via the enabled relationship types,
        has a short ID in related_os_list.

        related_os_list must be a list of short IDs: a bare string
        would turn the membership test into a substring match.
        """
        osobj = osobj or self._os
        if osobj.get_short_id() in related_os_list:
            return True

        check_list = []
        def _extend(newl):
            for obj in newl:
                if obj not in check_list:
                    check_list.append(obj)

        if check_derives:
            _extend(osobj.get_related(
                Libosinfo.ProductRelationship.DERIVES_FROM).get_elements())
        if check_clones:
            _extend(osobj.get_related(
                Libosinfo.ProductRelationship.CLONES).get_elements())
        if check_upgrades:
            _extend(osobj.get_related(
                Libosinfo.ProductRelationship.UPGRADES).get_elements())

        for checkobj in check_list:
            if (checkobj.get_short_id() in related_os_list or
                self._is_related_to(related_os_list, osobj=checkobj,
                    check_upgrades=check_upgrades,
                    check_derives=check_derives,
                    check_clones=check_clones)):
                return True

        return False

    def _get_all_devices(self):
        return list(_OsinfoIter(self._os.get_all_devices()))

    def _device_filter(self, devids=None, cls=None, extra_devs=None):
        """
        Return a list of device names. OS devices are included when their
        ID is in devids (if passed) and their class matches the 'cls'
        regex (if passed). extra_devs are only included when their ID is
        in devids.
        """
        ret = []
        devids = devids or []
        for dev in self._get_all_devices():
            if devids and dev.get_id() not in devids:
                continue
            if cls and not re.match(cls, dev.get_class()):
                continue
            ret.append(dev.get_name())

        extra_devs = extra_devs or []
        for dev in extra_devs:
            if dev.get_id() not in devids:
                continue
            ret.append(dev.get_name())

        return ret


    ###############
    # Cached APIs #
    ###############

    def _get_eol(self):
        """
        Return True if the OS is considered end-of-life: either its EOL
        date has passed, or there's no EOL date, it isn't a rolling
        release, and it was released more than 10 years ago.
        """
        eol = self._os.get_eol_date()
        rel = self._os.get_release_date()

        # We can use os.get_release_status() & osinfo.ReleaseStatus.ROLLING
        # if we require libosinfo >= 1.4.0.
        release_status = self._os.get_param_value(
            Libosinfo.OS_PROP_RELEASE_STATUS) or None

        def _glib_to_datetime(glibdate):
            date = "%s-%s" % (glibdate.get_year(), glibdate.get_day_of_year())
            return datetime.datetime.strptime(date, "%Y-%j")

        now = datetime.datetime.today()
        if eol is not None:
            return now > _glib_to_datetime(eol)

        # Rolling distributions are never EOL.
        if release_status == "rolling":
            return False

        # If no EOL is present, assume EOL if release was > 10 years ago
        if rel is not None:
            eol_cutoff = (_glib_to_datetime(rel) +
                          datetime.timedelta(days=365 * 10))
            return now > eol_cutoff
        return False


    ###############
    # Public APIs #
    ###############

    def get_handle(self):
        return self._os

    def is_generic(self):
        return self.name == "generic"

    def is_windows(self):
        return self._family in ['win9x', 'winnt', 'win16']

    def get_short_ids(self):
        # Return a copy so callers can't alter our list
        return self._short_ids[:]

    def broken_uefi_with_hyperv(self):
        # Some windows versions are broken with hyperv enlightenments + UEFI
        # https://bugzilla.redhat.com/show_bug.cgi?id=1185253
        # https://bugs.launchpad.net/qemu/+bug/1593605
        return self.name in ("win2k8r2", "win7")

    def get_clock(self):
        if self.is_windows() or self._family in ['solaris']:
            return "localtime"
        return "utc"

    def supported_netmodels(self):
        return self._device_filter(cls="net")

    def supports_usbtablet(self, extra_devs=None):
        # If no OS specified, still default to tablet
        if self.is_generic():
            return True

        devids = ["http://usb.org/usb/80ee/0021"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtiodisk(self, extra_devs=None):
        # virtio-block and virtio1.0-block
        devids = ["http://pcisig.com/pci/1af4/1001",
                  "http://pcisig.com/pci/1af4/1042"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtioscsi(self, extra_devs=None):
        # virtio-scsi and virtio1.0-scsi
        devids = ["http://pcisig.com/pci/1af4/1004",
                  "http://pcisig.com/pci/1af4/1048"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtionet(self, extra_devs=None):
        # virtio-net and virtio1.0-net
        devids = ["http://pcisig.com/pci/1af4/1000",
                  "http://pcisig.com/pci/1af4/1041"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtiorng(self, extra_devs=None):
        # virtio-rng and virtio1.0-rng
        devids = ["http://pcisig.com/pci/1af4/1005",
                  "http://pcisig.com/pci/1af4/1044"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtioballoon(self, extra_devs=None):
        # virtio-balloon and virtio1.0-balloon
        devids = ["http://pcisig.com/pci/1af4/1002",
                  "http://pcisig.com/pci/1af4/1045"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtioserial(self, extra_devs=None):
        devids = ["http://pcisig.com/pci/1af4/1003",
                  "http://pcisig.com/pci/1af4/1043"]
        if self._device_filter(devids=devids, extra_devs=extra_devs):
            return True
        # osinfo data was wrong for RHEL/centos here until Oct 2018
        # Remove this hack after 6 months or so
        # Pass a list, not a bare string: with a string the 'in' checks
        # in _is_related_to become substring matches
        return self._is_related_to(["rhel6.0"])

    def supports_virtioinput(self, extra_devs=None):
        # virtio1.0-input
        devids = ["http://pcisig.com/pci/1af4/1052"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_usb3(self, extra_devs=None):
        # qemu-xhci
        devids = ["http://pcisig.com/pci/1b36/0004"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_virtio1(self, extra_devs=None):
        # Use virtio1.0-net device as a proxy for virtio1.0 as a whole
        devids = ["http://pcisig.com/pci/1af4/1041"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def supports_chipset_q35(self, extra_devs=None):
        # For our purposes, check for the union of q35 + virtio1.0 support
        if (self.supports_virtionet(extra_devs=extra_devs) and
            not self.supports_virtio1(extra_devs=extra_devs)):
            return False
        devids = ["http://qemu.org/chipset/x86/q35"]
        return bool(self._device_filter(devids=devids, extra_devs=extra_devs))

    def get_recommended_resources(self):
        minimum = self._os.get_minimum_resources()
        recommended = self._os.get_recommended_resources()
        return _OsResources(minimum, recommended)

    def get_network_install_required_ram(self, guest):
        """
        Return the RAM value of the first network install resources
        entry matching guest.os.arch or 'all'. Returns None if there's
        no match, or libosinfo lacks get_network_install_resources.
        """
        if hasattr(self._os, "get_network_install_resources"):
            resources = self._os.get_network_install_resources()
            for r in _OsinfoIter(resources):
                arch = r.get_architecture()
                if arch == guest.os.arch or arch == "all":
                    return r.get_ram()
        return None

    def get_kernel_url_arg(self):
        """
        Kernel argument name the distro's installer uses to reference
        a network source, possibly bypassing some installer prompts
        """
        # SUSE distros
        if self.distro in ["caasp", "sle", "sled", "sles", "opensuse"]:
            return "install"

        if self.distro not in ["centos", "rhel", "fedora"]:
            return None

        # Red Hat distros
        try:
            if re.match(r"[0-9]+-unknown", self.version):
                version = float(self.version.split("-")[0])
            else:
                version = float(self.version)
        except Exception:
            # Can hit this for -rawhide or -unknown
            version = 999

        if self.distro in ["centos", "rhel"] and version < 7:
            return "method"

        if self.distro in ["fedora"] and version < 19:
            return "method"

        return "inst.repo"

    def _get_generic_location(self, treelist, arch, profile):
        """
        Return the URL of the tree in treelist that best matches the
        passed arch and profile, or None if no tree matches the arch.
        If no tree variant name contains the profile, the last tree
        matching the arch is used.
        """
        if not hasattr(Libosinfo.Tree, "get_os_variants"):  # pragma: no cover
            for tree in treelist:
                if tree.get_architecture() == arch:
                    return tree.get_url()
            return None

        fallback_tree = None
        # Map our profile names to the strings searched for in the
        # tree variant names
        if profile == "jeos":
            profile = "Server"
        elif profile == "desktop":
            profile = "Workstation"
        elif not profile:
            profile = "Everything"

        for tree in treelist:
            if tree.get_architecture() != arch:
                continue

            variant_list = tree.get_os_variants()
            fallback_tree = tree
            for variant in _OsinfoIter(variant_list):
                if profile in variant.get_name():
                    return tree.get_url()

        if fallback_tree:
            return fallback_tree.get_url()
        return None

    def get_location(self, arch, profile=None):
        """
        Return the install tree URL for the passed arch and profile.
        Raises RuntimeError if the OS has no trees, or none for the arch.
        """
        treelist = list(_OsinfoIter(self._os.get_tree_list()))

        if not treelist:
            raise RuntimeError(
                _("OS '%s' does not have a URL location") % self.name)

        # Some distros have more than one URL for a specific architecture,
        # which is the case for Fedora and different variants (Server,
        # Workstation). Later on, we'll have to differentiate that and return
        # the right one. However, for now, let's just rely on returning the
        # most generic tree possible.
        location = self._get_generic_location(treelist, arch, profile)
        if location:
            return location

        raise RuntimeError(
            _("OS '%(osname)s' does not have a URL location "
              "for the architecture '%(archname)s'") %
            {"osname": self.name, "archname": arch})

    def get_install_script_list(self):
        return list(_OsinfoIter(self._os.get_install_script_list()))

    def _get_installable_drivers(self, arch):
        """
        Return the OS device drivers for the passed arch. arch='all'
        returns every driver.
        """
        installable_drivers = []
        device_drivers = list(_OsinfoIter(self._os.get_device_drivers()))
        for device_driver in device_drivers:
            if arch != "all" and device_driver.get_architecture() != arch:
                continue

            installable_drivers.append(device_driver)
        return installable_drivers

    def _get_pre_installable_drivers(self, arch):
        installable_drivers = self._get_installable_drivers(arch)
        pre_inst_drivers = []
        for driver in installable_drivers:
            if driver.get_pre_installable():
                pre_inst_drivers.append(driver)
        return pre_inst_drivers

    def _get_drivers_location(self, drivers):
        """
        Return every driver file name joined onto its driver's location
        """
        locations = []
        for driver in drivers:
            filenames = driver.get_files()
            for filename in filenames:
                location = os.path.join(driver.get_location(), filename)
                locations.append(location)
        return locations

    def get_pre_installable_drivers_location(self, arch):
        pre_inst_drivers = self._get_pre_installable_drivers(arch)

        return self._get_drivers_location(pre_inst_drivers)

    def get_pre_installable_devices(self, arch):
        drivers = self._get_pre_installable_drivers(arch)
        devices = []
        for driver in drivers:
            devices += list(_OsinfoIter(driver.get_devices()))
        return devices

    def supports_unattended_drivers(self, arch):
        if self._get_pre_installable_drivers(arch):
            return True
        return False


class _OsMedia(object):
    """
    Wrapper around a libosinfo media object
    """
    def __init__(self, osinfo_media):
        self._media = osinfo_media

    def get_kernel_path(self):
        return self._media.get_kernel_path()
    def get_initrd_path(self):
        return self._media.get_initrd_path()
    def supports_installer_script(self):
        return self._media.supports_installer_script()

    def is_netinst(self):
        """
        Return True if any OS variant ID of the media contains 'netinst'
        """
        variants = list(_OsinfoIter(self._media.get_os_variants()))
        for variant in variants:
            if "netinst" in variant.get_id():
                return True
        return False  # pragma: no cover

    def get_install_script_list(self):
        return list(_OsinfoIter(self._media.get_install_script_list()))

    def get_osinfo_media(self):
        return self._media


class _OsTree(object):
    """
    Wrapper around a libosinfo tree object
    """
    def __init__(self, osinfo_tree):
        self._tree = osinfo_tree

    def get_osinfo_tree(self):
        return self._tree