1import xml.etree.ElementTree as ET
2
3import pytest
4import salt.modules.config as config
5import salt.modules.virt as virt
6from tests.support.mock import MagicMock
7
8
class LibvirtMock(MagicMock):  # pylint: disable=too-many-ancestors
    """
    Stand-in for the ``libvirt`` python module.

    Besides the usual ``MagicMock`` behaviour it exposes real classes for
    ``virDomain`` and ``libvirtError`` so they can be subclassed, raised
    and caught like ordinary classes.
    """

    class virDomain(MagicMock):
        """
        Placeholder for ``libvirt.virDomain``
        """

    class libvirtError(Exception):
        """
        Placeholder for ``libvirt.libvirtError`` remembering its message
        """

        def __init__(self, msg):
            # Keep the message around for get_error_message()
            self.msg = msg
            super().__init__(msg)

        def get_error_message(self):
            """
            Return the message the error was created with
            """
            return self.msg
30
31
class MappedResultMock(MagicMock):
    """
    Mock class consistently return the same mock object based on the first argument.

    Results are registered with :meth:`add` and are stored per instance, so
    two ``MappedResultMock`` objects never see each other's entries.
    Calling the mock with a first argument that has not been registered
    raises ``virt.libvirt.libvirtError``.
    """

    def __init__(self):
        def mapped_results(*args, **kwargs):
            # Only the first positional argument selects the result,
            # any other argument is ignored.
            key = args[0]
            if key not in self._instances:
                raise virt.libvirt.libvirtError("Not found: {}".format(key))
            return self._instances[key]

        super().__init__(side_effect=mapped_results)
        # Must be assigned after the mock is initialised: attribute
        # assignment on a mock relies on state set up by MagicMock.__init__.
        # A per-instance dict avoids sharing results between unrelated
        # lookups (and between tests).
        self._instances = {}

    def add(self, name, value=None):
        """
        Register the result returned when the mock is called with ``name``.

        :param name: value of the first call argument to map
        :param value: object to return; a fresh ``MagicMock`` is created
            when it is ``None``. Falsy values such as ``0`` or ``""`` are
            kept as they are.
        """
        self._instances[name] = MagicMock() if value is None else value
49
50
def loader_modules_config():
    """
    Build the per-module globals used to run the virt module against mocks.

    The returned dict is keyed by the ``virt`` and ``config`` modules. The
    ``virt`` entry provides a mocked ``libvirt`` whose ``openAuth()`` returns
    a mocked connection, and a ``__salt__`` dict exposing ``config.get`` and
    ``config.option``. Presumably consumed by the loader-mock test harness;
    verify against the callers.
    """
    # Connection returned by every libvirt.openAuth() call
    connection = MagicMock()
    connection.getStoragePoolCapabilities.return_value = "<storagepoolCapabilities/>"

    libvirt_mock = LibvirtMock()
    libvirt_mock.openAuth.return_value = connection

    virt_globals = {
        "libvirt": libvirt_mock,
        "__salt__": {"config.get": config.get, "config.option": config.option},
    }
    return {virt: virt_globals, config: {}}
65
66
@pytest.fixture
def make_mock_vm():
    """
    Provide a factory registering a mocked domain on the mocked libvirt connection
    """

    def _make_mock_vm(xml_def=None, running=False, inactive_def=None):
        """
        Create the domain mock described by ``xml_def`` and return it.

        :param xml_def: domain XML definition; a minimal KVM domain named
            ``my_vm`` is used when empty
        :param running: whether the reported state is the running one
        :param inactive_def: XML returned for the inactive definition,
            defaults to the active one
        """
        conn = virt.libvirt.openAuth.return_value

        active_xml = xml_def or """
                <domain type='kvm' id='7'>
                  <name>my_vm</name>
                  <memory unit='KiB'>1048576</memory>
                  <currentMemory unit='KiB'>1048576</currentMemory>
                  <vcpu placement='auto'>1</vcpu>
                  <on_reboot>restart</on_reboot>
                  <os>
                    <type arch='x86_64' machine='pc-i440fx-2.6'>hvm</type>
                  </os>
                </domain>
            """
        root = ET.fromstring(active_xml)
        vm_name = root.find("name").text
        type_node = root.find("os/type")
        # Default to "hvm" when the definition carries no <os><type> node
        guest_os_type = "hvm" if type_node is None else type_node.text

        # Only the latest mocked VM is reported as defined
        conn.listDefinedDomains.return_value = [vm_name]

        # Domains are looked up by name through a mapped mock
        if not isinstance(conn.lookupByName, MappedResultMock):
            conn.lookupByName = MappedResultMock()
        conn.lookupByName.add(vm_name)
        domain = conn.lookupByName(vm_name)

        # XMLDesc() result depends on the flag passed as first argument
        domain.XMLDesc = MappedResultMock()
        domain.XMLDesc.add(0, active_xml)
        domain.XMLDesc.add(
            virt.libvirt.VIR_DOMAIN_XML_INACTIVE, inactive_def or active_xml
        )
        domain.OSType.return_value = guest_os_type

        # State is 4 (shutdown) unless a running VM has been requested
        state = 0 if running else 4
        domain.info.return_value = [state, 2048 * 1024, 1024 * 1024, 2, 1234]
        domain.ID.return_value = 1
        domain.name.return_value = vm_name

        # All the modifying calls report success
        for method in (
            "attachDevice",
            "detachDevice",
            "setMemoryFlags",
            "setVcpusFlags",
        ):
            getattr(domain, method).return_value = 0

        domain.connect.return_value = conn

        return domain

    return _make_mock_vm
129
130
@pytest.fixture
def make_mock_storage_pool():
    """
    Provide a factory registering a mocked storage pool on the mocked libvirt connection
    """

    def _make_mock_storage_pool(name, type, volumes, source=None):
        """
        Create a mocked storage pool with its volumes and return it.

        :param name: name of the pool
        :param type: value of the ``type`` attribute of the pool XML
        :param volumes: names of the volumes to create in the pool
        :param source: XML placed inside the ``<source>`` element. When
            empty, ``disk`` pools get a ``/dev/<name>`` device and other
            pool types get an empty ``<source>`` element.
        """
        mocked_conn = virt.libvirt.openAuth.return_value

        # Append the pool name to the list of known mocked pools
        all_pools = mocked_conn.listStoragePools.return_value
        if not isinstance(all_pools, list):
            all_pools = []
        all_pools.append(name)
        mocked_conn.listStoragePools.return_value = all_pools

        # Ensure we have mapped results for the pools
        if not isinstance(mocked_conn.storagePoolLookupByName, MappedResultMock):
            mocked_conn.storagePoolLookupByName = MappedResultMock()

        # Configure the pool
        mocked_conn.storagePoolLookupByName.add(name)
        mocked_pool = mocked_conn.storagePoolLookupByName(name)
        if not source:
            # Never format None into the XML: use the default device for
            # disk pools and leave the source empty for the other types
            source = "<device path='/dev/{}'/>".format(name) if type == "disk" else ""
        pool_path = "/path/to/{}".format(name)
        mocked_pool.XMLDesc.return_value = """
            <pool type='{}'>
                <source>
                {}
                </source>
                <target>
                    <path>{}</path>
                </target>
            </pool>
            """.format(
            type, source, pool_path
        )
        mocked_pool.name.return_value = name
        mocked_pool.info.return_value = [
            virt.libvirt.VIR_STORAGE_POOL_RUNNING,
        ]

        # Append the pool to the listAllStoragePools list
        all_pools_obj = mocked_conn.listAllStoragePools.return_value
        if not isinstance(all_pools_obj, list):
            all_pools_obj = []
        all_pools_obj.append(mocked_pool)
        mocked_conn.listAllStoragePools.return_value = all_pools_obj

        # Configure the volumes
        if not isinstance(mocked_pool.storageVolLookupByName, MappedResultMock):
            mocked_pool.storageVolLookupByName = MappedResultMock()
        mocked_pool.listVolumes.return_value = volumes

        all_volumes = []
        for volume in volumes:
            mocked_pool.storageVolLookupByName.add(volume)
            mocked_vol = mocked_pool.storageVolLookupByName(volume)
            # Volumes live directly under the pool target path
            vol_path = "{}/{}".format(pool_path, volume)
            mocked_vol.XMLDesc.return_value = """
            <volume>
                <target>
                    <path>{}</path>
                </target>
            </volume>
            """.format(
                vol_path,
            )
            mocked_vol.path.return_value = vol_path
            mocked_vol.name.return_value = volume

            # NOTE(review): presumably [type, capacity, allocation] as in
            # libvirt's virStorageVol.info() - confirm
            mocked_vol.info.return_value = [
                0,
                1234567,
                12345,
            ]
            all_volumes.append(mocked_vol)

        # Set the listAllVolumes return_value
        mocked_pool.listAllVolumes.return_value = all_volumes
        return mocked_pool

    return _make_mock_storage_pool
212
213
@pytest.fixture
def make_capabilities():
    """
    Provide a function setting the capabilities XML returned by the mocked connection
    """

    def _make_capabilities():
        """
        Make ``getCapabilities()`` of the mocked connection return the XML of
        an x86_64 host offering ``hvm`` guests for i686 and x86_64.
        """
        capabilities_xml = """
<capabilities>
  <host>
    <uuid>44454c4c-3400-105a-8033-b3c04f4b344a</uuid>
    <cpu>
      <arch>x86_64</arch>
      <model>Nehalem</model>
      <vendor>Intel</vendor>
      <microcode version='25'/>
      <topology sockets='1' cores='4' threads='2'/>
      <feature name='vme'/>
      <feature name='ds'/>
      <feature name='acpi'/>
      <pages unit='KiB' size='4'/>
      <pages unit='KiB' size='2048'/>
    </cpu>
    <power_management>
      <suspend_mem/>
      <suspend_disk/>
      <suspend_hybrid/>
    </power_management>
    <migration_features>
      <live/>
      <uri_transports>
        <uri_transport>tcp</uri_transport>
        <uri_transport>rdma</uri_transport>
      </uri_transports>
    </migration_features>
    <topology>
      <cells num='1'>
        <cell id='0'>
          <memory unit='KiB'>12367120</memory>
          <pages unit='KiB' size='4'>3091780</pages>
          <pages unit='KiB' size='2048'>0</pages>
          <distances>
            <sibling id='0' value='10'/>
          </distances>
          <cpus num='8'>
            <cpu id='0' socket_id='0' core_id='0' siblings='0,4'/>
            <cpu id='1' socket_id='0' core_id='1' siblings='1,5'/>
            <cpu id='2' socket_id='0' core_id='2' siblings='2,6'/>
            <cpu id='3' socket_id='0' core_id='3' siblings='3,7'/>
            <cpu id='4' socket_id='0' core_id='0' siblings='0,4'/>
            <cpu id='5' socket_id='0' core_id='1' siblings='1,5'/>
            <cpu id='6' socket_id='0' core_id='2' siblings='2,6'/>
            <cpu id='7' socket_id='0' core_id='3' siblings='3,7'/>
          </cpus>
        </cell>
      </cells>
    </topology>
    <cache>
      <bank id='0' level='3' type='both' size='8' unit='MiB' cpus='0-7'/>
    </cache>
    <secmodel>
      <model>apparmor</model>
      <doi>0</doi>
    </secmodel>
    <secmodel>
      <model>dac</model>
      <doi>0</doi>
      <baselabel type='kvm'>+487:+486</baselabel>
      <baselabel type='qemu'>+487:+486</baselabel>
    </secmodel>
  </host>

  <guest>
    <os_type>hvm</os_type>
    <arch name='i686'>
      <wordsize>32</wordsize>
      <emulator>/usr/bin/qemu-system-i386</emulator>
      <machine maxCpus='255'>pc-i440fx-2.6</machine>
      <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
      <machine maxCpus='255'>pc-0.12</machine>
      <domain type='qemu'/>
      <domain type='kvm'>
        <emulator>/usr/bin/qemu-kvm</emulator>
        <machine maxCpus='255'>pc-i440fx-2.6</machine>
        <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
        <machine maxCpus='255'>pc-0.12</machine>
      </domain>
    </arch>
    <features>
      <cpuselection/>
      <deviceboot/>
      <disksnapshot default='on' toggle='no'/>
      <acpi default='on' toggle='yes'/>
      <apic default='on' toggle='no'/>
      <pae/>
      <nonpae/>
    </features>
  </guest>

  <guest>
    <os_type>hvm</os_type>
    <arch name='x86_64'>
      <wordsize>64</wordsize>
      <emulator>/usr/bin/qemu-system-x86_64</emulator>
      <machine maxCpus='255'>pc-i440fx-2.6</machine>
      <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
      <machine maxCpus='255'>pc-0.12</machine>
      <domain type='qemu'/>
      <domain type='kvm'>
        <emulator>/usr/bin/qemu-kvm</emulator>
        <machine maxCpus='255'>pc-i440fx-2.6</machine>
        <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
        <machine maxCpus='255'>pc-0.12</machine>
      </domain>
    </arch>
    <features>
      <cpuselection/>
      <deviceboot/>
      <disksnapshot default='on' toggle='no'/>
      <acpi default='on' toggle='yes'/>
      <apic default='on' toggle='no'/>
    </features>
  </guest>

</capabilities>"""
        connection = virt.libvirt.openAuth.return_value
        connection.getCapabilities.return_value = capabilities_xml

    return _make_capabilities
338
339
@pytest.fixture
def make_mock_network():
    """
    Provide a factory registering a mocked network on the mocked libvirt connection
    """

    def _make_mock_net(xml_def):
        """
        Create the network mock described by ``xml_def`` and return it.

        The network is reachable through ``networkLookupByName()`` using the
        ``<name>`` found in the definition and is appended to the
        ``listAllNetworks()`` result.
        """
        conn = virt.libvirt.openAuth.return_value
        net_name = ET.fromstring(xml_def).find("name").text

        # Networks are looked up by name through a mapped mock
        if not isinstance(conn.networkLookupByName, MappedResultMock):
            conn.networkLookupByName = MappedResultMock()
        conn.networkLookupByName.add(net_name)
        network = conn.networkLookupByName(net_name)

        network.XMLDesc.return_value = xml_def
        # Autostart is unset by default in libvirt
        network.autostart.return_value = 0

        # Keep the networks created earlier in the listAllNetworks() result
        known_networks = conn.listAllNetworks.return_value
        if isinstance(known_networks, list):
            known_networks.append(network)
        else:
            known_networks = [network]
        conn.listAllNetworks.return_value = known_networks

        return network

    return _make_mock_net
367
368
@pytest.fixture
def make_mock_device():
    """
    Provide a factory registering a mocked host device on the mocked libvirt connection
    """

    def _make_mock_device(xml_def):
        """
        Create the node device mock described by ``xml_def`` and return it.

        The device is reachable through ``nodeDeviceLookupByName()`` using
        the ``<name>`` of the definition; ``listCaps()`` reports the ``type``
        attribute of each direct ``<capability>`` child.
        """
        conn = virt.libvirt.openAuth.return_value
        # Devices are looked up by name through a mapped mock
        if not isinstance(conn.nodeDeviceLookupByName, MappedResultMock):
            conn.nodeDeviceLookupByName = MappedResultMock()

        root = ET.fromstring(xml_def)
        dev_name = root.find("./name").text

        conn.nodeDeviceLookupByName.add(dev_name)
        device = conn.nodeDeviceLookupByName(dev_name)

        capability_types = [
            node.get("type") for node in root.findall("./capability")
        ]
        device.name.return_value = dev_name
        device.XMLDesc.return_value = xml_def
        device.listCaps.return_value = capability_types
        return device

    return _make_mock_device
393
394
@pytest.fixture(params=[True, False], ids=["test", "notest"])
def test(request):
    """
    Parametrized fixture: ``True`` for the ``test`` id, ``False`` for ``notest``
    """
    test_mode = request.param
    return test_mode
401