import xml.etree.ElementTree as ET

import pytest
import salt.modules.config as config
import salt.modules.virt as virt
from tests.support.mock import MagicMock


class LibvirtMock(MagicMock):  # pylint: disable=too-many-ancestors
    """
    Libvirt library mock

    The nested classes are real classes rather than auto-created child mocks
    so that code under test can instantiate, raise and catch them.
    """

    class virDomain(MagicMock):
        """
        virDomain mock
        """

    class libvirtError(Exception):
        """
        libvirtError mock

        Stores the message so it can be read back with ``get_error_message()``.
        """

        def __init__(self, msg):
            super().__init__(msg)
            self.msg = msg

        def get_error_message(self):
            """
            Return the message the error was created with.
            """
            return self.msg


class MappedResultMock(MagicMock):
    """
    Mock class consistently returning the same object based on the first argument.

    Results are registered with ``add()``. Calling the mock with a first
    argument that was never registered raises ``virt.libvirt.libvirtError``.
    """

    def __init__(self):
        def mapped_results(*args, **kwargs):
            # Only the first positional argument selects the result
            key = args[0]
            if key not in self._instances:
                raise virt.libvirt.libvirtError("Not found: {}".format(key))
            return self._instances[key]

        super().__init__(side_effect=mapped_results)
        # The registry is owned by the instance: a class-level dict would be
        # shared by every lookup mock, letting unrelated mocks (and tests)
        # overwrite each other's entries. It has to be assigned after the
        # mock is initialised since attribute assignment goes through the
        # mock machinery.
        self._instances = {}

    def add(self, name, value=None):
        """
        Register the result returned when the mock is called with ``name``.

        name
            First positional argument the result is mapped to.

        value
            Object to return. A new ``MagicMock`` is created when it is ``None``.
        """
        # Compare with None explicitly so that falsy results such as an
        # empty string or 0 can be registered as well.
        self._instances[name] = MagicMock() if value is None else value


def loader_modules_config():
    """
    Build the loader modules mapping for the virt and config modules.

    The virt module gets a mocked ``libvirt`` whose ``openAuth`` returns a
    mocked connection, and a ``__salt__`` dunder exposing ``config.get`` and
    ``config.option``.
    """
    # Create libvirt mock and connection mock
    mock_libvirt = LibvirtMock()
    mock_conn = MagicMock()
    mock_conn.getStoragePoolCapabilities.return_value = "<storagepoolCapabilities/>"

    mock_libvirt.openAuth.return_value = mock_conn
    return {
        virt: {
            "libvirt": mock_libvirt,
            "__salt__": {"config.get": config.get, "config.option": config.option},
        },
        config: {},
    }


@pytest.fixture
def make_mock_vm():
    """
    Return a factory creating mocked domains on the mocked libvirt connection.
    """

    def _make_mock_vm(xml_def=None, running=False, inactive_def=None):
        """
        Register a mocked domain on the mocked connection and return it.

        xml_def
            XML definition of the domain. A minimal KVM domain named
            ``my_vm`` is used when it is not provided.

        running
            Whether the state reported by ``info()`` is 0 rather than 4.

        inactive_def
            XML returned for the ``VIR_DOMAIN_XML_INACTIVE`` flag.
            Defaults to the live definition.
        """
        mocked_conn = virt.libvirt.openAuth.return_value

        desc = xml_def
        if not desc:
            desc = """
                <domain type='kvm' id='7'>
                  <name>my_vm</name>
                  <memory unit='KiB'>1048576</memory>
                  <currentMemory unit='KiB'>1048576</currentMemory>
                  <vcpu placement='auto'>1</vcpu>
                  <on_reboot>restart</on_reboot>
                  <os>
                    <type arch='x86_64' machine='pc-i440fx-2.6'>hvm</type>
                  </os>
                </domain>
            """
        doc = ET.fromstring(desc)
        name = doc.find("name").text
        # Fall back to hvm when the definition has no os/type node
        os_type = "hvm"
        os_type_node = doc.find("os/type")
        if os_type_node is not None:
            os_type = os_type_node.text

        # Note that only the most recently created domain is listed here
        mocked_conn.listDefinedDomains.return_value = [name]

        # Configure the mocked domain
        # The lookup is only replaced once to keep the already registered domains
        if not isinstance(mocked_conn.lookupByName, MappedResultMock):
            mocked_conn.lookupByName = MappedResultMock()
        mocked_conn.lookupByName.add(name)
        domain_mock = mocked_conn.lookupByName(name)

        # XMLDesc is mapped on its flags argument
        domain_mock.XMLDesc = MappedResultMock()
        domain_mock.XMLDesc.add(0, desc)
        domain_mock.XMLDesc.add(
            virt.libvirt.VIR_DOMAIN_XML_INACTIVE, inactive_def or desc
        )
        domain_mock.OSType.return_value = os_type

        # Return state as shutdown unless the domain is asked to be running
        domain_mock.info.return_value = [
            0 if running else 4,
            2048 * 1024,
            1024 * 1024,
            2,
            1234,
        ]
        domain_mock.ID.return_value = 1
        domain_mock.name.return_value = name

        # Have the modifying calls report success
        domain_mock.attachDevice.return_value = 0
        domain_mock.detachDevice.return_value = 0
        domain_mock.setMemoryFlags.return_value = 0
        domain_mock.setVcpusFlags.return_value = 0

        domain_mock.connect.return_value = mocked_conn

        return domain_mock

    return _make_mock_vm


@pytest.fixture
def make_mock_storage_pool():
    """
    Return a factory creating mocked storage pools on the mocked libvirt connection.
    """

    def _make_mock_storage_pool(name, type, volumes, source=None):
        """
        Register a mocked storage pool and its volumes, and return the pool.

        name
            Name of the pool, also used to build its ``/path/to/<name>`` target path.

        type
            Value of the ``type`` attribute of the pool XML.

        volumes
            Names of the volumes to create in the pool.

        source
            XML fragment placed inside the ``<source>`` element. When empty,
            a ``/dev/<name>`` device is generated for ``disk`` pools and the
            element is left empty for the other types.
        """
        mocked_conn = virt.libvirt.openAuth.return_value

        # Append the pool name to the list of known mocked pools
        all_pools = mocked_conn.listStoragePools.return_value
        if not isinstance(all_pools, list):
            all_pools = []
        all_pools.append(name)
        mocked_conn.listStoragePools.return_value = all_pools

        # Ensure we have mapped results for the pools
        if not isinstance(mocked_conn.storagePoolLookupByName, MappedResultMock):
            mocked_conn.storagePoolLookupByName = MappedResultMock()

        # Configure the pool
        mocked_conn.storagePoolLookupByName.add(name)
        mocked_pool = mocked_conn.storagePoolLookupByName(name)
        if not source:
            # Never let None be rendered as text in the source element
            if type == "disk":
                source = "<device path='/dev/{}'/>".format(name)
            else:
                source = ""
        pool_path = "/path/to/{}".format(name)
        mocked_pool.XMLDesc.return_value = """
            <pool type='{}'>
                <source>
                {}
                </source>
                <target>
                    <path>{}</path>
                </target>
            </pool>
        """.format(
            type, source, pool_path
        )
        mocked_pool.name.return_value = name
        mocked_pool.info.return_value = [
            virt.libvirt.VIR_STORAGE_POOL_RUNNING,
        ]

        # Append the pool to the listAllStoragePools list
        all_pools_obj = mocked_conn.listAllStoragePools.return_value
        if not isinstance(all_pools_obj, list):
            all_pools_obj = []
        all_pools_obj.append(mocked_pool)
        mocked_conn.listAllStoragePools.return_value = all_pools_obj

        # Configure the volumes
        if not isinstance(mocked_pool.storageVolLookupByName, MappedResultMock):
            mocked_pool.storageVolLookupByName = MappedResultMock()
        mocked_pool.listVolumes.return_value = volumes

        all_volumes = []
        for volume in volumes:
            mocked_pool.storageVolLookupByName.add(volume)
            mocked_vol = mocked_pool.storageVolLookupByName(volume)
            vol_path = "{}/{}".format(pool_path, volume)
            mocked_vol.XMLDesc.return_value = """
            <volume>
                <target>
                    <path>{}</path>
                </target>
            </volume>
            """.format(
                vol_path,
            )
            mocked_vol.path.return_value = vol_path
            mocked_vol.name.return_value = volume

            mocked_vol.info.return_value = [
                0,
                1234567,
                12345,
            ]
            all_volumes.append(mocked_vol)

        # Set the listAllVolumes return_value
        mocked_pool.listAllVolumes.return_value = all_volumes
        return mocked_pool

    return _make_mock_storage_pool


@pytest.fixture
def make_capabilities():
    """
    Return a function setting the capabilities XML of the mocked libvirt connection.
    """

    def _make_capabilities():
        """
        Make ``getCapabilities()`` return a static host description with
        hvm guests for the i686 and x86_64 architectures.
        """
        mocked_conn = virt.libvirt.openAuth.return_value
        mocked_conn.getCapabilities.return_value = """
<capabilities>
  <host>
    <uuid>44454c4c-3400-105a-8033-b3c04f4b344a</uuid>
    <cpu>
      <arch>x86_64</arch>
      <model>Nehalem</model>
      <vendor>Intel</vendor>
      <microcode version='25'/>
      <topology sockets='1' cores='4' threads='2'/>
      <feature name='vme'/>
      <feature name='ds'/>
      <feature name='acpi'/>
      <pages unit='KiB' size='4'/>
      <pages unit='KiB' size='2048'/>
    </cpu>
    <power_management>
      <suspend_mem/>
      <suspend_disk/>
      <suspend_hybrid/>
    </power_management>
    <migration_features>
      <live/>
      <uri_transports>
        <uri_transport>tcp</uri_transport>
        <uri_transport>rdma</uri_transport>
      </uri_transports>
    </migration_features>
    <topology>
      <cells num='1'>
        <cell id='0'>
          <memory unit='KiB'>12367120</memory>
          <pages unit='KiB' size='4'>3091780</pages>
          <pages unit='KiB' size='2048'>0</pages>
          <distances>
            <sibling id='0' value='10'/>
          </distances>
          <cpus num='8'>
            <cpu id='0' socket_id='0' core_id='0' siblings='0,4'/>
            <cpu id='1' socket_id='0' core_id='1' siblings='1,5'/>
            <cpu id='2' socket_id='0' core_id='2' siblings='2,6'/>
            <cpu id='3' socket_id='0' core_id='3' siblings='3,7'/>
            <cpu id='4' socket_id='0' core_id='0' siblings='0,4'/>
            <cpu id='5' socket_id='0' core_id='1' siblings='1,5'/>
            <cpu id='6' socket_id='0' core_id='2' siblings='2,6'/>
            <cpu id='7' socket_id='0' core_id='3' siblings='3,7'/>
          </cpus>
        </cell>
      </cells>
    </topology>
    <cache>
      <bank id='0' level='3' type='both' size='8' unit='MiB' cpus='0-7'/>
    </cache>
    <secmodel>
      <model>apparmor</model>
      <doi>0</doi>
    </secmodel>
    <secmodel>
      <model>dac</model>
      <doi>0</doi>
      <baselabel type='kvm'>+487:+486</baselabel>
      <baselabel type='qemu'>+487:+486</baselabel>
    </secmodel>
  </host>

  <guest>
    <os_type>hvm</os_type>
    <arch name='i686'>
      <wordsize>32</wordsize>
      <emulator>/usr/bin/qemu-system-i386</emulator>
      <machine maxCpus='255'>pc-i440fx-2.6</machine>
      <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
      <machine maxCpus='255'>pc-0.12</machine>
      <domain type='qemu'/>
      <domain type='kvm'>
        <emulator>/usr/bin/qemu-kvm</emulator>
        <machine maxCpus='255'>pc-i440fx-2.6</machine>
        <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
        <machine maxCpus='255'>pc-0.12</machine>
      </domain>
    </arch>
    <features>
      <cpuselection/>
      <deviceboot/>
      <disksnapshot default='on' toggle='no'/>
      <acpi default='on' toggle='yes'/>
      <apic default='on' toggle='no'/>
      <pae/>
      <nonpae/>
    </features>
  </guest>

  <guest>
    <os_type>hvm</os_type>
    <arch name='x86_64'>
      <wordsize>64</wordsize>
      <emulator>/usr/bin/qemu-system-x86_64</emulator>
      <machine maxCpus='255'>pc-i440fx-2.6</machine>
      <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
      <machine maxCpus='255'>pc-0.12</machine>
      <domain type='qemu'/>
      <domain type='kvm'>
        <emulator>/usr/bin/qemu-kvm</emulator>
        <machine maxCpus='255'>pc-i440fx-2.6</machine>
        <machine canonical='pc-i440fx-2.6' maxCpus='255'>pc</machine>
        <machine maxCpus='255'>pc-0.12</machine>
      </domain>
    </arch>
    <features>
      <cpuselection/>
      <deviceboot/>
      <disksnapshot default='on' toggle='no'/>
      <acpi default='on' toggle='yes'/>
      <apic default='on' toggle='no'/>
    </features>
  </guest>

</capabilities>"""

    return _make_capabilities


@pytest.fixture
def make_mock_network():
    """
    Return a factory creating mocked networks on the mocked libvirt connection.
    """

    def _make_mock_net(xml_def):
        """
        Register a mocked network described by ``xml_def`` and return it.

        The network is named after the ``name`` element of the definition.
        """
        mocked_conn = virt.libvirt.openAuth.return_value

        doc = ET.fromstring(xml_def)
        name = doc.find("name").text

        # The lookup is only replaced once to keep the already registered networks
        if not isinstance(mocked_conn.networkLookupByName, MappedResultMock):
            mocked_conn.networkLookupByName = MappedResultMock()
        mocked_conn.networkLookupByName.add(name)
        net_mock = mocked_conn.networkLookupByName(name)
        net_mock.XMLDesc.return_value = xml_def

        # libvirt defaults the autostart to unset
        net_mock.autostart.return_value = 0

        # Append the network to listAllNetworks return value
        all_nets = mocked_conn.listAllNetworks.return_value
        if not isinstance(all_nets, list):
            all_nets = []
        all_nets.append(net_mock)
        mocked_conn.listAllNetworks.return_value = all_nets

        return net_mock

    return _make_mock_net


@pytest.fixture
def make_mock_device():
    """
    Create a mock host device
    """

    def _make_mock_device(xml_def):
        """
        Register a mocked node device described by ``xml_def`` and return it.

        The device is named after the ``name`` element of the definition and
        ``listCaps()`` returns the ``type`` attribute of each of its direct
        ``capability`` children.
        """
        mocked_conn = virt.libvirt.openAuth.return_value
        # The lookup is only replaced once to keep the already registered devices
        if not isinstance(mocked_conn.nodeDeviceLookupByName, MappedResultMock):
            mocked_conn.nodeDeviceLookupByName = MappedResultMock()

        doc = ET.fromstring(xml_def)
        name = doc.find("./name").text

        mocked_conn.nodeDeviceLookupByName.add(name)
        mocked_device = mocked_conn.nodeDeviceLookupByName(name)
        mocked_device.name.return_value = name
        mocked_device.XMLDesc.return_value = xml_def
        mocked_device.listCaps.return_value = [
            cap.get("type") for cap in doc.findall("./capability")
        ]
        return mocked_device

    return _make_mock_device


@pytest.fixture(params=[True, False], ids=["test", "notest"])
def test(request):
    """
    Run the test with both True and False test values
    """
    return request.param