1import os
2import pprint
3
4import pytest
5import salt.utils.path
6import salt.utils.pkg
7import salt.utils.platform
8from tests.support.case import ModuleCase
9from tests.support.helpers import requires_system_grains
10from tests.support.mixins import SaltReturnAssertsMixin
11from tests.support.unit import skipIf
12
13
14@pytest.mark.windows_whitelisted
15class PkgModuleTest(ModuleCase, SaltReturnAssertsMixin):
16    """
17    Validate the pkg module
18    """
19
20    @classmethod
21    @requires_system_grains
22    def setUpClass(cls, grains):  # pylint: disable=arguments-differ
23        cls.ctx = {}
24        cls.pkg = "figlet"
25        if salt.utils.platform.is_windows():
26            cls.pkg = "putty"
27        elif grains["os_family"] == "RedHat":
28            if grains["os"] == "VMware Photon OS":
29                cls.pkg = "snoopy"
30            else:
31                cls.pkg = "units"
32
33    @pytest.mark.skip_if_not_root
34    @pytest.mark.requires_salt_modules("pkg.refresh_db")
35    def setUp(self):
36        if "refresh" not in self.ctx:
37            self.run_function("pkg.refresh_db")
38            self.ctx["refresh"] = True
39
40    @pytest.mark.requires_salt_modules("pkg.list_pkgs")
41    @pytest.mark.slow_test
42    def test_list(self):
43        """
44        verify that packages are installed
45        """
46        ret = self.run_function("pkg.list_pkgs")
47        self.assertNotEqual(len(ret.keys()), 0)
48
49    @pytest.mark.requires_salt_modules("pkg.version_cmp")
50    @requires_system_grains
51    @pytest.mark.slow_test
52    def test_version_cmp(self, grains):
53        """
54        test package version comparison on supported platforms
55        """
56        func = "pkg.version_cmp"
57        if grains["os_family"] == "Debian":
58            lt = ["0.2.4-0ubuntu1", "0.2.4.1-0ubuntu1"]
59            eq = ["0.2.4-0ubuntu1", "0.2.4-0ubuntu1"]
60            gt = ["0.2.4.1-0ubuntu1", "0.2.4-0ubuntu1"]
61        elif grains["os_family"] == "Suse":
62            lt = ["2.3.0-1", "2.3.1-15.1"]
63            eq = ["2.3.1-15.1", "2.3.1-15.1"]
64            gt = ["2.3.2-15.1", "2.3.1-15.1"]
65        else:
66            lt = ["2.3.0", "2.3.1"]
67            eq = ["2.3.1", "2.3.1"]
68            gt = ["2.3.2", "2.3.1"]
69
70        self.assertEqual(self.run_function(func, lt), -1)
71        self.assertEqual(self.run_function(func, eq), 0)
72        self.assertEqual(self.run_function(func, gt), 1)
73
74    @pytest.mark.destructive_test
75    @pytest.mark.requires_salt_modules("pkg.mod_repo", "pkg.del_repo", "pkg.get_repo")
76    @requires_system_grains
77    @pytest.mark.slow_test
78    @pytest.mark.requires_network
79    def test_mod_del_repo(self, grains):
80        """
81        test modifying and deleting a software repository
82        """
83        repo = None
84
85        try:
86            if grains["os"] == "Ubuntu":
87                repo = "ppa:otto-kesselgulasch/gimp-edge"
88                uri = "http://ppa.launchpad.net/otto-kesselgulasch/gimp-edge/ubuntu"
89                ret = self.run_function("pkg.mod_repo", [repo, "comps=main"])
90                self.assertNotEqual(ret, {})
91                ret = self.run_function("pkg.get_repo", [repo])
92
93                self.assertIsInstance(
94                    ret,
95                    dict,
96                    "The 'pkg.get_repo' command did not return the excepted dictionary."
97                    " Output:\n{}".format(ret),
98                )
99                self.assertEqual(
100                    ret["uri"],
101                    uri,
102                    msg="The URI did not match. Full return:\n{}".format(
103                        pprint.pformat(ret)
104                    ),
105                )
106            elif grains["os_family"] == "RedHat":
107                repo = "saltstack"
108                name = "SaltStack repo for RHEL/CentOS {}".format(
109                    grains["osmajorrelease"]
110                )
111                baseurl = (
112                    "http://repo.saltproject.io/py3/redhat/{}/x86_64/latest/".format(
113                        grains["osmajorrelease"]
114                    )
115                )
116                gpgkey = "https://repo.saltproject.io/py3/redhat/{}/x86_64/latest/SALTSTACK-GPG-KEY.pub".format(
117                    grains["osmajorrelease"]
118                )
119                gpgcheck = 1
120                enabled = 1
121                ret = self.run_function(
122                    "pkg.mod_repo",
123                    [repo],
124                    name=name,
125                    baseurl=baseurl,
126                    gpgkey=gpgkey,
127                    gpgcheck=gpgcheck,
128                    enabled=enabled,
129                )
130                # return data from pkg.mod_repo contains the file modified at
131                # the top level, so use next(iter(ret)) to get that key
132                self.assertNotEqual(ret, {})
133                repo_info = ret[next(iter(ret))]
134                self.assertIn(repo, repo_info)
135                self.assertEqual(repo_info[repo]["baseurl"], baseurl)
136                ret = self.run_function("pkg.get_repo", [repo])
137                self.assertEqual(ret["baseurl"], baseurl)
138        finally:
139            if repo is not None:
140                self.run_function("pkg.del_repo", [repo])
141
142    @pytest.mark.slow_test
143    def test_mod_del_repo_multiline_values(self):
144        """
145        test modifying and deleting a software repository defined with multiline values
146        """
147        os_grain = self.run_function("grains.item", ["os"])["os"]
148        repo = None
149        try:
150            if os_grain in ["CentOS", "RedHat", "VMware Photon OS"]:
151                my_baseurl = (
152                    "http://my.fake.repo/foo/bar/\n http://my.fake.repo.alt/foo/bar/"
153                )
154                expected_get_repo_baseurl = (
155                    "http://my.fake.repo/foo/bar/\nhttp://my.fake.repo.alt/foo/bar/"
156                )
157                major_release = int(
158                    self.run_function("grains.item", ["osmajorrelease"])[
159                        "osmajorrelease"
160                    ]
161                )
162                repo = "fakerepo"
163                name = "Fake repo for RHEL/CentOS/SUSE"
164                baseurl = my_baseurl
165                gpgkey = "https://my.fake.repo/foo/bar/MY-GPG-KEY.pub"
166                failovermethod = "priority"
167                gpgcheck = 1
168                enabled = 1
169                ret = self.run_function(
170                    "pkg.mod_repo",
171                    [repo],
172                    name=name,
173                    baseurl=baseurl,
174                    gpgkey=gpgkey,
175                    gpgcheck=gpgcheck,
176                    enabled=enabled,
177                    failovermethod=failovermethod,
178                )
179                # return data from pkg.mod_repo contains the file modified at
180                # the top level, so use next(iter(ret)) to get that key
181                self.assertNotEqual(ret, {})
182                repo_info = ret[next(iter(ret))]
183                self.assertIn(repo, repo_info)
184                self.assertEqual(repo_info[repo]["baseurl"], my_baseurl)
185                ret = self.run_function("pkg.get_repo", [repo])
186                self.assertEqual(ret["baseurl"], expected_get_repo_baseurl)
187                self.run_function("pkg.mod_repo", [repo])
188                ret = self.run_function("pkg.get_repo", [repo])
189                self.assertEqual(ret["baseurl"], expected_get_repo_baseurl)
190        finally:
191            if repo is not None:
192                self.run_function("pkg.del_repo", [repo])
193
194    @pytest.mark.requires_salt_modules("pkg.owner")
195    def test_owner(self):
196        """
197        test finding the package owning a file
198        """
199        func = "pkg.owner"
200        ret = self.run_function(func, ["/bin/ls"])
201        self.assertNotEqual(len(ret), 0)
202
203    # Similar to pkg.owner, but for FreeBSD's pkgng
204    @pytest.mark.requires_salt_modules("pkg.which")
205    def test_which(self):
206        """
207        test finding the package owning a file
208        """
209        func = "pkg.which"
210        ret = self.run_function(func, ["/usr/local/bin/salt-call"])
211        self.assertNotEqual(len(ret), 0)
212
213    @pytest.mark.destructive_test
214    @pytest.mark.requires_salt_modules("pkg.version", "pkg.install", "pkg.remove")
215    @pytest.mark.slow_test
216    @pytest.mark.requires_network
217    def test_install_remove(self):
218        """
219        successfully install and uninstall a package
220        """
221        version = self.run_function("pkg.version", [self.pkg])
222
223        def test_install():
224            install_ret = self.run_function("pkg.install", [self.pkg])
225            self.assertIn(self.pkg, install_ret)
226
227        def test_remove():
228            remove_ret = self.run_function("pkg.remove", [self.pkg])
229            self.assertIn(self.pkg, remove_ret)
230
231        if version and isinstance(version, dict):
232            version = version[self.pkg]
233
234        if version:
235            test_remove()
236            test_install()
237        else:
238            test_install()
239            test_remove()
240
241    @pytest.mark.destructive_test
242    @pytest.mark.skipif(
243        salt.utils.platform.is_photonos(),
244        reason="package hold/unhold unsupported on Photon OS",
245    )
246    @pytest.mark.requires_salt_modules(
247        "pkg.hold",
248        "pkg.unhold",
249        "pkg.install",
250        "pkg.version",
251        "pkg.remove",
252        "pkg.list_pkgs",
253    )
254    @requires_system_grains
255    @pytest.mark.slow_test
256    @pytest.mark.requires_network
257    @pytest.mark.requires_salt_states("pkg.installed")
258    def test_hold_unhold(self, grains):
259        """
260        test holding and unholding a package
261        """
262        versionlock_pkg = None
263        if grains["os_family"] == "RedHat":
264            pkgs = {
265                p for p in self.run_function("pkg.list_pkgs") if "-versionlock" in p
266            }
267            if not pkgs:
268                self.skipTest("No versionlock package found in repositories")
269            for versionlock_pkg in pkgs:
270                ret = self.run_state(
271                    "pkg.installed", name=versionlock_pkg, refresh=False
272                )
273                # Exit loop if a versionlock package installed correctly
274                try:
275                    self.assertSaltTrueReturn(ret)
276                    break
277                except AssertionError:
278                    pass
279            else:
280                self.fail("Could not install versionlock package from {}".format(pkgs))
281
282        self.run_function("pkg.install", [self.pkg])
283
284        try:
285            hold_ret = self.run_function("pkg.hold", [self.pkg])
286            if versionlock_pkg and "-versionlock is not installed" in str(hold_ret):
287                self.skipTest("{}  `{}` is installed".format(hold_ret, versionlock_pkg))
288            self.assertIn(self.pkg, hold_ret)
289            self.assertTrue(hold_ret[self.pkg]["result"])
290
291            unhold_ret = self.run_function("pkg.unhold", [self.pkg])
292            self.assertIn(self.pkg, unhold_ret)
293            self.assertTrue(unhold_ret[self.pkg]["result"])
294            self.run_function("pkg.remove", [self.pkg])
295        finally:
296            if versionlock_pkg:
297                ret = self.run_state("pkg.removed", name=versionlock_pkg)
298                self.assertSaltTrueReturn(ret)
299
300    @pytest.mark.destructive_test
301    @pytest.mark.requires_salt_modules("pkg.refresh_db")
302    @requires_system_grains
303    @pytest.mark.slow_test
304    @pytest.mark.requires_network
305    def test_refresh_db(self, grains):
306        """
307        test refreshing the package database
308        """
309        func = "pkg.refresh_db"
310
311        rtag = salt.utils.pkg.rtag(self.minion_opts)
312        salt.utils.pkg.write_rtag(self.minion_opts)
313        self.assertTrue(os.path.isfile(rtag))
314
315        ret = self.run_function(func)
316        if not isinstance(ret, dict):
317            self.skipTest(
318                "Upstream repo did not return coherent results: {}".format(ret)
319            )
320
321        if grains["os_family"] == "RedHat":
322            self.assertIn(ret, (True, None))
323        elif grains["os_family"] == "Suse":
324            if not isinstance(ret, dict):
325                self.skipTest(
326                    "Upstream repo did not return coherent results. Skipping test."
327                )
328            self.assertNotEqual(ret, {})
329            for source, state in ret.items():
330                self.assertIn(state, (True, False, None))
331
332        self.assertFalse(os.path.isfile(rtag))
333
334    @pytest.mark.requires_salt_modules("pkg.info_installed")
335    @requires_system_grains
336    @pytest.mark.slow_test
337    def test_pkg_info(self, grains):
338        """
339        Test returning useful information on Ubuntu systems.
340        """
341        func = "pkg.info_installed"
342
343        if grains["os_family"] == "Debian":
344            ret = self.run_function(func, ["bash", "dpkg"])
345            keys = ret.keys()
346            self.assertIn("bash", keys)
347            self.assertIn("dpkg", keys)
348        elif grains["os_family"] == "RedHat":
349            ret = self.run_function(func, ["rpm", "bash"])
350            keys = ret.keys()
351            self.assertIn("rpm", keys)
352            self.assertIn("bash", keys)
353        elif grains["os_family"] == "Suse":
354            ret = self.run_function(func, ["less", "zypper"])
355            keys = ret.keys()
356            self.assertIn("less", keys)
357            self.assertIn("zypper", keys)
358        else:
359            ret = self.run_function(func, [self.pkg])
360            keys = ret.keys()
361            self.assertIn(self.pkg, keys)
362
363    @skipIf(True, "Temporary Skip - Causes centos 8 test to fail")
364    @pytest.mark.destructive_test
365    @pytest.mark.requires_salt_modules(
366        "pkg.refresh_db",
367        "pkg.upgrade",
368        "pkg.install",
369        "pkg.list_repo_pkgs",
370        "pkg.list_upgrades",
371    )
372    @requires_system_grains
373    @pytest.mark.slow_test
374    @pytest.mark.requires_network
375    def test_pkg_upgrade_has_pending_upgrades(self, grains):
376        """
377        Test running a system upgrade when there are packages that need upgrading
378        """
379        if grains["os"] == "Arch":
380            self.skipTest("Arch moved to Python 3.8 and we're not ready for it yet")
381
382        func = "pkg.upgrade"
383
384        # First make sure that an up-to-date copy of the package db is available
385        self.run_function("pkg.refresh_db")
386
387        if grains["os_family"] == "Suse":
388            # This test assumes that there are multiple possible versions of a
389            # package available. That makes it brittle if you pick just one
390            # target, as changes in the available packages will break the test.
391            # Therefore, we'll choose from several packages to make sure we get
392            # one that is suitable for this test.
393            packages = ("hwinfo", "avrdude", "diffoscope", "vim")
394            available = self.run_function("pkg.list_repo_pkgs", packages)
395
396            for package in packages:
397                try:
398                    new, old = available[package][:2]
399                except (KeyError, ValueError):
400                    # Package not available, or less than 2 versions
401                    # available. This is not a suitable target.
402                    continue
403                else:
404                    target = package
405                    break
406            else:
407                # None of the packages have more than one version available, so
408                # we need to find new package(s). pkg.list_repo_pkgs can be
409                # used to get an overview of the available packages. We should
410                # try to find packages with few dependencies and small download
411                # sizes, to keep this test from taking longer than necessary.
412                self.fail("No suitable package found for this test")
413
414            # Make sure we have the 2nd-oldest available version installed
415            ret = self.run_function("pkg.install", [target], version=old)
416            if not isinstance(ret, dict):
417                if ret.startswith("ERROR"):
418                    self.skipTest(
419                        "Could not install older {} to complete test.".format(target)
420                    )
421
422            # Run a system upgrade, which should catch the fact that the
423            # targeted package needs upgrading, and upgrade it.
424            ret = self.run_function(func)
425
426            # The changes dictionary should not be empty.
427            if "changes" in ret:
428                self.assertIn(target, ret["changes"])
429            else:
430                self.assertIn(target, ret)
431        else:
432            ret = self.run_function("pkg.list_upgrades")
433            if ret == "" or ret == {}:
434                self.skipTest(
435                    "No updates available for this machine.  Skipping pkg.upgrade test."
436                )
437            else:
438                args = []
439                if grains["os_family"] == "Debian":
440                    args = ["dist_upgrade=True"]
441                ret = self.run_function(func, args)
442                self.assertNotEqual(ret, {})
443
444    @pytest.mark.destructive_test
445    @skipIf(
446        salt.utils.platform.is_darwin(),
447        "The jenkins user is equivalent to root on mac, causing the test to be"
448        " unrunnable",
449    )
450    @pytest.mark.requires_salt_modules("pkg.remove", "pkg.latest_version")
451    @requires_system_grains
452    @pytest.mark.slow_test
453    @pytest.mark.requires_salt_states("pkg.removed")
454    def test_pkg_latest_version(self, grains):
455        """
456        Check that pkg.latest_version returns the latest version of the uninstalled package.
457        The package is not installed. Only the package version is checked.
458        """
459        self.run_state("pkg.removed", name=self.pkg)
460
461        cmd_pkg = []
462        if grains["os_family"] == "RedHat":
463            cmd_pkg = self.run_function("cmd.run", ["yum list {}".format(self.pkg)])
464        elif salt.utils.platform.is_windows():
465            cmd_pkg = self.run_function("pkg.list_available", [self.pkg])
466        elif grains["os_family"] == "Debian":
467            cmd_pkg = self.run_function("cmd.run", ["apt list {}".format(self.pkg)])
468        elif grains["os_family"] == "Arch":
469            cmd_pkg = self.run_function("cmd.run", ["pacman -Si {}".format(self.pkg)])
470        elif grains["os_family"] == "FreeBSD":
471            cmd_pkg = self.run_function(
472                "cmd.run", ["pkg search -S name -qQ version -e {}".format(self.pkg)]
473            )
474        elif grains["os_family"] == "Suse":
475            cmd_pkg = self.run_function("cmd.run", ["zypper info {}".format(self.pkg)])
476        elif grains["os_family"] == "MacOS":
477            brew_bin = salt.utils.path.which("brew")
478            mac_user = self.run_function("file.get_user", [brew_bin])
479            if mac_user == "root":
480                self.skipTest(
481                    "brew cannot run as root, try a user in {}".format(
482                        os.listdir("/Users/")
483                    )
484                )
485            cmd_pkg = self.run_function(
486                "cmd.run", ["brew info {}".format(self.pkg)], run_as=mac_user
487            )
488        else:
489            self.skipTest(
490                "TODO: test not configured for {}".format(grains["os_family"])
491            )
492        pkg_latest = self.run_function("pkg.latest_version", [self.pkg])
493        self.assertIn(pkg_latest, cmd_pkg)
494