1# Copyright (c) 2014 VMware, Inc. 2# All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15 16""" 17Test suite for VMware VMDK driver volumeops module. 18""" 19 20import ddt 21import mock 22from oslo_utils import units 23from oslo_vmware import exceptions 24from oslo_vmware import vim_util 25 26from cinder import test 27from cinder.volume.drivers.vmware import exceptions as vmdk_exceptions 28from cinder.volume.drivers.vmware import volumeops 29 30 31@ddt.ddt 32class VolumeOpsTestCase(test.TestCase): 33 """Unit tests for volumeops module.""" 34 35 MAX_OBJECTS = 100 36 37 def setUp(self): 38 super(VolumeOpsTestCase, self).setUp() 39 self.session = mock.MagicMock() 40 self.vops = volumeops.VMwareVolumeOps( 41 self.session, self.MAX_OBJECTS, mock.sentinel.extension_key, 42 mock.sentinel.extension_type) 43 44 def test_split_datastore_path(self): 45 test1 = '[datastore1] myfolder/mysubfolder/myvm.vmx' 46 (datastore, folder, file_name) = volumeops.split_datastore_path(test1) 47 self.assertEqual('datastore1', datastore) 48 self.assertEqual('myfolder/mysubfolder/', folder) 49 self.assertEqual('myvm.vmx', file_name) 50 51 test2 = '[datastore2 ] myfolder/myvm.vmdk' 52 (datastore, folder, file_name) = volumeops.split_datastore_path(test2) 53 self.assertEqual('datastore2', datastore) 54 self.assertEqual('myfolder/', folder) 55 self.assertEqual('myvm.vmdk', file_name) 56 57 test3 = 'myfolder/myvm.vmdk' 58 self.assertRaises(IndexError, volumeops.split_datastore_path, test3) 59 60 def vm(self, val): 61 """Create a mock vm in retrieve result format.""" 62 vm = mock.MagicMock() 63 prop = mock.Mock(spec=object) 64 prop.val = val 65 vm.propSet = [prop] 66 return vm 67 68 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 69 'get_backing_by_uuid') 70 def test_get_backing(self, get_backing_by_uuid): 71 ref = mock.sentinel.ref 72 get_backing_by_uuid.return_value = ref 73 74 name = mock.sentinel.name 75 backing_uuid = mock.sentinel.backing_uuid 76 ret = self.vops.get_backing(name, backing_uuid) 77 78 self.assertEqual(ref, ret) 79 get_backing_by_uuid.assert_called_once_with(backing_uuid) 80 81 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 82 'get_backing_by_uuid') 83 def test_get_backing_legacy(self, get_backing_by_uuid): 84 ref = mock.sentinel.ref 85 get_backing_by_uuid.return_value = None 86 name = mock.sentinel.name 87 self.vops._backing_ref_cache[name] = ref 88 89 backing_uuid = mock.sentinel.backing_uuid 90 ret = self.vops.get_backing(name, backing_uuid) 91 92 self.assertEqual(ref, ret) 93 get_backing_by_uuid.assert_called_once_with(backing_uuid) 94 95 def test_get_backing_by_uuid(self): 96 backing = mock.sentinel.backing 97 self.session.invoke_api.return_value = [backing] 98 99 uuid = mock.sentinel.uuid 100 self.assertEqual(backing, self.vops.get_backing_by_uuid(uuid)) 101 self.session.invoke_api.assert_called_once_with( 102 self.session.vim, 103 'FindAllByUuid', 104 self.session.vim.service_content.searchIndex, 105 uuid=uuid, 106 vmSearch=True, 107 instanceUuid=True) 108 109 def _create_property(self, name, val): 110 prop = mock.Mock() 111 prop.name = name 112 prop.val = val 113 return prop 114 115 def _create_backing_obj(self, name, ref, instance_uuid=None, vol_id=None): 116 name_prop = self._create_property('name', name) 117 instance_uuid_prop = self._create_property('config.instanceUuid', 118 instance_uuid) 119 vol_id_val = mock.Mock(value=vol_id) 120 vol_id_prop = self._create_property( 121 'config.extraConfig["cinder.volume.id"]', vol_id_val) 122 123 backing = mock.Mock() 124 backing.obj = ref 125 backing.propSet = [name_prop, instance_uuid_prop, vol_id_prop] 126 return backing 127 128 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 129 'continue_retrieval', return_value=None) 130 def test_build_backing_ref_cache(self, continue_retrieval): 131 uuid1 = 'd68cbee0-c1f7-4886-98a4-cf2201461c6e' 132 ref1 = mock.sentinel.ref1 133 non_vol_backing = self._create_backing_obj( 134 'foo', ref1, instance_uuid=uuid1) 135 136 uuid2 = 'f36f0e87-97e0-4a1c-b788-2f84f1376960' 137 ref2 = mock.sentinel.ref2 138 legacy_vol_backing = self._create_backing_obj( 139 'volume-f36f0e87-97e0-4a1c-b788-2f84f1376960', ref2, 140 instance_uuid=uuid2) 141 142 uuid3 = '405d6afd-43be-4ce0-9e5f-fd49559e2763' 143 ref3 = mock.sentinel.ref3 144 vol_backing = self._create_backing_obj( 145 'volume-405d6afd-43be-4ce0-9e5f-fd49559e2763', ref3, 146 instance_uuid=uuid3, vol_id=uuid3) 147 148 result = mock.Mock(objects=[ 149 non_vol_backing, legacy_vol_backing, vol_backing]) 150 self.session.invoke_api.return_value = result 151 152 self.vops.build_backing_ref_cache() 153 exp_cache = {'foo': ref1, 154 'volume-f36f0e87-97e0-4a1c-b788-2f84f1376960': ref2} 155 self.assertEqual(exp_cache, self.vops._backing_ref_cache) 156 self.session.invoke_api.assert_called_once_with( 157 vim_util, 158 'get_objects', 159 self.session.vim, 160 'VirtualMachine', 161 self.MAX_OBJECTS, 162 properties_to_collect=[ 163 'name', 164 'config.instanceUuid', 165 'config.extraConfig["cinder.volume.id"]']) 166 continue_retrieval.assert_called_once_with(result) 167 168 def test_delete_backing(self): 169 backing = mock.sentinel.backing 170 task = mock.sentinel.task 171 self.session.invoke_api.return_value = task 172 self.vops.delete_backing(backing) 173 self.session.invoke_api.assert_called_once_with(self.session.vim, 174 "Destroy_Task", 175 backing) 176 self.session.wait_for_task(task) 177 178 def test_get_host(self): 179 instance = mock.sentinel.instance 180 host = mock.sentinel.host 181 self.session.invoke_api.return_value = host 182 result = self.vops.get_host(instance) 183 self.assertEqual(host, result) 184 self.session.invoke_api.assert_called_once_with(vim_util, 185 'get_object_property', 186 self.session.vim, 187 instance, 188 'runtime.host') 189 190 def _host_runtime_info( 191 self, connection_state='connected', in_maintenance=False): 192 return mock.Mock(connectionState=connection_state, 193 inMaintenanceMode=in_maintenance) 194 195 def test_get_hosts(self): 196 hosts = mock.sentinel.hosts 197 self.session.invoke_api.return_value = hosts 198 result = self.vops.get_hosts() 199 self.assertEqual(hosts, result) 200 self.session.invoke_api.assert_called_once_with(vim_util, 201 'get_objects', 202 self.session.vim, 203 'HostSystem', 204 self.MAX_OBJECTS) 205 206 def test_continue_retrieval(self): 207 retrieve_result = mock.sentinel.retrieve_result 208 self.session.invoke_api.return_value = retrieve_result 209 result = self.vops.continue_retrieval(retrieve_result) 210 self.assertEqual(retrieve_result, result) 211 self.session.invoke_api.assert_called_once_with(vim_util, 212 'continue_retrieval', 213 self.session.vim, 214 retrieve_result) 215 216 def test_cancel_retrieval(self): 217 retrieve_result = mock.sentinel.retrieve_result 218 self.session.invoke_api.return_value = retrieve_result 219 result = self.vops.cancel_retrieval(retrieve_result) 220 self.assertIsNone(result) 221 self.session.invoke_api.assert_called_once_with(vim_util, 222 'cancel_retrieval', 223 self.session.vim, 224 retrieve_result) 225 226 def test_is_usable(self): 227 mount_info = mock.Mock(spec=object) 228 mount_info.accessMode = "readWrite" 229 mount_info.mounted = True 230 mount_info.accessible = True 231 self.assertTrue(self.vops._is_usable(mount_info)) 232 233 del mount_info.mounted 234 self.assertTrue(self.vops._is_usable(mount_info)) 235 236 mount_info.accessMode = "readonly" 237 self.assertFalse(self.vops._is_usable(mount_info)) 238 239 mount_info.accessMode = "readWrite" 240 mount_info.mounted = False 241 self.assertFalse(self.vops._is_usable(mount_info)) 242 243 mount_info.mounted = True 244 mount_info.accessible = False 245 self.assertFalse(self.vops._is_usable(mount_info)) 246 247 del mount_info.accessible 248 self.assertFalse(self.vops._is_usable(mount_info)) 249 250 def _create_host_mounts(self, access_mode, host, set_accessible=True, 251 is_accessible=True, mounted=True): 252 """Create host mount value of datastore with single mount info. 253 254 :param access_mode: string specifying the read/write permission 255 :param set_accessible: specify whether accessible property 256 should be set 257 :param is_accessible: boolean specifying whether the datastore 258 is accessible to host 259 :param host: managed object reference of the connected 260 host 261 :return: list of host mount info 262 """ 263 mntInfo = mock.Mock(spec=object) 264 mntInfo.accessMode = access_mode 265 if set_accessible: 266 mntInfo.accessible = is_accessible 267 else: 268 del mntInfo.accessible 269 mntInfo.mounted = mounted 270 271 host_mount = mock.Mock(spec=object) 272 host_mount.key = host 273 host_mount.mountInfo = mntInfo 274 host_mounts = mock.Mock(spec=object) 275 host_mounts.DatastoreHostMount = [host_mount] 276 277 return host_mounts 278 279 def test_get_connected_hosts(self): 280 with mock.patch.object(self.vops, 'get_summary') as get_summary: 281 datastore = mock.sentinel.datastore 282 summary = mock.Mock(spec=object) 283 get_summary.return_value = summary 284 285 summary.accessible = False 286 hosts = self.vops.get_connected_hosts(datastore) 287 self.assertEqual([], hosts) 288 289 summary.accessible = True 290 host = mock.Mock(spec=object) 291 host.value = mock.sentinel.host 292 host_mounts = self._create_host_mounts("readWrite", host) 293 self.session.invoke_api.return_value = host_mounts 294 hosts = self.vops.get_connected_hosts(datastore) 295 self.assertEqual([mock.sentinel.host], hosts) 296 self.session.invoke_api.assert_called_once_with( 297 vim_util, 298 'get_object_property', 299 self.session.vim, 300 datastore, 301 'host') 302 303 del host_mounts.DatastoreHostMount 304 hosts = self.vops.get_connected_hosts(datastore) 305 self.assertEqual([], hosts) 306 307 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 308 'get_connected_hosts') 309 def test_is_datastore_accessible(self, get_connected_hosts): 310 host_1 = mock.sentinel.host_1 311 host_2 = mock.sentinel.host_2 312 get_connected_hosts.return_value = [host_1, host_2] 313 314 ds = mock.sentinel.datastore 315 host = mock.Mock(value=mock.sentinel.host_1) 316 self.assertTrue(self.vops.is_datastore_accessible(ds, host)) 317 get_connected_hosts.assert_called_once_with(ds) 318 319 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 320 'get_connected_hosts') 321 def test_is_datastore_accessible_with_inaccessible(self, 322 get_connected_hosts): 323 host_1 = mock.sentinel.host_1 324 get_connected_hosts.return_value = [host_1] 325 326 ds = mock.sentinel.datastore 327 host = mock.Mock(value=mock.sentinel.host_2) 328 self.assertFalse(self.vops.is_datastore_accessible(ds, host)) 329 get_connected_hosts.assert_called_once_with(ds) 330 331 def test_get_parent(self): 332 # Not recursive 333 child = mock.Mock(spec=object) 334 child._type = 'Parent' 335 ret = self.vops._get_parent(child, 'Parent') 336 self.assertEqual(child, ret) 337 338 # Recursive 339 parent = mock.Mock(spec=object) 340 parent._type = 'Parent' 341 child = mock.Mock(spec=object) 342 child._type = 'Child' 343 self.session.invoke_api.return_value = parent 344 ret = self.vops._get_parent(child, 'Parent') 345 self.assertEqual(parent, ret) 346 self.session.invoke_api.assert_called_with(vim_util, 347 'get_object_property', 348 self.session.vim, child, 349 'parent') 350 351 def test_get_dc(self): 352 # set up hierarchy of objects 353 dc = mock.Mock(spec=object) 354 dc._type = 'Datacenter' 355 o1 = mock.Mock(spec=object) 356 o1._type = 'mockType1' 357 o1.parent = dc 358 o2 = mock.Mock(spec=object) 359 o2._type = 'mockType2' 360 o2.parent = o1 361 362 # mock out invoke_api behaviour to fetch parent 363 def mock_invoke_api(vim_util, method, vim, the_object, arg): 364 return the_object.parent 365 366 self.session.invoke_api.side_effect = mock_invoke_api 367 ret = self.vops.get_dc(o2) 368 self.assertEqual(dc, ret) 369 370 # Clear side effects. 371 self.session.invoke_api.side_effect = None 372 373 def test_get_vmfolder(self): 374 self.session.invoke_api.return_value = mock.sentinel.ret 375 ret = self.vops.get_vmfolder(mock.sentinel.dc) 376 self.assertEqual(mock.sentinel.ret, ret) 377 self.session.invoke_api.assert_called_once_with(vim_util, 378 'get_object_property', 379 self.session.vim, 380 mock.sentinel.dc, 381 'vmFolder') 382 383 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 384 'get_entity_name') 385 def test_get_child_folder(self, get_entity_name): 386 child_entity_1 = mock.Mock(_type='Folder') 387 child_entity_2 = mock.Mock(_type='foo') 388 child_entity_3 = mock.Mock(_type='Folder') 389 390 prop_val = mock.Mock(ManagedObjectReference=[child_entity_1, 391 child_entity_2, 392 child_entity_3]) 393 self.session.invoke_api.return_value = prop_val 394 get_entity_name.side_effect = ['bar', '%2fcinder-volumes'] 395 396 parent_folder = mock.sentinel.parent_folder 397 child_name = '/cinder-volumes' 398 ret = self.vops._get_child_folder(parent_folder, child_name) 399 400 self.assertEqual(child_entity_3, ret) 401 self.session.invoke_api.assert_called_once_with( 402 vim_util, 'get_object_property', self.session.vim, parent_folder, 403 'childEntity') 404 get_entity_name.assert_has_calls([mock.call(child_entity_1), 405 mock.call(child_entity_3)]) 406 407 def test_create_folder(self): 408 folder = mock.sentinel.folder 409 self.session.invoke_api.return_value = folder 410 411 parent_folder = mock.sentinel.parent_folder 412 child_folder_name = mock.sentinel.child_folder_name 413 ret = self.vops.create_folder(parent_folder, child_folder_name) 414 415 self.assertEqual(folder, ret) 416 self.session.invoke_api.assert_called_once_with( 417 self.session.vim, 'CreateFolder', parent_folder, 418 name=child_folder_name) 419 420 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 421 '_get_child_folder') 422 def test_create_folder_with_duplicate_name(self, get_child_folder): 423 self.session.invoke_api.side_effect = exceptions.DuplicateName 424 425 folder = mock.sentinel.folder 426 get_child_folder.return_value = folder 427 428 parent_folder = mock.sentinel.parent_folder 429 child_folder_name = mock.sentinel.child_folder_name 430 ret = self.vops.create_folder(parent_folder, child_folder_name) 431 432 self.assertEqual(folder, ret) 433 self.session.invoke_api.assert_called_once_with( 434 self.session.vim, 'CreateFolder', parent_folder, 435 name=child_folder_name) 436 get_child_folder.assert_called_once_with(parent_folder, 437 child_folder_name) 438 439 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 440 'get_vmfolder') 441 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 442 'create_folder') 443 def test_create_vm_inventory_folder(self, create_folder, get_vmfolder): 444 vm_folder_1 = mock.sentinel.vm_folder_1 445 get_vmfolder.return_value = vm_folder_1 446 447 folder_1a = mock.sentinel.folder_1a 448 folder_1b = mock.sentinel.folder_1b 449 create_folder.side_effect = [folder_1a, folder_1b] 450 451 datacenter_1 = mock.Mock(value='dc-1') 452 path_comp = ['a', 'b'] 453 ret = self.vops.create_vm_inventory_folder(datacenter_1, path_comp) 454 455 self.assertEqual(folder_1b, ret) 456 get_vmfolder.assert_called_once_with(datacenter_1) 457 exp_calls = [mock.call(vm_folder_1, 'a'), mock.call(folder_1a, 'b')] 458 self.assertEqual(exp_calls, create_folder.call_args_list) 459 exp_cache = {'/dc-1': vm_folder_1, 460 '/dc-1/a': folder_1a, 461 '/dc-1/a/b': folder_1b} 462 self.assertEqual(exp_cache, self.vops._folder_cache) 463 464 # Test cache 465 get_vmfolder.reset_mock() 466 create_folder.reset_mock() 467 468 folder_1c = mock.sentinel.folder_1c 469 create_folder.side_effect = [folder_1c] 470 471 path_comp = ['a', 'c'] 472 ret = self.vops.create_vm_inventory_folder(datacenter_1, path_comp) 473 474 self.assertEqual(folder_1c, ret) 475 self.assertFalse(get_vmfolder.called) 476 exp_calls = [mock.call(folder_1a, 'c')] 477 self.assertEqual(exp_calls, create_folder.call_args_list) 478 exp_cache = {'/dc-1': vm_folder_1, 479 '/dc-1/a': folder_1a, 480 '/dc-1/a/b': folder_1b, 481 '/dc-1/a/c': folder_1c} 482 self.assertEqual(exp_cache, self.vops._folder_cache) 483 484 # Test cache with different datacenter 485 get_vmfolder.reset_mock() 486 create_folder.reset_mock() 487 488 vm_folder_2 = mock.sentinel.vm_folder_2 489 get_vmfolder.return_value = vm_folder_2 490 491 folder_2a = mock.sentinel.folder_2a 492 folder_2b = mock.sentinel.folder_2b 493 create_folder.side_effect = [folder_2a, folder_2b] 494 495 datacenter_2 = mock.Mock(value='dc-2') 496 path_comp = ['a', 'b'] 497 ret = self.vops.create_vm_inventory_folder(datacenter_2, path_comp) 498 499 self.assertEqual(folder_2b, ret) 500 get_vmfolder.assert_called_once_with(datacenter_2) 501 exp_calls = [mock.call(vm_folder_2, 'a'), mock.call(folder_2a, 'b')] 502 self.assertEqual(exp_calls, create_folder.call_args_list) 503 exp_cache = {'/dc-1': vm_folder_1, 504 '/dc-1/a': folder_1a, 505 '/dc-1/a/b': folder_1b, 506 '/dc-1/a/c': folder_1c, 507 '/dc-2': vm_folder_2, 508 '/dc-2/a': folder_2a, 509 '/dc-2/a/b': folder_2b 510 } 511 self.assertEqual(exp_cache, self.vops._folder_cache) 512 513 def test_create_disk_backing_thin(self): 514 backing = mock.Mock() 515 del backing.eagerlyScrub 516 cf = self.session.vim.client.factory 517 cf.create.return_value = backing 518 519 disk_type = 'thin' 520 ret = self.vops._create_disk_backing(disk_type, None) 521 522 self.assertEqual(backing, ret) 523 self.assertIsInstance(ret.thinProvisioned, bool) 524 self.assertTrue(ret.thinProvisioned) 525 self.assertEqual('', ret.fileName) 526 self.assertEqual('persistent', ret.diskMode) 527 528 def test_create_disk_backing_thick(self): 529 backing = mock.Mock() 530 del backing.eagerlyScrub 531 del backing.thinProvisioned 532 cf = self.session.vim.client.factory 533 cf.create.return_value = backing 534 535 disk_type = 'thick' 536 ret = self.vops._create_disk_backing(disk_type, None) 537 538 self.assertEqual(backing, ret) 539 self.assertEqual('', ret.fileName) 540 self.assertEqual('persistent', ret.diskMode) 541 542 def test_create_disk_backing_eager_zeroed_thick(self): 543 backing = mock.Mock() 544 del backing.thinProvisioned 545 cf = self.session.vim.client.factory 546 cf.create.return_value = backing 547 548 disk_type = 'eagerZeroedThick' 549 ret = self.vops._create_disk_backing(disk_type, None) 550 551 self.assertEqual(backing, ret) 552 self.assertIsInstance(ret.eagerlyScrub, bool) 553 self.assertTrue(ret.eagerlyScrub) 554 self.assertEqual('', ret.fileName) 555 self.assertEqual('persistent', ret.diskMode) 556 557 def test_create_virtual_disk_config_spec(self): 558 559 cf = self.session.vim.client.factory 560 cf.create.side_effect = lambda *args: mock.Mock() 561 562 size_kb = units.Ki 563 controller_key = 200 564 disk_type = 'thick' 565 profile_id = mock.sentinel.profile_id 566 spec = self.vops._create_virtual_disk_config_spec(size_kb, 567 disk_type, 568 controller_key, 569 profile_id, 570 None) 571 572 cf.create.side_effect = None 573 self.assertEqual('add', spec.operation) 574 self.assertEqual('create', spec.fileOperation) 575 device = spec.device 576 self.assertEqual(size_kb, device.capacityInKB) 577 self.assertEqual(-101, device.key) 578 self.assertEqual(0, device.unitNumber) 579 self.assertEqual(controller_key, device.controllerKey) 580 backing = device.backing 581 self.assertEqual('', backing.fileName) 582 self.assertEqual('persistent', backing.diskMode) 583 disk_profiles = spec.profile 584 self.assertEqual(1, len(disk_profiles)) 585 self.assertEqual(profile_id, disk_profiles[0].profileId) 586 587 def test_create_specs_for_ide_disk_add(self): 588 factory = self.session.vim.client.factory 589 factory.create.side_effect = lambda *args: mock.Mock() 590 591 size_kb = 1 592 disk_type = 'thin' 593 adapter_type = 'ide' 594 profile_id = mock.sentinel.profile_id 595 ret = self.vops._create_specs_for_disk_add(size_kb, disk_type, 596 adapter_type, profile_id) 597 598 factory.create.side_effect = None 599 self.assertEqual(1, len(ret)) 600 self.assertEqual(units.Ki, ret[0].device.capacityInKB) 601 self.assertEqual(200, ret[0].device.controllerKey) 602 expected = [mock.call.create('ns0:VirtualDeviceConfigSpec'), 603 mock.call.create('ns0:VirtualDisk'), 604 mock.call.create('ns0:VirtualDiskFlatVer2BackingInfo')] 605 factory.create.assert_has_calls(expected, any_order=True) 606 607 def test_create_specs_for_scsi_disk_add(self): 608 factory = self.session.vim.client.factory 609 factory.create.side_effect = lambda *args: mock.Mock() 610 611 size_kb = 2 * units.Ki 612 disk_type = 'thin' 613 adapter_type = 'lsiLogicsas' 614 profile_id = mock.sentinel.profile_id 615 ret = self.vops._create_specs_for_disk_add(size_kb, disk_type, 616 adapter_type, profile_id) 617 618 factory.create.side_effect = None 619 self.assertEqual(2, len(ret)) 620 self.assertEqual('noSharing', ret[1].device.sharedBus) 621 self.assertEqual(size_kb, ret[0].device.capacityInKB) 622 expected = [mock.call.create('ns0:VirtualLsiLogicSASController'), 623 mock.call.create('ns0:VirtualDeviceConfigSpec'), 624 mock.call.create('ns0:VirtualDisk'), 625 mock.call.create('ns0:VirtualDiskFlatVer2BackingInfo'), 626 mock.call.create('ns0:VirtualDeviceConfigSpec')] 627 factory.create.assert_has_calls(expected, any_order=True) 628 629 def test_get_create_spec_disk_less(self): 630 factory = self.session.vim.client.factory 631 factory.create.side_effect = lambda *args: mock.Mock() 632 633 name = mock.sentinel.name 634 ds_name = mock.sentinel.ds_name 635 profile_id = mock.sentinel.profile_id 636 option_key = mock.sentinel.key 637 option_value = mock.sentinel.value 638 extra_config = {option_key: option_value, 639 volumeops.BACKING_UUID_KEY: mock.sentinel.uuid} 640 ret = self.vops._get_create_spec_disk_less(name, ds_name, profile_id, 641 extra_config) 642 643 factory.create.side_effect = None 644 self.assertEqual(name, ret.name) 645 self.assertEqual(mock.sentinel.uuid, ret.instanceUuid) 646 self.assertEqual('[%s]' % ds_name, ret.files.vmPathName) 647 self.assertEqual("vmx-08", ret.version) 648 self.assertEqual(profile_id, ret.vmProfile[0].profileId) 649 self.assertEqual(1, len(ret.extraConfig)) 650 self.assertEqual(option_key, ret.extraConfig[0].key) 651 self.assertEqual(option_value, ret.extraConfig[0].value) 652 self.assertEqual(mock.sentinel.extension_key, 653 ret.managedBy.extensionKey) 654 self.assertEqual(mock.sentinel.extension_type, ret.managedBy.type) 655 expected = [mock.call.create('ns0:VirtualMachineFileInfo'), 656 mock.call.create('ns0:VirtualMachineConfigSpec'), 657 mock.call.create('ns0:VirtualMachineDefinedProfileSpec'), 658 mock.call.create('ns0:OptionValue'), 659 mock.call.create('ns0:ManagedByInfo')] 660 factory.create.assert_has_calls(expected, any_order=True) 661 662 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 663 '_get_create_spec_disk_less') 664 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 665 '_create_specs_for_disk_add') 666 def test_get_create_spec(self, create_specs_for_disk_add, 667 get_create_spec_disk_less): 668 name = 'vol-1' 669 size_kb = 1024 670 disk_type = 'thin' 671 ds_name = 'nfs-1' 672 profile_id = mock.sentinel.profile_id 673 adapter_type = 'busLogic' 674 extra_config = mock.sentinel.extra_config 675 676 self.vops.get_create_spec(name, size_kb, disk_type, ds_name, 677 profile_id, adapter_type, extra_config) 678 679 get_create_spec_disk_less.assert_called_once_with( 680 name, ds_name, profileId=profile_id, extra_config=extra_config) 681 create_specs_for_disk_add.assert_called_once_with( 682 size_kb, disk_type, adapter_type, profile_id) 683 684 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 685 'get_create_spec') 686 def test_create_backing(self, get_create_spec): 687 create_spec = mock.sentinel.create_spec 688 get_create_spec.return_value = create_spec 689 task = mock.sentinel.task 690 self.session.invoke_api.return_value = task 691 task_info = mock.Mock(spec=object) 692 task_info.result = mock.sentinel.result 693 self.session.wait_for_task.return_value = task_info 694 name = 'backing_name' 695 size_kb = mock.sentinel.size_kb 696 disk_type = mock.sentinel.disk_type 697 adapter_type = mock.sentinel.adapter_type 698 folder = mock.sentinel.folder 699 resource_pool = mock.sentinel.resource_pool 700 host = mock.sentinel.host 701 ds_name = mock.sentinel.ds_name 702 profile_id = mock.sentinel.profile_id 703 extra_config = mock.sentinel.extra_config 704 ret = self.vops.create_backing(name, size_kb, disk_type, folder, 705 resource_pool, host, ds_name, 706 profile_id, adapter_type, extra_config) 707 self.assertEqual(mock.sentinel.result, ret) 708 get_create_spec.assert_called_once_with( 709 name, size_kb, disk_type, ds_name, profile_id=profile_id, 710 adapter_type=adapter_type, extra_config=extra_config) 711 self.session.invoke_api.assert_called_once_with(self.session.vim, 712 'CreateVM_Task', 713 folder, 714 config=create_spec, 715 pool=resource_pool, 716 host=host) 717 self.session.wait_for_task.assert_called_once_with(task) 718 719 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 720 '_get_create_spec_disk_less') 721 def test_create_backing_disk_less(self, get_create_spec_disk_less): 722 create_spec = mock.sentinel.create_spec 723 get_create_spec_disk_less.return_value = create_spec 724 task = mock.sentinel.task 725 self.session.invoke_api.return_value = task 726 task_info = mock.Mock(spec=object) 727 task_info.result = mock.sentinel.result 728 self.session.wait_for_task.return_value = task_info 729 name = 'backing_name' 730 folder = mock.sentinel.folder 731 resource_pool = mock.sentinel.resource_pool 732 host = mock.sentinel.host 733 ds_name = mock.sentinel.ds_name 734 profile_id = mock.sentinel.profile_id 735 extra_config = mock.sentinel.extra_config 736 ret = self.vops.create_backing_disk_less(name, folder, resource_pool, 737 host, ds_name, profile_id, 738 extra_config) 739 740 self.assertEqual(mock.sentinel.result, ret) 741 get_create_spec_disk_less.assert_called_once_with( 742 name, ds_name, profileId=profile_id, extra_config=extra_config) 743 self.session.invoke_api.assert_called_once_with(self.session.vim, 744 'CreateVM_Task', 745 folder, 746 config=create_spec, 747 pool=resource_pool, 748 host=host) 749 self.session.wait_for_task.assert_called_once_with(task) 750 751 def test_get_datastore(self): 752 backing = mock.sentinel.backing 753 datastore = mock.Mock(spec=object) 754 datastore.ManagedObjectReference = [mock.sentinel.ds] 755 self.session.invoke_api.return_value = datastore 756 ret = self.vops.get_datastore(backing) 757 self.assertEqual(mock.sentinel.ds, ret) 758 self.session.invoke_api.assert_called_once_with(vim_util, 759 'get_object_property', 760 self.session.vim, 761 backing, 'datastore') 762 763 def test_get_summary(self): 764 datastore = mock.sentinel.datastore 765 summary = mock.sentinel.summary 766 self.session.invoke_api.return_value = summary 767 ret = self.vops.get_summary(datastore) 768 self.assertEqual(summary, ret) 769 self.session.invoke_api.assert_called_once_with(vim_util, 770 'get_object_property', 771 self.session.vim, 772 datastore, 773 'summary') 774 775 def test_get_relocate_spec(self): 776 777 delete_disk_attribute = True 778 779 def _create_side_effect(type): 780 obj = mock.Mock() 781 if type == "ns0:VirtualDiskFlatVer2BackingInfo": 782 del obj.eagerlyScrub 783 elif (type == "ns0:VirtualMachineRelocateSpec" and 784 delete_disk_attribute): 785 del obj.disk 786 else: 787 pass 788 return obj 789 790 factory = self.session.vim.client.factory 791 factory.create.side_effect = _create_side_effect 792 793 datastore = mock.sentinel.datastore 794 resource_pool = mock.sentinel.resource_pool 795 host = mock.sentinel.host 796 disk_move_type = mock.sentinel.disk_move_type 797 ret = self.vops._get_relocate_spec(datastore, resource_pool, host, 798 disk_move_type) 799 800 self.assertEqual(datastore, ret.datastore) 801 self.assertEqual(resource_pool, ret.pool) 802 self.assertEqual(host, ret.host) 803 self.assertEqual(disk_move_type, ret.diskMoveType) 804 805 # Test with disk locator. 806 delete_disk_attribute = False 807 disk_type = 'thin' 808 disk_device = mock.Mock() 809 ret = self.vops._get_relocate_spec(datastore, resource_pool, host, 810 disk_move_type, disk_type, 811 disk_device) 812 813 factory.create.side_effect = None 814 self.assertEqual(datastore, ret.datastore) 815 self.assertEqual(resource_pool, ret.pool) 816 self.assertEqual(host, ret.host) 817 self.assertEqual(disk_move_type, ret.diskMoveType) 818 self.assertIsInstance(ret.disk, list) 819 self.assertEqual(1, len(ret.disk)) 820 disk_locator = ret.disk[0] 821 self.assertEqual(datastore, disk_locator.datastore) 822 self.assertEqual(disk_device.key, disk_locator.diskId) 823 backing = disk_locator.diskBackingInfo 824 self.assertIsInstance(backing.thinProvisioned, bool) 825 self.assertTrue(backing.thinProvisioned) 826 self.assertEqual('', backing.fileName) 827 self.assertEqual('persistent', backing.diskMode) 828 829 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 830 '_get_disk_device') 831 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 832 '_get_relocate_spec') 833 def test_relocate_backing(self, get_relocate_spec, get_disk_device): 834 disk_device = mock.sentinel.disk_device 835 get_disk_device.return_value = disk_device 836 837 spec = mock.sentinel.relocate_spec 838 get_relocate_spec.return_value = spec 839 840 task = mock.sentinel.task 841 self.session.invoke_api.return_value = task 842 843 backing = mock.sentinel.backing 844 datastore = mock.sentinel.datastore 845 resource_pool = mock.sentinel.resource_pool 846 host = mock.sentinel.host 847 disk_type = mock.sentinel.disk_type 848 self.vops.relocate_backing(backing, datastore, resource_pool, host, 849 disk_type) 850 # Verify calls 851 disk_move_type = 'moveAllDiskBackingsAndAllowSharing' 852 get_disk_device.assert_called_once_with(backing) 853 get_relocate_spec.assert_called_once_with(datastore, resource_pool, 854 host, disk_move_type, 855 disk_type, disk_device) 856 self.session.invoke_api.assert_called_once_with(self.session.vim, 857 'RelocateVM_Task', 858 backing, 859 spec=spec) 860 self.session.wait_for_task.assert_called_once_with(task) 861 862 def test_move_backing_to_folder(self): 863 task = mock.sentinel.task 864 self.session.invoke_api.return_value = task 865 backing = mock.sentinel.backing 866 folder = mock.sentinel.folder 867 self.vops.move_backing_to_folder(backing, folder) 868 # Verify calls 869 self.session.invoke_api.assert_called_once_with(self.session.vim, 870 'MoveIntoFolder_Task', 871 folder, 872 list=[backing]) 873 self.session.wait_for_task.assert_called_once_with(task) 874 875 def test_create_snapshot_operation(self): 876 task = mock.sentinel.task 877 self.session.invoke_api.return_value = task 878 task_info = mock.Mock(spec=object) 879 task_info.result = mock.sentinel.result 880 self.session.wait_for_task.return_value = task_info 881 backing = mock.sentinel.backing 882 name = mock.sentinel.name 883 desc = mock.sentinel.description 884 quiesce = True 885 ret = self.vops.create_snapshot(backing, name, desc, quiesce) 886 self.assertEqual(mock.sentinel.result, ret) 887 self.session.invoke_api.assert_called_once_with(self.session.vim, 888 'CreateSnapshot_Task', 889 backing, name=name, 890 description=desc, 891 memory=False, 892 quiesce=quiesce) 893 self.session.wait_for_task.assert_called_once_with(task) 894 895 def test_get_snapshot_from_tree(self): 896 volops = volumeops.VMwareVolumeOps 897 name = mock.sentinel.name 898 # Test snapshot == 'None' 899 ret = volops._get_snapshot_from_tree(name, None) 900 self.assertIsNone(ret) 901 # Test root == snapshot 902 snapshot = mock.sentinel.snapshot 903 node = mock.Mock(spec=object) 904 node.name = name 905 node.snapshot = snapshot 906 ret = volops._get_snapshot_from_tree(name, node) 907 self.assertEqual(snapshot, ret) 908 # Test root.childSnapshotList == None 909 root = mock.Mock(spec=object) 910 root.name = 'root' 911 del root.childSnapshotList 912 ret = volops._get_snapshot_from_tree(name, root) 913 self.assertIsNone(ret) 914 # Test root.child == snapshot 915 root.childSnapshotList = [node] 916 ret = volops._get_snapshot_from_tree(name, root) 917 self.assertEqual(snapshot, ret) 918 919 def test_get_snapshot(self): 920 # build out the root snapshot tree 921 snapshot_name = mock.sentinel.snapshot_name 922 snapshot = mock.sentinel.snapshot 923 root = mock.Mock(spec=object) 924 root.name = 'root' 925 node = mock.Mock(spec=object) 926 node.name = snapshot_name 927 node.snapshot = snapshot 928 root.childSnapshotList = [node] 929 # Test rootSnapshotList is not None 930 snapshot_tree = mock.Mock(spec=object) 931 snapshot_tree.rootSnapshotList = [root] 932 self.session.invoke_api.return_value = snapshot_tree 933 backing = mock.sentinel.backing 934 ret = self.vops.get_snapshot(backing, snapshot_name) 935 self.assertEqual(snapshot, ret) 936 self.session.invoke_api.assert_called_with(vim_util, 937 'get_object_property', 938 self.session.vim, 939 backing, 940 'snapshot') 941 # Test rootSnapshotList == None 942 snapshot_tree.rootSnapshotList = None 943 ret = self.vops.get_snapshot(backing, snapshot_name) 944 self.assertIsNone(ret) 945 self.session.invoke_api.assert_called_with(vim_util, 946 'get_object_property', 947 self.session.vim, 948 backing, 949 'snapshot') 950 951 def test_snapshot_exists(self): 952 backing = mock.sentinel.backing 953 invoke_api = self.session.invoke_api 954 invoke_api.return_value = None 955 956 self.assertFalse(self.vops.snapshot_exists(backing)) 957 invoke_api.assert_called_once_with(vim_util, 958 'get_object_property', 959 self.session.vim, 960 backing, 961 'snapshot') 962 963 snapshot = mock.Mock() 964 invoke_api.return_value = snapshot 965 snapshot.rootSnapshotList = None 966 self.assertFalse(self.vops.snapshot_exists(backing)) 967 968 snapshot.rootSnapshotList = [mock.Mock()] 969 self.assertTrue(self.vops.snapshot_exists(backing)) 970 971 def test_delete_snapshot(self): 972 backing = mock.sentinel.backing 973 snapshot_name = mock.sentinel.snapshot_name 974 # Test snapshot is None 975 with mock.patch.object(self.vops, 'get_snapshot') as get_snapshot: 976 get_snapshot.return_value = None 977 self.vops.delete_snapshot(backing, snapshot_name) 978 get_snapshot.assert_called_once_with(backing, snapshot_name) 979 # Test snapshot is not None 980 snapshot = mock.sentinel.snapshot 981 task = mock.sentinel.task 982 invoke_api = self.session.invoke_api 983 invoke_api.return_value = task 984 with mock.patch.object(self.vops, 'get_snapshot') as get_snapshot: 985 get_snapshot.return_value = snapshot 986 self.vops.delete_snapshot(backing, snapshot_name) 987 get_snapshot.assert_called_with(backing, snapshot_name) 988 invoke_api.assert_called_once_with(self.session.vim, 989 'RemoveSnapshot_Task', 990 snapshot, removeChildren=False) 991 self.session.wait_for_task.assert_called_once_with(task) 992 993 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 994 'get_snapshot') 995 def test_revert_to_snapshot_with_missing_snapshot(self, get_snapshot): 996 get_snapshot.return_value = None 997 998 backing = mock.sentinel.backing 999 self.assertRaises(vmdk_exceptions.SnapshotNotFoundException, 1000 self.vops.revert_to_snapshot, backing, 'foo') 1001 get_snapshot.assert_called_once_with(backing, 'foo') 1002 1003 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1004 'get_snapshot') 1005 def test_revert_to_snapshot(self, get_snapshot): 1006 snapshot = mock.sentinel.snapshot 1007 get_snapshot.return_value = snapshot 1008 1009 task = mock.sentinel.task 1010 self.session.invoke_api.return_value = task 1011 1012 backing = mock.sentinel.backing 1013 self.vops.revert_to_snapshot(backing, 'foo') 1014 1015 get_snapshot.assert_called_once_with(backing, 'foo') 1016 self.session.invoke_api.assert_called_once_with( 1017 self.session.vim, 'RevertToSnapshot_Task', snapshot) 1018 self.session.wait_for_task.assert_called_once_with(task) 1019 1020 def test_get_folder(self): 1021 folder = mock.sentinel.folder 1022 backing = mock.sentinel.backing 1023 with mock.patch.object(self.vops, '_get_parent') as get_parent: 1024 get_parent.return_value = folder 1025 ret = self.vops._get_folder(backing) 1026 self.assertEqual(folder, ret) 1027 get_parent.assert_called_once_with(backing, 'Folder') 1028 1029 def _verify_extra_config(self, option_values, key, value): 1030 self.assertEqual(1, len(option_values)) 1031 self.assertEqual(key, option_values[0].key) 1032 self.assertEqual(value, option_values[0].value) 1033 1034 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1035 '_get_relocate_spec') 1036 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1037 '_get_disk_device') 1038 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1039 '_create_device_change_for_disk_removal') 1040 def _test_get_clone_spec( 1041 self, create_device_change_for_disk_removal, get_disk_device, 1042 get_relocate_spec, disk_type=None): 1043 factory = self.session.vim.client.factory 1044 factory.create.side_effect = lambda *args: mock.Mock() 1045 relocate_spec = mock.sentinel.relocate_spec 1046 get_relocate_spec.return_value = relocate_spec 1047 1048 if disk_type: 1049 disk_device = mock.sentinel.disk_device 1050 get_disk_device.return_value = disk_device 1051 else: 1052 disk_device = None 1053 1054 dev_change = mock.sentinel.dev_change 1055 create_device_change_for_disk_removal.return_value = dev_change 1056 1057 datastore = mock.sentinel.datastore 1058 disk_move_type = mock.sentinel.disk_move_type 1059 snapshot = mock.sentinel.snapshot 1060 backing = mock.sentinel.backing 1061 host = mock.sentinel.host 1062 rp = mock.sentinel.rp 1063 key = mock.sentinel.key 1064 value = mock.sentinel.value 1065 extra_config = {key: value, 1066 volumeops.BACKING_UUID_KEY: mock.sentinel.uuid} 1067 disks_to_clone = [mock.sentinel.disk_uuid] 1068 ret = self.vops._get_clone_spec(datastore, 1069 disk_move_type, 1070 snapshot, 1071 backing, 1072 disk_type, 1073 host=host, 1074 resource_pool=rp, 1075 extra_config=extra_config, 1076 disks_to_clone=disks_to_clone) 1077 1078 self.assertEqual(relocate_spec, ret.location) 1079 self.assertFalse(ret.powerOn) 1080 self.assertFalse(ret.template) 1081 self.assertEqual(snapshot, ret.snapshot) 1082 self.assertEqual(mock.sentinel.uuid, ret.config.instanceUuid) 1083 self.assertEqual(mock.sentinel.extension_key, 1084 ret.config.managedBy.extensionKey) 1085 self.assertEqual(mock.sentinel.extension_type, 1086 ret.config.managedBy.type) 1087 get_relocate_spec.assert_called_once_with(datastore, rp, host, 1088 disk_move_type, disk_type, 1089 disk_device) 1090 self._verify_extra_config(ret.config.extraConfig, key, value) 1091 create_device_change_for_disk_removal.assert_called_once_with( 1092 backing, disks_to_clone) 1093 self.assertEqual(dev_change, ret.config.deviceChange) 1094 1095 def test_get_clone_spec(self): 1096 self._test_get_clone_spec() 1097 1098 def test_get_clone_spec_with_thin_disk_type(self): 1099 self._test_get_clone_spec(disk_type='thin') 1100 1101 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1102 '_get_disk_devices') 1103 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1104 '_create_spec_for_disk_remove') 1105 def test_create_device_change_for_disk_removal( 1106 self, create_spec_for_disk_remove, get_disk_devices): 1107 uuid_1 = mock.sentinel.uuid_1 1108 disk_dev_1 = self._create_disk_device('foo', uuid_1) 1109 1110 uuid_2 = mock.sentinel.uuid_2 1111 disk_dev_2 = self._create_disk_device('bar', uuid_2) 1112 1113 get_disk_devices.return_value = [disk_dev_1, disk_dev_2] 1114 1115 spec = mock.sentinel.spec 1116 create_spec_for_disk_remove.return_value = spec 1117 1118 backing = mock.sentinel.backing 1119 disks_to_clone = [uuid_2] 1120 ret = self.vops._create_device_change_for_disk_removal( 1121 backing, disks_to_clone) 1122 1123 get_disk_devices.assert_called_once_with(backing) 1124 create_spec_for_disk_remove.assert_called_once_with(disk_dev_1) 1125 self.assertEqual([spec], ret) 1126 1127 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1128 '_get_folder') 1129 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1130 '_get_clone_spec') 1131 def _test_clone_backing( 1132 self, clone_type, folder, get_clone_spec, get_folder): 1133 backing_folder = mock.sentinel.backing_folder 1134 get_folder.return_value = backing_folder 1135 1136 clone_spec = mock.sentinel.clone_spec 1137 get_clone_spec.return_value = clone_spec 1138 1139 task = mock.sentinel.task 1140 self.session.invoke_api.return_value = task 1141 1142 clone = mock.sentinel.clone 1143 self.session.wait_for_task.return_value = mock.Mock(result=clone) 1144 1145 name = mock.sentinel.name 1146 backing = mock.sentinel.backing 1147 snapshot = mock.sentinel.snapshot 1148 datastore = mock.sentinel.datastore 1149 disk_type = mock.sentinel.disk_type 1150 host = mock.sentinel.host 1151 resource_pool = mock.sentinel.resource_pool 1152 extra_config = mock.sentinel.extra_config 1153 ret = self.vops.clone_backing( 1154 name, backing, snapshot, clone_type, datastore, 1155 disk_type=disk_type, host=host, resource_pool=resource_pool, 1156 extra_config=extra_config, folder=folder) 1157 1158 if folder: 1159 self.assertFalse(get_folder.called) 1160 else: 1161 get_folder.assert_called_once_with(backing) 1162 1163 if clone_type == 'linked': 1164 exp_disk_move_type = 'createNewChildDiskBacking' 1165 else: 1166 exp_disk_move_type = 'moveAllDiskBackingsAndDisallowSharing' 1167 get_clone_spec.assert_called_once_with( 1168 datastore, exp_disk_move_type, snapshot, backing, disk_type, 1169 host=host, resource_pool=resource_pool, extra_config=extra_config, 1170 disks_to_clone=None) 1171 1172 exp_folder = folder if folder else backing_folder 1173 self.session.invoke_api.assert_called_once_with( 1174 self.session.vim, 'CloneVM_Task', backing, folder=exp_folder, 1175 name=name, spec=clone_spec) 1176 1177 self.session.wait_for_task.assert_called_once_with(task) 1178 self.assertEqual(clone, ret) 1179 1180 @ddt.data('linked', 'full') 1181 def test_clone_backing(self, clone_type): 1182 self._test_clone_backing(clone_type, mock.sentinel.folder) 1183 1184 def test_clone_backing_with_empty_folder(self): 1185 self._test_clone_backing('linked', None) 1186 1187 def _create_controller_device(self, controller_type): 1188 dev = mock.Mock() 1189 dev.__class__.__name__ = controller_type 1190 return dev 1191 1192 def test_get_controller(self): 1193 disk = self._create_disk_device('foo.vmdk') 1194 controller1 = self._create_controller_device( 1195 volumeops.ControllerType.LSI_LOGIC) 1196 controller2 = self._create_controller_device( 1197 volumeops.ControllerType.PARA_VIRTUAL) 1198 self.session.invoke_api.return_value = [disk, controller1, controller2] 1199 1200 backing = mock.sentinel.backing 1201 ret = self.vops._get_controller( 1202 backing, volumeops.VirtualDiskAdapterType.PARA_VIRTUAL) 1203 self.assertEqual(controller2, ret) 1204 self.session.invoke_api.assert_called_once_with( 1205 vim_util, 'get_object_property', self.session.vim, backing, 1206 'config.hardware.device') 1207 1208 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1209 '_get_controller', return_value=None) 1210 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1211 '_create_specs_for_disk_add') 1212 def test_attach_disk_to_backing(self, create_spec, get_controller): 1213 reconfig_spec = mock.Mock() 1214 self.session.vim.client.factory.create.return_value = reconfig_spec 1215 disk_add_config_specs = mock.Mock() 1216 create_spec.return_value = disk_add_config_specs 1217 task = mock.Mock() 1218 self.session.invoke_api.return_value = task 1219 1220 backing = mock.Mock() 1221 size_in_kb = units.Ki 1222 disk_type = "thin" 1223 adapter_type = "ide" 1224 profile_id = mock.sentinel.profile_id 1225 vmdk_ds_file_path = mock.sentinel.vmdk_ds_file_path 1226 self.vops.attach_disk_to_backing(backing, size_in_kb, disk_type, 1227 adapter_type, profile_id, 1228 vmdk_ds_file_path) 1229 1230 get_controller.assert_called_once_with(backing, adapter_type) 1231 self.assertEqual(disk_add_config_specs, reconfig_spec.deviceChange) 1232 create_spec.assert_called_once_with( 1233 size_in_kb, disk_type, adapter_type, profile_id, 1234 vmdk_ds_file_path=vmdk_ds_file_path) 1235 self.session.invoke_api.assert_called_once_with(self.session.vim, 1236 "ReconfigVM_Task", 1237 backing, 1238 spec=reconfig_spec) 1239 self.session.wait_for_task.assert_called_once_with(task) 1240 1241 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1242 '_get_controller') 1243 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1244 '_create_virtual_disk_config_spec') 1245 def test_attach_disk_to_backing_existing_controller( 1246 self, create_disk_spec, get_controller): 1247 key = mock.sentinel.key 1248 controller = mock.Mock(key=key) 1249 get_controller.return_value = controller 1250 1251 reconfig_spec = mock.Mock() 1252 self.session.vim.client.factory.create.return_value = reconfig_spec 1253 disk_spec = mock.Mock() 1254 create_disk_spec.return_value = disk_spec 1255 task = mock.Mock() 1256 self.session.invoke_api.return_value = task 1257 1258 backing = mock.Mock() 1259 size_in_kb = units.Ki 1260 disk_type = "thin" 1261 adapter_type = "ide" 1262 profile_id = mock.sentinel.profile_id 1263 vmdk_ds_file_path = mock.sentinel.vmdk_ds_file_path 1264 self.vops.attach_disk_to_backing(backing, size_in_kb, disk_type, 1265 adapter_type, profile_id, 1266 vmdk_ds_file_path) 1267 1268 get_controller.assert_called_once_with(backing, adapter_type) 1269 self.assertEqual([disk_spec], reconfig_spec.deviceChange) 1270 create_disk_spec.assert_called_once_with( 1271 size_in_kb, disk_type, key, profile_id, vmdk_ds_file_path) 1272 self.session.invoke_api.assert_called_once_with(self.session.vim, 1273 "ReconfigVM_Task", 1274 backing, 1275 spec=reconfig_spec) 1276 self.session.wait_for_task.assert_called_once_with(task) 1277 1278 def test_create_spec_for_disk_remove(self): 1279 disk_spec = mock.Mock() 1280 self.session.vim.client.factory.create.return_value = disk_spec 1281 1282 disk_device = mock.sentinel.disk_device 1283 self.vops._create_spec_for_disk_remove(disk_device) 1284 1285 self.session.vim.client.factory.create.assert_called_once_with( 1286 'ns0:VirtualDeviceConfigSpec') 1287 self.assertEqual('remove', disk_spec.operation) 1288 self.assertEqual(disk_device, disk_spec.device) 1289 1290 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1291 '_create_spec_for_disk_remove') 1292 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1293 '_reconfigure_backing') 1294 def test_detach_disk_from_backing(self, reconfigure_backing, create_spec): 1295 disk_spec = mock.sentinel.disk_spec 1296 create_spec.return_value = disk_spec 1297 1298 reconfig_spec = mock.Mock() 1299 self.session.vim.client.factory.create.return_value = reconfig_spec 1300 1301 backing = mock.sentinel.backing 1302 disk_device = mock.sentinel.disk_device 1303 self.vops.detach_disk_from_backing(backing, disk_device) 1304 1305 create_spec.assert_called_once_with(disk_device) 1306 self.session.vim.client.factory.create.assert_called_once_with( 1307 'ns0:VirtualMachineConfigSpec') 1308 self.assertEqual([disk_spec], reconfig_spec.deviceChange) 1309 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 1310 1311 def test_rename_backing(self): 1312 task = mock.sentinel.task 1313 self.session.invoke_api.return_value = task 1314 1315 backing = mock.sentinel.backing 1316 new_name = mock.sentinel.new_name 1317 self.vops.rename_backing(backing, new_name) 1318 1319 self.session.invoke_api.assert_called_once_with(self.session.vim, 1320 "Rename_Task", 1321 backing, 1322 newName=new_name) 1323 self.session.wait_for_task.assert_called_once_with(task) 1324 1325 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1326 '_get_disk_device') 1327 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1328 '_reconfigure_backing') 1329 def test_update_backing_disk_uuid(self, reconfigure_backing, 1330 get_disk_device): 1331 disk_spec = mock.Mock() 1332 reconfig_spec = mock.Mock() 1333 self.session.vim.client.factory.create.side_effect = [disk_spec, 1334 reconfig_spec] 1335 1336 disk_device = mock.Mock() 1337 get_disk_device.return_value = disk_device 1338 1339 self.vops.update_backing_disk_uuid(mock.sentinel.backing, 1340 mock.sentinel.disk_uuid) 1341 1342 get_disk_device.assert_called_once_with(mock.sentinel.backing) 1343 self.assertEqual(mock.sentinel.disk_uuid, disk_device.backing.uuid) 1344 self.assertEqual('edit', disk_spec.operation) 1345 self.assertEqual(disk_device, disk_spec.device) 1346 self.assertEqual([disk_spec], reconfig_spec.deviceChange) 1347 reconfigure_backing.assert_called_once_with(mock.sentinel.backing, 1348 reconfig_spec) 1349 exp_factory_create_calls = [mock.call('ns0:VirtualDeviceConfigSpec'), 1350 mock.call('ns0:VirtualMachineConfigSpec')] 1351 self.assertEqual(exp_factory_create_calls, 1352 self.session.vim.client.factory.create.call_args_list) 1353 1354 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1355 '_get_extra_config_option_values') 1356 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1357 '_reconfigure_backing') 1358 def test_update_backing_extra_config(self, 1359 reconfigure_backing, 1360 get_extra_config_option_values): 1361 reconfig_spec = mock.Mock() 1362 self.session.vim.client.factory.create.return_value = reconfig_spec 1363 1364 option_values = mock.sentinel.option_values 1365 get_extra_config_option_values.return_value = option_values 1366 1367 backing = mock.sentinel.backing 1368 option_key = mock.sentinel.key 1369 option_value = mock.sentinel.value 1370 extra_config = {option_key: option_value, 1371 volumeops.BACKING_UUID_KEY: mock.sentinel.uuid} 1372 self.vops.update_backing_extra_config(backing, extra_config) 1373 1374 get_extra_config_option_values.assert_called_once_with( 1375 {option_key: option_value}) 1376 self.assertEqual(mock.sentinel.uuid, reconfig_spec.instanceUuid) 1377 self.assertEqual(option_values, reconfig_spec.extraConfig) 1378 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 1379 1380 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1381 '_reconfigure_backing') 1382 def test_update_backing_uuid(self, reconfigure_backing): 1383 reconfig_spec = mock.Mock() 1384 self.session.vim.client.factory.create.return_value = reconfig_spec 1385 1386 backing = mock.sentinel.backing 1387 uuid = mock.sentinel.uuid 1388 self.vops.update_backing_uuid(backing, uuid) 1389 1390 self.assertEqual(mock.sentinel.uuid, reconfig_spec.instanceUuid) 1391 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 1392 1393 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1394 '_get_disk_device') 1395 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1396 '_reconfigure_backing') 1397 def test_change_backing_profile_to_empty_profile( 1398 self, reconfigure_backing, get_disk_device): 1399 reconfig_spec = mock.Mock() 1400 empty_profile_spec = mock.sentinel.empty_profile_spec 1401 disk_spec = mock.Mock() 1402 self.session.vim.client.factory.create.side_effect = [ 1403 empty_profile_spec, reconfig_spec, disk_spec] 1404 1405 disk_device = mock.sentinel.disk_device 1406 get_disk_device.return_value = disk_device 1407 1408 backing = mock.sentinel.backing 1409 self.vops.change_backing_profile(backing, None) 1410 1411 self.assertEqual([empty_profile_spec], reconfig_spec.vmProfile) 1412 get_disk_device.assert_called_once_with(backing) 1413 self.assertEqual(disk_device, disk_spec.device) 1414 self.assertEqual('edit', disk_spec.operation) 1415 self.assertEqual([empty_profile_spec], disk_spec.profile) 1416 self.assertEqual([disk_spec], reconfig_spec.deviceChange) 1417 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 1418 1419 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1420 '_get_disk_device') 1421 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1422 '_reconfigure_backing') 1423 def test_change_backing_profile( 1424 self, reconfigure_backing, get_disk_device): 1425 reconfig_spec = mock.Mock() 1426 profile_spec = mock.Mock() 1427 disk_spec = mock.Mock() 1428 self.session.vim.client.factory.create.side_effect = [ 1429 profile_spec, reconfig_spec, disk_spec] 1430 1431 disk_device = mock.sentinel.disk_device 1432 get_disk_device.return_value = disk_device 1433 1434 backing = mock.sentinel.backing 1435 unique_id = mock.sentinel.unique_id 1436 profile_id = mock.Mock(uniqueId=unique_id) 1437 self.vops.change_backing_profile(backing, profile_id) 1438 1439 self.assertEqual(unique_id, profile_spec.profileId) 1440 self.assertEqual([profile_spec], reconfig_spec.vmProfile) 1441 get_disk_device.assert_called_once_with(backing) 1442 self.assertEqual(disk_device, disk_spec.device) 1443 self.assertEqual('edit', disk_spec.operation) 1444 self.assertEqual([profile_spec], disk_spec.profile) 1445 self.assertEqual([disk_spec], reconfig_spec.deviceChange) 1446 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 1447 1448 def test_delete_file(self): 1449 file_mgr = mock.sentinel.file_manager 1450 self.session.vim.service_content.fileManager = file_mgr 1451 task = mock.sentinel.task 1452 invoke_api = self.session.invoke_api 1453 invoke_api.return_value = task 1454 # Test delete file 1455 file_path = mock.sentinel.file_path 1456 datacenter = mock.sentinel.datacenter 1457 self.vops.delete_file(file_path, datacenter) 1458 # verify calls 1459 invoke_api.assert_called_once_with(self.session.vim, 1460 'DeleteDatastoreFile_Task', 1461 file_mgr, 1462 name=file_path, 1463 datacenter=datacenter) 1464 self.session.wait_for_task.assert_called_once_with(task) 1465 1466 def test_create_datastore_folder(self): 1467 file_manager = mock.sentinel.file_manager 1468 self.session.vim.service_content.fileManager = file_manager 1469 invoke_api = self.session.invoke_api 1470 1471 ds_name = "nfs" 1472 folder_path = "test/" 1473 datacenter = mock.sentinel.datacenter 1474 1475 self.vops.create_datastore_folder(ds_name, folder_path, datacenter) 1476 invoke_api.assert_called_once_with(self.session.vim, 1477 'MakeDirectory', 1478 file_manager, 1479 name="[nfs] test/", 1480 datacenter=datacenter) 1481 1482 def test_create_datastore_folder_with_existing_folder(self): 1483 file_manager = mock.sentinel.file_manager 1484 self.session.vim.service_content.fileManager = file_manager 1485 invoke_api = self.session.invoke_api 1486 invoke_api.side_effect = exceptions.FileAlreadyExistsException 1487 1488 ds_name = "nfs" 1489 folder_path = "test/" 1490 datacenter = mock.sentinel.datacenter 1491 1492 self.vops.create_datastore_folder(ds_name, folder_path, datacenter) 1493 invoke_api.assert_called_once_with(self.session.vim, 1494 'MakeDirectory', 1495 file_manager, 1496 name="[nfs] test/", 1497 datacenter=datacenter) 1498 invoke_api.side_effect = None 1499 1500 def test_create_datastore_folder_with_invoke_api_error(self): 1501 file_manager = mock.sentinel.file_manager 1502 self.session.vim.service_content.fileManager = file_manager 1503 invoke_api = self.session.invoke_api 1504 invoke_api.side_effect = exceptions.VimFaultException( 1505 ["FileFault"], "error") 1506 1507 ds_name = "nfs" 1508 folder_path = "test/" 1509 datacenter = mock.sentinel.datacenter 1510 1511 self.assertRaises(exceptions.VimFaultException, 1512 self.vops.create_datastore_folder, 1513 ds_name, 1514 folder_path, 1515 datacenter) 1516 invoke_api.assert_called_once_with(self.session.vim, 1517 'MakeDirectory', 1518 file_manager, 1519 name="[nfs] test/", 1520 datacenter=datacenter) 1521 invoke_api.side_effect = None 1522 1523 def test_get_path_name(self): 1524 path = mock.Mock(spec=object) 1525 path_name = mock.sentinel.vm_path_name 1526 path.vmPathName = path_name 1527 invoke_api = self.session.invoke_api 1528 invoke_api.return_value = path 1529 backing = mock.sentinel.backing 1530 ret = self.vops.get_path_name(backing) 1531 self.assertEqual(path_name, ret) 1532 invoke_api.assert_called_once_with(vim_util, 'get_object_property', 1533 self.session.vim, backing, 1534 'config.files') 1535 1536 def test_get_entity_name(self): 1537 entity_name = mock.sentinel.entity_name 1538 invoke_api = self.session.invoke_api 1539 invoke_api.return_value = entity_name 1540 entity = mock.sentinel.entity 1541 ret = self.vops.get_entity_name(entity) 1542 self.assertEqual(entity_name, ret) 1543 invoke_api.assert_called_once_with(vim_util, 'get_object_property', 1544 self.session.vim, entity, 'name') 1545 1546 def test_get_vmdk_path(self): 1547 # Setup hardware_devices for test 1548 device = mock.Mock() 1549 device.__class__.__name__ = 'VirtualDisk' 1550 backing = mock.Mock() 1551 backing.__class__.__name__ = 'VirtualDiskFlatVer2BackingInfo' 1552 backing.fileName = mock.sentinel.vmdk_path 1553 device.backing = backing 1554 invoke_api = self.session.invoke_api 1555 invoke_api.return_value = [device] 1556 # Test get_vmdk_path 1557 ret = self.vops.get_vmdk_path(backing) 1558 self.assertEqual(mock.sentinel.vmdk_path, ret) 1559 invoke_api.assert_called_once_with(vim_util, 'get_object_property', 1560 self.session.vim, backing, 1561 'config.hardware.device') 1562 1563 backing.__class__.__name__ = ' VirtualDiskSparseVer2BackingInfo' 1564 self.assertRaises(AssertionError, self.vops.get_vmdk_path, backing) 1565 1566 # Test with no disk device. 1567 invoke_api.return_value = [] 1568 self.assertRaises(vmdk_exceptions.VirtualDiskNotFoundException, 1569 self.vops.get_vmdk_path, 1570 backing) 1571 1572 def test_get_disk_size(self): 1573 # Test with valid disk device. 1574 device = mock.Mock() 1575 device.__class__.__name__ = 'VirtualDisk' 1576 disk_size_bytes = 1024 1577 device.capacityInKB = disk_size_bytes / units.Ki 1578 invoke_api = self.session.invoke_api 1579 invoke_api.return_value = [device] 1580 1581 self.assertEqual(disk_size_bytes, 1582 self.vops.get_disk_size(mock.sentinel.backing)) 1583 1584 # Test with no disk device. 1585 invoke_api.return_value = [] 1586 1587 self.assertRaises(vmdk_exceptions.VirtualDiskNotFoundException, 1588 self.vops.get_disk_size, 1589 mock.sentinel.backing) 1590 1591 def test_create_virtual_disk(self): 1592 task = mock.Mock() 1593 invoke_api = self.session.invoke_api 1594 invoke_api.return_value = task 1595 spec = mock.Mock() 1596 factory = self.session.vim.client.factory 1597 factory.create.return_value = spec 1598 disk_mgr = self.session.vim.service_content.virtualDiskManager 1599 1600 dc_ref = mock.Mock() 1601 vmdk_ds_file_path = mock.Mock() 1602 size_in_kb = 1024 1603 adapter_type = 'ide' 1604 disk_type = 'thick' 1605 self.vops.create_virtual_disk(dc_ref, vmdk_ds_file_path, size_in_kb, 1606 adapter_type, disk_type) 1607 1608 self.assertEqual(volumeops.VirtualDiskAdapterType.IDE, 1609 spec.adapterType) 1610 self.assertEqual(volumeops.VirtualDiskType.PREALLOCATED, spec.diskType) 1611 self.assertEqual(size_in_kb, spec.capacityKb) 1612 invoke_api.assert_called_once_with(self.session.vim, 1613 'CreateVirtualDisk_Task', 1614 disk_mgr, 1615 name=vmdk_ds_file_path, 1616 datacenter=dc_ref, 1617 spec=spec) 1618 self.session.wait_for_task.assert_called_once_with(task) 1619 1620 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1621 'create_virtual_disk') 1622 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1623 'delete_file') 1624 def test_create_flat_extent_virtual_disk_descriptor(self, delete_file, 1625 create_virtual_disk): 1626 dc_ref = mock.Mock() 1627 path = mock.Mock() 1628 size_in_kb = 1024 1629 adapter_type = 'ide' 1630 disk_type = 'thick' 1631 1632 self.vops.create_flat_extent_virtual_disk_descriptor(dc_ref, 1633 path, 1634 size_in_kb, 1635 adapter_type, 1636 disk_type) 1637 create_virtual_disk.assert_called_once_with( 1638 dc_ref, path.get_descriptor_ds_file_path(), size_in_kb, 1639 adapter_type, disk_type) 1640 delete_file.assert_called_once_with( 1641 path.get_flat_extent_ds_file_path(), dc_ref) 1642 1643 def test_copy_vmdk_file(self): 1644 task = mock.sentinel.task 1645 invoke_api = self.session.invoke_api 1646 invoke_api.return_value = task 1647 1648 disk_mgr = self.session.vim.service_content.virtualDiskManager 1649 src_dc_ref = mock.sentinel.src_dc_ref 1650 src_vmdk_file_path = mock.sentinel.src_vmdk_file_path 1651 dest_dc_ref = mock.sentinel.dest_dc_ref 1652 dest_vmdk_file_path = mock.sentinel.dest_vmdk_file_path 1653 self.vops.copy_vmdk_file(src_dc_ref, src_vmdk_file_path, 1654 dest_vmdk_file_path, dest_dc_ref) 1655 1656 invoke_api.assert_called_once_with(self.session.vim, 1657 'CopyVirtualDisk_Task', 1658 disk_mgr, 1659 sourceName=src_vmdk_file_path, 1660 sourceDatacenter=src_dc_ref, 1661 destName=dest_vmdk_file_path, 1662 destDatacenter=dest_dc_ref, 1663 force=True) 1664 self.session.wait_for_task.assert_called_once_with(task) 1665 1666 def test_copy_vmdk_file_with_default_dest_datacenter(self): 1667 task = mock.sentinel.task 1668 invoke_api = self.session.invoke_api 1669 invoke_api.return_value = task 1670 1671 disk_mgr = self.session.vim.service_content.virtualDiskManager 1672 src_dc_ref = mock.sentinel.src_dc_ref 1673 src_vmdk_file_path = mock.sentinel.src_vmdk_file_path 1674 dest_vmdk_file_path = mock.sentinel.dest_vmdk_file_path 1675 self.vops.copy_vmdk_file(src_dc_ref, src_vmdk_file_path, 1676 dest_vmdk_file_path) 1677 1678 invoke_api.assert_called_once_with(self.session.vim, 1679 'CopyVirtualDisk_Task', 1680 disk_mgr, 1681 sourceName=src_vmdk_file_path, 1682 sourceDatacenter=src_dc_ref, 1683 destName=dest_vmdk_file_path, 1684 destDatacenter=src_dc_ref, 1685 force=True) 1686 self.session.wait_for_task.assert_called_once_with(task) 1687 1688 def test_move_vmdk_file(self): 1689 task = mock.sentinel.task 1690 invoke_api = self.session.invoke_api 1691 invoke_api.return_value = task 1692 1693 disk_mgr = self.session.vim.service_content.virtualDiskManager 1694 src_dc_ref = mock.sentinel.src_dc_ref 1695 src_vmdk_file_path = mock.sentinel.src_vmdk_file_path 1696 dest_dc_ref = mock.sentinel.dest_dc_ref 1697 dest_vmdk_file_path = mock.sentinel.dest_vmdk_file_path 1698 self.vops.move_vmdk_file(src_dc_ref, 1699 src_vmdk_file_path, 1700 dest_vmdk_file_path, 1701 dest_dc_ref=dest_dc_ref) 1702 1703 invoke_api.assert_called_once_with(self.session.vim, 1704 'MoveVirtualDisk_Task', 1705 disk_mgr, 1706 sourceName=src_vmdk_file_path, 1707 sourceDatacenter=src_dc_ref, 1708 destName=dest_vmdk_file_path, 1709 destDatacenter=dest_dc_ref, 1710 force=True) 1711 self.session.wait_for_task.assert_called_once_with(task) 1712 1713 def test_delete_vmdk_file(self): 1714 task = mock.sentinel.task 1715 invoke_api = self.session.invoke_api 1716 invoke_api.return_value = task 1717 disk_mgr = self.session.vim.service_content.virtualDiskManager 1718 dc_ref = self.session.dc_ref 1719 vmdk_file_path = self.session.vmdk_file 1720 self.vops.delete_vmdk_file(vmdk_file_path, dc_ref) 1721 invoke_api.assert_called_once_with(self.session.vim, 1722 'DeleteVirtualDisk_Task', 1723 disk_mgr, 1724 name=vmdk_file_path, 1725 datacenter=dc_ref) 1726 self.session.wait_for_task.assert_called_once_with(task) 1727 1728 def test_extend_virtual_disk(self): 1729 """Test volumeops.extend_virtual_disk.""" 1730 task = mock.sentinel.task 1731 invoke_api = self.session.invoke_api 1732 invoke_api.return_value = task 1733 disk_mgr = self.session.vim.service_content.virtualDiskManager 1734 fake_size = 5 1735 fake_size_in_kb = fake_size * units.Mi 1736 fake_name = 'fake_volume_0000000001' 1737 fake_dc = mock.sentinel.datacenter 1738 self.vops.extend_virtual_disk(fake_size, 1739 fake_name, fake_dc) 1740 invoke_api.assert_called_once_with(self.session.vim, 1741 "ExtendVirtualDisk_Task", 1742 disk_mgr, 1743 name=fake_name, 1744 datacenter=fake_dc, 1745 newCapacityKb=fake_size_in_kb, 1746 eagerZero=False) 1747 self.session.wait_for_task.assert_called_once_with(task) 1748 1749 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1750 '_get_all_clusters') 1751 def test_get_cluster_refs(self, get_all_clusters): 1752 cls_1 = mock.sentinel.cls_1 1753 cls_2 = mock.sentinel.cls_2 1754 clusters = {"cls_1": cls_1, "cls_2": cls_2} 1755 get_all_clusters.return_value = clusters 1756 1757 self.assertEqual({"cls_2": cls_2}, 1758 self.vops.get_cluster_refs(["cls_2"])) 1759 1760 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1761 '_get_all_clusters') 1762 def test_get_cluster_refs_with_invalid_cluster(self, get_all_clusters): 1763 cls_1 = mock.sentinel.cls_1 1764 cls_2 = mock.sentinel.cls_2 1765 clusters = {"cls_1": cls_1, "cls_2": cls_2} 1766 get_all_clusters.return_value = clusters 1767 1768 self.assertRaises(vmdk_exceptions.ClusterNotFoundException, 1769 self.vops.get_cluster_refs, 1770 ["cls_1", "cls_3"]) 1771 1772 def test_get_cluster_hosts(self): 1773 host_1 = mock.sentinel.host_1 1774 host_2 = mock.sentinel.host_2 1775 hosts = mock.Mock(ManagedObjectReference=[host_1, host_2]) 1776 self.session.invoke_api.return_value = hosts 1777 1778 cluster = mock.sentinel.cluster 1779 ret = self.vops.get_cluster_hosts(cluster) 1780 1781 self.assertEqual([host_1, host_2], ret) 1782 self.session.invoke_api.assert_called_once_with(vim_util, 1783 'get_object_property', 1784 self.session.vim, 1785 cluster, 1786 'host') 1787 1788 def test_get_cluster_hosts_with_no_host(self): 1789 self.session.invoke_api.return_value = None 1790 1791 cluster = mock.sentinel.cluster 1792 ret = self.vops.get_cluster_hosts(cluster) 1793 1794 self.assertEqual([], ret) 1795 self.session.invoke_api.assert_called_once_with(vim_util, 1796 'get_object_property', 1797 self.session.vim, 1798 cluster, 1799 'host') 1800 1801 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1802 'continue_retrieval', return_value=None) 1803 def test_get_all_clusters(self, continue_retrieval): 1804 prop_1 = mock.Mock(val='test_cluster_1') 1805 cls_1 = mock.Mock(propSet=[prop_1], obj=mock.sentinel.mor_1) 1806 prop_2 = mock.Mock(val='/test_cluster_2') 1807 cls_2 = mock.Mock(propSet=[prop_2], obj=mock.sentinel.mor_2) 1808 1809 retrieve_result = mock.Mock(objects=[cls_1, cls_2]) 1810 self.session.invoke_api.return_value = retrieve_result 1811 1812 ret = self.vops._get_all_clusters() 1813 exp = {'test_cluster_1': mock.sentinel.mor_1, 1814 '/test_cluster_2': mock.sentinel.mor_2} 1815 self.assertEqual(exp, ret) 1816 self.session.invoke_api.assert_called_once_with( 1817 vim_util, 'get_objects', self.session.vim, 1818 'ClusterComputeResource', self.MAX_OBJECTS) 1819 continue_retrieval.assert_called_once_with(retrieve_result) 1820 1821 def test_get_entity_by_inventory_path(self): 1822 self.session.invoke_api.return_value = mock.sentinel.ref 1823 1824 path = mock.sentinel.path 1825 ret = self.vops.get_entity_by_inventory_path(path) 1826 self.assertEqual(mock.sentinel.ref, ret) 1827 self.session.invoke_api.assert_called_once_with( 1828 self.session.vim, 1829 "FindByInventoryPath", 1830 self.session.vim.service_content.searchIndex, 1831 inventoryPath=path) 1832 1833 def test_get_inventory_path(self): 1834 1835 path = mock.sentinel.path 1836 self.session.invoke_api.return_value = path 1837 1838 entity = mock.sentinel.entity 1839 self.assertEqual(path, self.vops.get_inventory_path(entity)) 1840 self.session.invoke_api.assert_called_once_with( 1841 vim_util, 'get_inventory_path', self.session.vim, entity) 1842 1843 def test_get_disk_devices(self): 1844 disk_device = mock.Mock() 1845 disk_device.__class__.__name__ = 'VirtualDisk' 1846 1847 controller_device = mock.Mock() 1848 controller_device.__class__.__name__ = 'VirtualLSILogicController' 1849 1850 devices = mock.Mock() 1851 devices.__class__.__name__ = "ArrayOfVirtualDevice" 1852 devices.VirtualDevice = [disk_device, controller_device] 1853 self.session.invoke_api.return_value = devices 1854 1855 vm = mock.sentinel.vm 1856 self.assertEqual([disk_device], self.vops._get_disk_devices(vm)) 1857 self.session.invoke_api.assert_called_once_with( 1858 vim_util, 'get_object_property', self.session.vim, 1859 vm, 'config.hardware.device') 1860 1861 def _create_disk_device(self, file_name, uuid=None): 1862 backing = mock.Mock(fileName=file_name) 1863 backing.__class__.__name__ = 'VirtualDiskFlatVer2BackingInfo' 1864 backing.uuid = uuid 1865 return mock.Mock(backing=backing) 1866 1867 def test_mark_backing_as_template(self): 1868 backing = mock.Mock() 1869 self.vops.mark_backing_as_template(backing) 1870 self.session.invoke_api.assert_called_once_with( 1871 self.session.vim, 'MarkAsTemplate', backing) 1872 1873 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1874 '_get_disk_devices') 1875 def test_get_disk_device(self, get_disk_devices): 1876 dev_1 = self._create_disk_device('[ds1] foo/foo.vmdk') 1877 dev_2 = self._create_disk_device('[ds1] foo/foo_1.vmdk') 1878 get_disk_devices.return_value = [dev_1, dev_2] 1879 1880 vm = mock.sentinel.vm 1881 self.assertEqual(dev_2, 1882 self.vops.get_disk_device(vm, '[ds1] foo/foo_1.vmdk')) 1883 get_disk_devices.assert_called_once_with(vm) 1884 1885 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1886 'get_entity_by_inventory_path') 1887 def test_copy_datastore_file(self, get_entity_by_inventory_path): 1888 get_entity_by_inventory_path.return_value = mock.sentinel.src_dc_ref 1889 self.session.invoke_api.return_value = mock.sentinel.task 1890 1891 vsphere_url = "vsphere://hostname/folder/openstack_glance/img_uuid?" \ 1892 "dcPath=dc1&dsName=ds1" 1893 self.vops.copy_datastore_file(vsphere_url, mock.sentinel.dest_dc_ref, 1894 mock.sentinel.dest_ds_file_path) 1895 1896 get_entity_by_inventory_path.assert_called_once_with('dc1') 1897 self.session.invoke_api.assert_called_once_with( 1898 self.session.vim, 1899 'CopyDatastoreFile_Task', 1900 self.session.vim.service_content.fileManager, 1901 sourceName='[ds1] openstack_glance/img_uuid', 1902 sourceDatacenter=mock.sentinel.src_dc_ref, 1903 destinationName=mock.sentinel.dest_ds_file_path, 1904 destinationDatacenter=mock.sentinel.dest_dc_ref) 1905 self.session.wait_for_task.assert_called_once_with(mock.sentinel.task) 1906 1907 @ddt.data(volumeops.VirtualDiskType.EAGER_ZEROED_THICK, 1908 volumeops.VirtualDiskType.PREALLOCATED, 1909 volumeops.VirtualDiskType.THIN) 1910 def test_create_fcd_backing_spec(self, disk_type): 1911 spec = mock.Mock() 1912 self.session.vim.client.factory.create.return_value = spec 1913 1914 ds_ref = mock.sentinel.ds_ref 1915 ret = self.vops._create_fcd_backing_spec(disk_type, ds_ref) 1916 1917 if disk_type == volumeops.VirtualDiskType.PREALLOCATED: 1918 prov_type = 'lazyZeroedThick' 1919 else: 1920 prov_type = disk_type 1921 self.assertEqual(prov_type, ret.provisioningType) 1922 self.assertEqual(ds_ref, ret.datastore) 1923 self.session.vim.client.factory.create.assert_called_once_with( 1924 'ns0:VslmCreateSpecDiskFileBackingSpec') 1925 1926 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1927 '_create_fcd_backing_spec') 1928 def test_create_fcd(self, create_fcd_backing_spec): 1929 spec = mock.Mock() 1930 self.session.vim.client.factory.create.return_value = spec 1931 1932 backing_spec = mock.sentinel.backing_spec 1933 create_fcd_backing_spec.return_value = backing_spec 1934 1935 task = mock.sentinel.task 1936 self.session.invoke_api.return_value = task 1937 1938 task_info = mock.Mock() 1939 fcd_id = mock.sentinel.fcd_id 1940 task_info.result.config.id.id = fcd_id 1941 self.session.wait_for_task.return_value = task_info 1942 1943 name = mock.sentinel.name 1944 size_mb = 1024 1945 ds_ref_val = mock.sentinel.ds_ref_val 1946 ds_ref = mock.Mock(value=ds_ref_val) 1947 disk_type = mock.sentinel.disk_type 1948 ret = self.vops.create_fcd(name, size_mb, ds_ref, disk_type) 1949 1950 self.assertEqual(fcd_id, ret.fcd_id) 1951 self.assertEqual(ds_ref_val, ret.ds_ref_val) 1952 self.session.vim.client.factory.create.assert_called_once_with( 1953 'ns0:VslmCreateSpec') 1954 create_fcd_backing_spec.assert_called_once_with(disk_type, ds_ref) 1955 self.assertEqual(1024, spec.capacityInMB) 1956 self.assertEqual(name, spec.name) 1957 self.assertEqual(backing_spec, spec.backingSpec) 1958 self.session.invoke_api.assert_called_once_with( 1959 self.session.vim, 1960 'CreateDisk_Task', 1961 self.session.vim.service_content.vStorageObjectManager, 1962 spec=spec) 1963 self.session.wait_for_task.assert_called_once_with(task) 1964 1965 def test_delete_fcd(self): 1966 task = mock.sentinel.task 1967 self.session.invoke_api.return_value = task 1968 1969 fcd_location = mock.Mock() 1970 fcd_id = mock.sentinel.fcd_id 1971 fcd_location.id.return_value = fcd_id 1972 ds_ref = mock.sentinel.ds_ref 1973 fcd_location.ds_ref.return_value = ds_ref 1974 1975 self.vops.delete_fcd(fcd_location) 1976 self.session.invoke_api.assert_called_once_with( 1977 self.session.vim, 1978 'DeleteVStorageObject_Task', 1979 self.session.vim.service_content.vStorageObjectManager, 1980 id=fcd_id, 1981 datastore=ds_ref) 1982 self.session.wait_for_task(task) 1983 1984 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 1985 '_create_fcd_backing_spec') 1986 def test_clone_fcd(self, create_fcd_backing_spec): 1987 spec = mock.Mock() 1988 self.session.vim.client.factory.create.return_value = spec 1989 1990 backing_spec = mock.sentinel.backing_spec 1991 create_fcd_backing_spec.return_value = backing_spec 1992 1993 task = mock.sentinel.task 1994 self.session.invoke_api.return_value = task 1995 1996 task_info = mock.Mock() 1997 fcd_id = mock.sentinel.fcd_id 1998 task_info.result.config.id.id = fcd_id 1999 self.session.wait_for_task.return_value = task_info 2000 2001 fcd_location = mock.Mock() 2002 fcd_id = mock.sentinel.fcd_id 2003 fcd_location.id.return_value = fcd_id 2004 ds_ref = mock.sentinel.ds_ref 2005 fcd_location.ds_ref.return_value = ds_ref 2006 2007 name = mock.sentinel.name 2008 dest_ds_ref_val = mock.sentinel.dest_ds_ref_val 2009 dest_ds_ref = mock.Mock(value=dest_ds_ref_val) 2010 disk_type = mock.sentinel.disk_type 2011 ret = self.vops.clone_fcd(name, fcd_location, dest_ds_ref, disk_type) 2012 2013 self.assertEqual(fcd_id, ret.fcd_id) 2014 self.assertEqual(dest_ds_ref_val, ret.ds_ref_val) 2015 self.session.vim.client.factory.create.assert_called_once_with( 2016 'ns0:VslmCloneSpec') 2017 create_fcd_backing_spec.assert_called_once_with(disk_type, dest_ds_ref) 2018 self.assertEqual(name, spec.name) 2019 self.assertEqual(backing_spec, spec.backingSpec) 2020 self.session.invoke_api.assert_called_once_with( 2021 self.session.vim, 2022 'CloneVStorageObject_Task', 2023 self.session.vim.service_content.vStorageObjectManager, 2024 id=fcd_id, 2025 datastore=ds_ref, 2026 spec=spec) 2027 self.session.wait_for_task.assert_called_once_with(task) 2028 2029 def test_extend_fcd(self): 2030 task = mock.sentinel.task 2031 self.session.invoke_api.return_value = task 2032 2033 fcd_location = mock.Mock() 2034 fcd_id = mock.sentinel.fcd_id 2035 fcd_location.id.return_value = fcd_id 2036 ds_ref = mock.sentinel.ds_ref 2037 fcd_location.ds_ref.return_value = ds_ref 2038 2039 new_size_mb = 1024 2040 self.vops.extend_fcd(fcd_location, new_size_mb) 2041 self.session.invoke_api.assert_called_once_with( 2042 self.session.vim, 2043 'ExtendDisk_Task', 2044 self.session.vim.service_content.vStorageObjectManager, 2045 id=fcd_id, 2046 datastore=ds_ref, 2047 newCapacityInMB=new_size_mb) 2048 self.session.wait_for_task(task) 2049 2050 def test_register_disk(self): 2051 fcd = mock.Mock() 2052 fcd_id = mock.sentinel.fcd_id 2053 fcd.config.id = mock.Mock(id=fcd_id) 2054 self.session.invoke_api.return_value = fcd 2055 2056 vmdk_url = mock.sentinel.vmdk_url 2057 name = mock.sentinel.name 2058 ds_ref_val = mock.sentinel.ds_ref_val 2059 ds_ref = mock.Mock(value=ds_ref_val) 2060 ret = self.vops.register_disk(vmdk_url, name, ds_ref) 2061 2062 self.assertEqual(fcd_id, ret.fcd_id) 2063 self.assertEqual(ds_ref_val, ret.ds_ref_val) 2064 self.session.invoke_api.assert_called_once_with( 2065 self.session.vim, 2066 'RegisterDisk', 2067 self.session.vim.service_content.vStorageObjectManager, 2068 path=vmdk_url, 2069 name=name) 2070 2071 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 2072 '_create_controller_config_spec') 2073 @mock.patch('cinder.volume.drivers.vmware.volumeops.VMwareVolumeOps.' 2074 '_reconfigure_backing') 2075 def test_attach_fcd(self, reconfigure_backing, create_controller_spec): 2076 reconfig_spec = mock.Mock() 2077 self.session.vim.client.factory.create.return_value = reconfig_spec 2078 spec = mock.Mock() 2079 create_controller_spec.return_value = spec 2080 2081 task = mock.sentinel.task 2082 self.session.invoke_api.return_value = task 2083 2084 backing = mock.sentinel.backing 2085 fcd_location = mock.Mock() 2086 fcd_id = mock.sentinel.fcd_id 2087 fcd_location.id.return_value = fcd_id 2088 ds_ref = mock.sentinel.ds_ref 2089 fcd_location.ds_ref.return_value = ds_ref 2090 self.vops.attach_fcd(backing, fcd_location) 2091 2092 self.session.vim.client.factory.create.assert_called_once_with( 2093 'ns0:VirtualMachineConfigSpec') 2094 create_controller_spec.assert_called_once_with( 2095 volumeops.VirtualDiskAdapterType.LSI_LOGIC) 2096 self.assertEqual([spec], reconfig_spec.deviceChange) 2097 reconfigure_backing.assert_called_once_with(backing, reconfig_spec) 2098 self.session.invoke_api.assert_called_once_with( 2099 self.session.vim, 2100 'AttachDisk_Task', 2101 backing, 2102 diskId=fcd_id, 2103 datastore=ds_ref) 2104 self.session.wait_for_task.assert_called_once_with(task) 2105 2106 def test_detach_fcd(self): 2107 task = mock.sentinel.task 2108 self.session.invoke_api.return_value = task 2109 2110 backing = mock.sentinel.backing 2111 fcd_location = mock.Mock() 2112 fcd_id = mock.sentinel.fcd_id 2113 fcd_location.id.return_value = fcd_id 2114 self.vops.detach_fcd(backing, fcd_location) 2115 2116 self.session.invoke_api.assert_called_once_with( 2117 self.session.vim, 2118 'DetachDisk_Task', 2119 backing, 2120 diskId=fcd_id) 2121 self.session.wait_for_task.assert_called_once_with(task) 2122 2123 2124class VirtualDiskPathTest(test.TestCase): 2125 """Unit tests for VirtualDiskPath.""" 2126 2127 def setUp(self): 2128 super(VirtualDiskPathTest, self).setUp() 2129 self._path = volumeops.VirtualDiskPath("nfs", "A/B/", "disk") 2130 2131 def test_get_datastore_file_path(self): 2132 self.assertEqual("[nfs] A/B/disk.vmdk", 2133 self._path.get_datastore_file_path("nfs", 2134 "A/B/disk.vmdk")) 2135 2136 def test_get_descriptor_file_path(self): 2137 self.assertEqual("A/B/disk.vmdk", 2138 self._path.get_descriptor_file_path()) 2139 2140 def test_get_descriptor_ds_file_path(self): 2141 self.assertEqual("[nfs] A/B/disk.vmdk", 2142 self._path.get_descriptor_ds_file_path()) 2143 2144 2145class FlatExtentVirtualDiskPathTest(test.TestCase): 2146 """Unit tests for FlatExtentVirtualDiskPath.""" 2147 2148 def setUp(self): 2149 super(FlatExtentVirtualDiskPathTest, self).setUp() 2150 self._path = volumeops.FlatExtentVirtualDiskPath("nfs", "A/B/", "disk") 2151 2152 def test_get_flat_extent_file_path(self): 2153 self.assertEqual("A/B/disk-flat.vmdk", 2154 self._path.get_flat_extent_file_path()) 2155 2156 def test_get_flat_extent_ds_file_path(self): 2157 self.assertEqual("[nfs] A/B/disk-flat.vmdk", 2158 self._path.get_flat_extent_ds_file_path()) 2159 2160 2161class VirtualDiskTypeTest(test.TestCase): 2162 """Unit tests for VirtualDiskType.""" 2163 2164 def test_is_valid(self): 2165 self.assertTrue(volumeops.VirtualDiskType.is_valid("thick")) 2166 self.assertTrue(volumeops.VirtualDiskType.is_valid("thin")) 2167 self.assertTrue(volumeops.VirtualDiskType.is_valid("eagerZeroedThick")) 2168 self.assertFalse(volumeops.VirtualDiskType.is_valid("preallocated")) 2169 2170 def test_validate(self): 2171 volumeops.VirtualDiskType.validate("thick") 2172 volumeops.VirtualDiskType.validate("thin") 2173 volumeops.VirtualDiskType.validate("eagerZeroedThick") 2174 self.assertRaises(vmdk_exceptions.InvalidDiskTypeException, 2175 volumeops.VirtualDiskType.validate, 2176 "preallocated") 2177 2178 def test_get_virtual_disk_type(self): 2179 self.assertEqual("preallocated", 2180 volumeops.VirtualDiskType.get_virtual_disk_type( 2181 "thick")) 2182 self.assertEqual("thin", 2183 volumeops.VirtualDiskType.get_virtual_disk_type( 2184 "thin")) 2185 self.assertEqual("eagerZeroedThick", 2186 volumeops.VirtualDiskType.get_virtual_disk_type( 2187 "eagerZeroedThick")) 2188 self.assertRaises(vmdk_exceptions.InvalidDiskTypeException, 2189 volumeops.VirtualDiskType.get_virtual_disk_type, 2190 "preallocated") 2191 2192 2193class VirtualDiskAdapterTypeTest(test.TestCase): 2194 """Unit tests for VirtualDiskAdapterType.""" 2195 2196 def test_is_valid(self): 2197 self.assertTrue(volumeops.VirtualDiskAdapterType.is_valid("lsiLogic")) 2198 self.assertTrue(volumeops.VirtualDiskAdapterType.is_valid("busLogic")) 2199 self.assertTrue(volumeops.VirtualDiskAdapterType.is_valid( 2200 "lsiLogicsas")) 2201 self.assertTrue( 2202 volumeops.VirtualDiskAdapterType.is_valid("paraVirtual")) 2203 self.assertTrue(volumeops.VirtualDiskAdapterType.is_valid("ide")) 2204 self.assertFalse(volumeops.VirtualDiskAdapterType.is_valid("pvscsi")) 2205 2206 def test_validate(self): 2207 volumeops.VirtualDiskAdapterType.validate("lsiLogic") 2208 volumeops.VirtualDiskAdapterType.validate("busLogic") 2209 volumeops.VirtualDiskAdapterType.validate("lsiLogicsas") 2210 volumeops.VirtualDiskAdapterType.validate("paraVirtual") 2211 volumeops.VirtualDiskAdapterType.validate("ide") 2212 self.assertRaises(vmdk_exceptions.InvalidAdapterTypeException, 2213 volumeops.VirtualDiskAdapterType.validate, 2214 "pvscsi") 2215 2216 def test_get_adapter_type(self): 2217 self.assertEqual("lsiLogic", 2218 volumeops.VirtualDiskAdapterType.get_adapter_type( 2219 "lsiLogic")) 2220 self.assertEqual("busLogic", 2221 volumeops.VirtualDiskAdapterType.get_adapter_type( 2222 "busLogic")) 2223 self.assertEqual("lsiLogic", 2224 volumeops.VirtualDiskAdapterType.get_adapter_type( 2225 "lsiLogicsas")) 2226 self.assertEqual("lsiLogic", 2227 volumeops.VirtualDiskAdapterType.get_adapter_type( 2228 "paraVirtual")) 2229 self.assertEqual("ide", 2230 volumeops.VirtualDiskAdapterType.get_adapter_type( 2231 "ide")) 2232 self.assertRaises(vmdk_exceptions.InvalidAdapterTypeException, 2233 volumeops.VirtualDiskAdapterType.get_adapter_type, 2234 "pvscsi") 2235 2236 2237class ControllerTypeTest(test.TestCase): 2238 """Unit tests for ControllerType.""" 2239 2240 def test_get_controller_type(self): 2241 self.assertEqual(volumeops.ControllerType.LSI_LOGIC, 2242 volumeops.ControllerType.get_controller_type( 2243 'lsiLogic')) 2244 self.assertEqual(volumeops.ControllerType.BUS_LOGIC, 2245 volumeops.ControllerType.get_controller_type( 2246 'busLogic')) 2247 self.assertEqual(volumeops.ControllerType.LSI_LOGIC_SAS, 2248 volumeops.ControllerType.get_controller_type( 2249 'lsiLogicsas')) 2250 self.assertEqual(volumeops.ControllerType.PARA_VIRTUAL, 2251 volumeops.ControllerType.get_controller_type( 2252 'paraVirtual')) 2253 self.assertEqual(volumeops.ControllerType.IDE, 2254 volumeops.ControllerType.get_controller_type( 2255 'ide')) 2256 self.assertRaises(vmdk_exceptions.InvalidAdapterTypeException, 2257 volumeops.ControllerType.get_controller_type, 2258 'invalid_type') 2259 2260 def test_is_scsi_controller(self): 2261 self.assertTrue(volumeops.ControllerType.is_scsi_controller( 2262 volumeops.ControllerType.LSI_LOGIC)) 2263 self.assertTrue(volumeops.ControllerType.is_scsi_controller( 2264 volumeops.ControllerType.BUS_LOGIC)) 2265 self.assertTrue(volumeops.ControllerType.is_scsi_controller( 2266 volumeops.ControllerType.LSI_LOGIC_SAS)) 2267 self.assertTrue(volumeops.ControllerType.is_scsi_controller( 2268 volumeops.ControllerType.PARA_VIRTUAL)) 2269 self.assertFalse(volumeops.ControllerType.is_scsi_controller( 2270 volumeops.ControllerType.IDE)) 2271 2272 2273class FcdLocationTest(test.TestCase): 2274 """Unit tests for FcdLocation.""" 2275 2276 def test_create(self): 2277 fcd_id = mock.sentinel.fcd_id 2278 fcd_id_obj = mock.Mock(id=fcd_id) 2279 ds_ref_val = mock.sentinel.ds_ref_val 2280 ds_ref = mock.Mock(value=ds_ref_val) 2281 fcd_loc = volumeops.FcdLocation.create(fcd_id_obj, ds_ref) 2282 self.assertEqual(fcd_id, fcd_loc.fcd_id) 2283 self.assertEqual(ds_ref_val, fcd_loc.ds_ref_val) 2284 2285 def test_provider_location(self): 2286 fcd_loc = volumeops.FcdLocation('123', 'ds1') 2287 self.assertEqual('123@ds1', fcd_loc.provider_location()) 2288 2289 def test_ds_ref(self): 2290 fcd_loc = volumeops.FcdLocation('123', 'ds1') 2291 ds_ref = fcd_loc.ds_ref() 2292 self.assertEqual('ds1', ds_ref.value) 2293 2294 def test_id(self): 2295 id_obj = mock.Mock() 2296 cf = mock.Mock() 2297 cf.create.return_value = id_obj 2298 2299 fcd_loc = volumeops.FcdLocation('123', 'ds1') 2300 fcd_id = fcd_loc.id(cf) 2301 self.assertEqual('123', fcd_id.id) 2302 cf.create.assert_called_once_with('ns0:ID') 2303 2304 def test_from_provider_location(self): 2305 fcd_loc = volumeops.FcdLocation.from_provider_location('123@ds1') 2306 self.assertEqual('123', fcd_loc.fcd_id) 2307 self.assertEqual('ds1', fcd_loc.ds_ref_val) 2308 2309 def test_str(self): 2310 fcd_loc = volumeops.FcdLocation('123', 'ds1') 2311 self.assertEqual('123@ds1', str(fcd_loc)) 2312