1#
2# Copyright 2016 SUSE LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""
16    :codeauthor: Bo Maryniuk <bo@suse.de>
17"""
18
19import os
20
21from salt.modules.inspectlib.collector import Inspector
22from tests.support.helpers import no_symlinks
23from tests.support.mock import MagicMock, patch
24from tests.support.unit import TestCase, skipIf
25
26
27@skipIf(no_symlinks(), "Git missing 'core.symlinks=true' config")
28class InspectorCollectorTestCase(TestCase):
29    """
30    Test inspectlib:collector:Inspector
31    """
32
33    def setUp(self):
34        patcher = patch("os.mkdir", MagicMock())
35        patcher.start()
36        self.addCleanup(patcher.stop)
37
38    def test_env_loader(self):
39        """
40        Get packages on the different distros.
41
42        :return:
43        """
44        cachedir = os.sep + os.sep.join(["foo", "cache"])
45        piddir = os.sep + os.sep.join(["foo", "pid"])
46        inspector = Inspector(cachedir=cachedir, piddir=piddir, pidfilename="bar.pid")
47        self.assertEqual(
48            inspector.dbfile,
49            os.sep + os.sep.join(["foo", "cache", "_minion_collector.db"]),
50        )
51        self.assertEqual(
52            inspector.pidfile, os.sep + os.sep.join(["foo", "pid", "bar.pid"])
53        )
54
55    def test_file_tree(self):
56        """
57        Test file tree.
58
59        :return:
60        """
61
62        inspector = Inspector(
63            cachedir=os.sep + "test", piddir=os.sep + "test", pidfilename="bar.pid"
64        )
65        tree_root = os.path.join(
66            os.path.dirname(os.path.abspath(__file__)), "tree_test"
67        )
68        expected_tree = (
69            [
70                os.sep + os.sep.join(["a", "a", "dummy.a"]),
71                os.sep + os.sep.join(["a", "b", "dummy.b"]),
72                os.sep + os.sep.join(["b", "b.1"]),
73                os.sep + os.sep.join(["b", "b.2"]),
74                os.sep + os.sep.join(["b", "b.3"]),
75            ],
76            [
77                os.sep + "a",
78                os.sep + os.sep.join(["a", "a"]),
79                os.sep + os.sep.join(["a", "b"]),
80                os.sep + os.sep.join(["a", "c"]),
81                os.sep + "b",
82                os.sep + "c",
83            ],
84            [
85                os.sep + os.sep.join(["a", "a", "dummy.ln.a"]),
86                os.sep + os.sep.join(["a", "b", "dummy.ln.b"]),
87                os.sep + os.sep.join(["a", "c", "b.1"]),
88                os.sep + os.sep.join(["b", "b.4"]),
89                os.sep + os.sep.join(["b", "b.5"]),
90                os.sep + os.sep.join(["c", "b.1"]),
91                os.sep + os.sep.join(["c", "b.2"]),
92                os.sep + os.sep.join(["c", "b.3"]),
93            ],
94        )
95        tree_result = []
96        for chunk in inspector._get_all_files(tree_root):
97            buff = []
98            for pth in chunk:
99                buff.append(pth.replace(tree_root, ""))
100            tree_result.append(buff)
101        tree_result = tuple(tree_result)
102        self.assertEqual(expected_tree, tree_result)
103
104    def test_get_unmanaged_files(self):
105        """
106        Test get_unmanaged_files.
107
108        :return:
109        """
110        inspector = Inspector(cachedir="/test", piddir="/test", pidfilename="bar.pid")
111        managed = (
112            ["a", "b", "c"],
113            ["d", "e", "f"],
114            ["g", "h", "i"],
115        )
116        system_all = (
117            ["a", "b", "c"],
118            ["d", "E", "f"],
119            ["G", "H", "i"],
120        )
121        self.assertEqual(
122            inspector._get_unmanaged_files(managed=managed, system_all=system_all),
123            ([], ["E"], ["G", "H"]),
124        )
125
126    def test_pkg_get(self):
127        """
128        Test if grains switching the pkg get method.
129
130        :return:
131        """
132        debian_list = """
133g++
134g++-4.9
135g++-5
136gawk
137gcc
138gcc-4.9
139gcc-4.9-base:amd64
140gcc-4.9-base:i386
141gcc-5
142gcc-5-base:amd64
143gcc-5-base:i386
144gcc-6-base:amd64
145gcc-6-base:i386
146"""
147        inspector = Inspector(cachedir="/test", piddir="/test", pidfilename="bar.pid")
148        inspector.grains_core = MagicMock()
149        inspector.grains_core.os_data = MagicMock()
150        inspector.grains_core.os_data.get = MagicMock(return_value="Debian")
151        with patch.object(
152            inspector, "_Inspector__get_cfg_pkgs_dpkg", MagicMock(return_value="dpkg")
153        ):
154            with patch.object(
155                inspector, "_Inspector__get_cfg_pkgs_rpm", MagicMock(return_value="rpm")
156            ):
157                inspector.grains_core = MagicMock()
158                inspector.grains_core.os_data = MagicMock()
159                inspector.grains_core.os_data().get = MagicMock(return_value="Debian")
160                self.assertEqual(inspector._get_cfg_pkgs(), "dpkg")
161                inspector.grains_core.os_data().get = MagicMock(return_value="Suse")
162                self.assertEqual(inspector._get_cfg_pkgs(), "rpm")
163                inspector.grains_core.os_data().get = MagicMock(return_value="redhat")
164                self.assertEqual(inspector._get_cfg_pkgs(), "rpm")
165