# Copyright (c) 2016 Red Hat Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""Tests for Volume Code."""

import ddt
import mock
import shutil
import tempfile

import os_brick
from oslo_config import cfg
from oslo_utils import importutils

from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
from cinder.image import image_utils
from cinder import objects
from cinder.objects import fields
import cinder.policy
from cinder import test
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import utils as tests_utils
from cinder import utils
import cinder.volume
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume import manager
from cinder.volume import rpcapi as volume_rpcapi
import cinder.volume.targets.tgt
from cinder.volume import utils as volutils


CONF = cfg.CONF


def my_safe_get(self, value):
    """Stand-in for ``Configuration.safe_get`` used via ``mock.patch``.

    Returns ``['replication']`` for the ``'replication_device'`` option and
    ``None`` for every other option name.
    """
    if value == 'replication_device':
        return ['replication']
    return None


@ddt.ddt
class DriverTestCase(test.TestCase):
    """Tests for replication support checks and driver construction."""

    @staticmethod
    def _get_driver(relicated, version):
        """Return a throwaway ``VolumeDriver`` subclass for a test.

        :param relicated: (sic; spelling kept so the signature is unchanged)
                          falsy to get a driver class that defines no
                          failover methods.
        :param version: ``'v2.1'`` selects the class that only defines
                        ``failover_host``; any other value selects the class
                        that additionally defines ``failover_completed``.
        :returns: the driver class itself, not an instance.
        """
        class NonReplicatedDriver(driver.VolumeDriver):
            pass

        class V21Driver(driver.VolumeDriver):
            def failover_host(*args, **kwargs):
                pass

        class AADriver(V21Driver):
            def failover_completed(*args, **kwargs):
                pass

        if not relicated:
            return NonReplicatedDriver

        if version == 'v2.1':
            return V21Driver

        return AADriver

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_none(self, rep_version):
        """A driver without failover methods supports no feature."""
        my_driver = self._get_driver(False, None)
        self.assertFalse(my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_only_21(self, rep_version):
        """A driver with only failover_host supports just 'v2.1'."""
        version = 'v2.1'
        my_driver = self._get_driver(True, version)
        self.assertEqual(rep_version == version,
                         my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_aa(self, rep_version):
        """A driver with both failover methods supports 'v2.1' and 'a/a'."""
        my_driver = self._get_driver(True, 'a/a')
        self.assertEqual(rep_version in ('v2.1', 'a/a'),
                         my_driver.supports_replication_feature(rep_version))

    def test_init_non_replicated(self):
        """Constructing the non-replicated driver does not raise."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(False, None)(configuration=config)

    @ddt.data('v2.1', 'a/a')
    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_non_clustered(self, version):
        """Constructing a replicated driver without a cluster name works."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, version)(configuration=config)

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_not_supported(self):
        """A 'v2.1'-only driver refuses to start with a cluster name."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # Raises exception because we are trying to run a replicated service
        # in clustered mode but the driver doesn't support it.
        self.assertRaises(exception.Invalid, self._get_driver(True, 'v2.1'),
                          configuration=config, cluster_name='mycluster')

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_supported(self):
        """An 'a/a' driver can be constructed with a cluster name."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, 'a/a')(configuration=config,
                                      cluster_name='mycluster')

    def test_failover(self):
        """Test default failover behavior of calling failover_host."""
        my_driver = self._get_driver(True, 'a/a')()
        with mock.patch.object(my_driver, 'failover_host') as failover_mock:
            res = my_driver.failover(mock.sentinel.context,
                                     mock.sentinel.volumes,
                                     secondary_id=mock.sentinel.secondary_id,
                                     groups=[])
        self.assertEqual(failover_mock.return_value, res)
        failover_mock.assert_called_once_with(mock.sentinel.context,
                                              mock.sentinel.volumes,
                                              mock.sentinel.secondary_id,
                                              [])


class BaseDriverTestCase(test.TestCase):
    """Base Test class for Drivers."""
    driver_name = "cinder.volume.driver.FakeBaseDriver"

    def setUp(self):
        """Build a volume manager around ``driver_name`` with faked exec."""
        super(BaseDriverTestCase, self).setUp()
        vol_tmpdir = tempfile.mkdtemp()
        # Always remove the directory created above, independently of what
        # CONF.volumes_dir resolves to when _cleanup runs.  ignore_errors
        # keeps this harmless when _cleanup has already deleted the path.
        self.addCleanup(shutil.rmtree, vol_tmpdir, ignore_errors=True)
        self.override_config('volume_driver', self.driver_name,
                             conf.SHARED_CONF_GROUP)
        self.override_config('volumes_dir', vol_tmpdir,
                             conf.SHARED_CONF_GROUP)
        self.volume = importutils.import_object(CONF.volume_manager)
        self.context = context.get_admin_context()
        self.output = ""
        self.configuration = conf.Configuration(None)
        self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)

        def _fake_execute(_command, *_args, **_kwargs):
            """Fake _execute."""
            return self.output, None
        exec_patcher = mock.patch.object(self.volume.driver, '_execute',
                                         _fake_execute)
        exec_patcher.start()
        self.addCleanup(exec_patcher.stop)
        self.volume.driver.set_initialized()
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        """Best-effort removal of the directory named by CONF.volumes_dir."""
        try:
            shutil.rmtree(CONF.volumes_dir)
        except OSError:
            # Deliberately ignored: the directory may not exist (anymore).
            pass

    def _attach_volume(self):
        """Attach volumes to an instance (stub returning an empty list)."""
        return []


@ddt.ddt
class GenericVolumeDriverTestCase(BaseDriverTestCase):
    """Test case for VolumeDriver."""
    driver_name = "cinder.tests.fake_driver.FakeLoggingVolumeDriver"

    def test_create_temp_cloned_volume(self):
        """The temp clone is 'available' with or without a model update."""
        with mock.patch.object(
                self.volume.driver,
                'create_cloned_volume') as mock_create_cloned_volume:
            model_update = {'provider_location': 'dummy'}
            mock_create_cloned_volume.return_value = model_update
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol)
            self.assertEqual('dummy', cloned_vol.provider_location)
            self.assertEqual('available', cloned_vol.status)

            mock_create_cloned_volume.return_value = None
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol)
            self.assertEqual('available', cloned_vol.status)

    def test_get_backup_device_available(self):
        """The source volume itself is returned; no temp volume recorded."""
        vol = tests_utils.create_volume(self.context)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        (backup_device, is_snapshot) = self.volume.driver.get_backup_device(
            self.context, backup_obj)
        volume = objects.Volume.get_by_id(self.context, vol.id)
        self.assertNotIn('temporary', backup_device.admin_metadata.keys())
        self.assertEqual(volume, backup_device)
        self.assertFalse(is_snapshot)
        backup_obj.refresh()
        self.assertIsNone(backup_obj.temp_volume_id)

    def test_get_backup_device_in_use(self):
        """A previously in-use volume is backed up from a temp clone."""
        vol = tests_utils.create_volume(self.context,
                                        status='backing-up',
                                        previous_status='in-use')
        admin_meta = {'temporary': 'True'}
        temp_vol = tests_utils.create_volume(self.context,
                                             admin_metadata=admin_meta)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        with mock.patch.object(
                self.volume.driver,
                '_create_temp_cloned_volume') as mock_create_temp:
            mock_create_temp.return_value = temp_vol
            (backup_device, is_snapshot) = (
                self.volume.driver.get_backup_device(self.context,
                                                     backup_obj))
            self.assertEqual(temp_vol, backup_device)
            self.assertFalse(is_snapshot)
            backup_obj.refresh()
            self.assertEqual(temp_vol.id, backup_obj.temp_volume_id)

    def test_create_temp_volume_from_snapshot(self):
        """The temp volume is detached and keeps the source AZ/cluster."""
        volume_dict = {'id': fake.SNAPSHOT_ID,
                       'host': 'fakehost',
                       'cluster_name': 'fakecluster',
                       'availability_zone': 'fakezone',
                       'size': 1}
        vol = fake_volume.fake_volume_obj(self.context, **volume_dict)
        snapshot = fake_snapshot.fake_snapshot_obj(self.context)

        with mock.patch.object(
                self.volume.driver,
                'create_volume_from_snapshot'):
            temp_vol = self.volume.driver._create_temp_volume_from_snapshot(
                self.context,
                vol, snapshot)
            self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                             temp_vol.attach_status)
            self.assertEqual('fakezone', temp_vol.availability_zone)
            self.assertEqual('fakecluster', temp_vol.cluster_name)

    @mock.patch.object(utils, 'brick_get_connector_properties')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
    @mock.patch.object(volutils, 'copy_volume')
    @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
    @mock.patch.object(cinder.volume.volume_types,
                       'volume_types_encryption_changed')
    @ddt.data(False, True)
    def test_copy_volume_data_mgr(self,
                                  encryption_changed,
                                  mock_encryption_changed,
                                  mock_get_capabilities,
                                  mock_copy,
                                  mock_detach,
                                  mock_attach,
                                  mock_get_connector):
        """Test function of _copy_volume_data."""

        src_vol = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
        dest_vol = tests_utils.create_volume(self.context, size=1,
                                             host=CONF.host)
        mock_get_connector.return_value = {}
        mock_encryption_changed.return_value = encryption_changed
        self.volume.driver._throttle = mock.MagicMock()

        attach_expected = [
            mock.call(self.context, dest_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, src_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed)]

        detach_expected = [
            mock.call(self.context, {'device': {'path': 'bar'}},
                      dest_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, {'device': {'path': 'foo'}},
                      src_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed)]

        attach_volume_returns = [
            {'device': {'path': 'bar'}},
            {'device': {'path': 'foo'}}
        ]

        # Test case for sparse_copy_volume = False
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {}
        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

        # Test case for sparse_copy_volume = True
        mock_attach.reset_mock()
        mock_detach.reset_mock()
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {'sparse_copy_volume': True}
        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

        # cleanup resource
        db.volume_destroy(self.context, src_vol['id'])
        db.volume_destroy(self.context, dest_vol['id'])

    @mock.patch(driver_name + '.initialize_connection')
    @mock.patch(driver_name + '.create_export', return_value=None)
    @mock.patch(driver_name + '._connect_device')
    def test_attach_volume_encrypted(self, connect_mock, export_mock,
                                     initialize_mock):
        """_attach_volume passes 'encrypted': True on to _connect_device."""
        properties = {'host': 'myhost', 'ip': '192.168.1.43',
                      'initiator': u'iqn.1994-05.com.redhat:d9be887375',
                      'multipath': False, 'os_type': 'linux2',
                      'platform': 'x86_64'}

        data = {'target_discovered': True,
                'target_iqn': 'iqn.2010-10.org.openstack:volume-00000001',
                'target_portal': '127.0.0.0.1:3260',
                'volume_id': 1,
                'discard': False}

        passed_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        initialize_mock.return_value = passed_conn

        # _attach_volume adds the encrypted value based on the volume
        expected_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        expected_conn['data']['encrypted'] = True

        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)

        attach_info, vol = self.volume.driver._attach_volume(self.context,
                                                             volume,
                                                             properties)

        export_mock.assert_called_once_with(self.context, volume, properties)
        initialize_mock.assert_called_once_with(volume, properties)

        connect_mock.assert_called_once_with(expected_conn)

        self.assertEqual(connect_mock.return_value, attach_info)
        self.assertEqual(volume, vol)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume(self,
                                            mock_detach_encryptor,
                                            mock_attach_encryptor,
                                            mock_detach_volume,
                                            mock_attach_volume,
                                            mock_fetch_to_raw,
                                            mock_get_connector_properties):
        """Happy path: attach, attach encryptor, fetch, then detach both."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]

        self.volume.driver.copy_image_to_encrypted_volume(
            self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume_failed_attach_encryptor(
            self,
            mock_detach_encryptor,
            mock_attach_encryptor,
            mock_detach_volume,
            mock_attach_volume,
            mock_fetch_to_raw,
            mock_get_connector_properties):
        """Encryptor attach failure: no fetch, volume is still detached."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        attach_info = {'device': {'path': 'dev/sda'},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        raised_exception = os_brick.exception.VolumeEncryptionNotSupported(
            volume_id="123",
            volume_type="abc")
        mock_attach_encryptor.side_effect = raised_exception

        self.assertRaises(os_brick.exception.VolumeEncryptionNotSupported,
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        self.assertFalse(mock_fetch_to_raw.called)
        self.assertFalse(mock_detach_encryptor.called)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    @ddt.data(exception.ImageUnacceptable(
              reason='fake', image_id=fake.IMAGE_ID),
              exception.ImageTooBig(
              reason='fake image size exceeded', image_id=fake.IMAGE_ID))
    def test_copy_image_to_encrypted_volume_failed_fetch(
            self, excep,
            mock_detach_encryptor, mock_attach_encryptor,
            mock_detach_volume, mock_attach_volume, mock_fetch_to_raw,
            mock_get_connector_properties):
        """Fetch failure: error propagates, encryptor and volume detach."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        mock_fetch_to_raw.side_effect = excep

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        self.assertRaises(type(excep),
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch('cinder.volume.driver.brick_exception')
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'terminate_connection', side_effect=Exception)
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'remove_export', side_effect=Exception)
    def test_detach_volume_force(self, remove_mock, terminate_mock, exc_mock):
        """Test force parameter on _detach_volume.

        On the driver if we receive the force parameter we will do everything
        even with Exceptions on disconnect, terminate, and remove export.
        """
        connector = mock.Mock()
        connector.disconnect_volume.side_effect = Exception
        # TODO(geguileo): Remove this ExceptionChainer simulation once we
        # release OS-Brick version with it and bump min version.
        exc = exc_mock.ExceptionChainer.return_value
        exc.context.return_value.__enter__.return_value = exc
        exc.context.return_value.__exit__.return_value = True

        volume = {'id': fake.VOLUME_ID}
        attach_info = {'device': {},
                       'connector': connector,
                       'conn': {'data': {}, }}

        # TODO(geguileo): Change TypeError to ExceptionChainer once we release
        # OS-Brick version with it and bump min version.
        self.assertRaises(TypeError,
                          self.volume.driver._detach_volume, self.context,
                          attach_info, volume, {}, force=True)

        self.assertTrue(connector.disconnect_volume.called)
        self.assertTrue(remove_mock.called)
        self.assertTrue(terminate_mock.called)
        self.assertEqual(3, exc.context.call_count)

    @ddt.data({'cfg_value': '10', 'valid': True},
              {'cfg_value': 'auto', 'valid': True},
              {'cfg_value': '1', 'valid': True},
              {'cfg_value': '1.2', 'valid': True},
              {'cfg_value': '100', 'valid': True},
              {'cfg_value': '20.15', 'valid': True},
              {'cfg_value': 'True', 'valid': False},
              {'cfg_value': 'False', 'valid': False},
              {'cfg_value': '10.0.0', 'valid': False},
              {'cfg_value': '0.00', 'valid': True},
              {'cfg_value': 'anything', 'valid': False},)
    @ddt.unpack
    def test_auto_max_subscription_ratio_options(self, cfg_value, valid):
        """Valid ratio strings are stored as-is; invalid ones raise."""
        # This tests the max_over_subscription_ratio option as it is now
        # checked by a regex
        def _set_conf(config, value):
            config.set_override('max_over_subscription_ratio', value)

        config = conf.Configuration(None)
        config.append_config_values(driver.volume_opts)

        if valid:
            _set_conf(config, cfg_value)
            self.assertEqual(cfg_value, config.safe_get(
                'max_over_subscription_ratio'))
        else:
            self.assertRaises(ValueError, _set_conf, config, cfg_value)


class FibreChannelTestCase(BaseDriverTestCase):
    """Test Case for FibreChannelDriver."""
    driver_name = "cinder.volume.driver.FibreChannelDriver"

    def test_initialize_connection(self):
        """initialize_connection() raises NotImplementedError."""
        self.assertRaises(NotImplementedError,
                          self.volume.driver.initialize_connection, {}, {})

    def test_validate_connector(self):
        """validate_connector() successful use case.

        validate_connector() does not throw an exception when
        wwpns and wwnns are both set and both are not empty.
        """
        connector = {'wwpns': ["not empty"],
                     'wwnns': ["not empty"]}
        self.volume.driver.validate_connector(connector)

    def test_validate_connector_no_wwpns(self):
        """validate_connector() throws exception when it has no wwpns."""
        connector = {'wwnns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_empty_wwpns(self):
        """validate_connector() throws exception when it has empty wwpns."""
        connector = {'wwpns': [],
                     'wwnns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_no_wwnns(self):
        """validate_connector() throws exception when it has no wwnns."""
        connector = {'wwpns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_empty_wwnns(self):
        """validate_connector() throws exception when it has empty wwnns."""
        connector = {'wwnns': [],
                     'wwpns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)