1# Copyright (c) 2016 Red Hat Inc.
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15"""Tests for Volume Code."""
16
17import ddt
18import mock
19import shutil
20import tempfile
21
22import os_brick
23from oslo_config import cfg
24from oslo_utils import importutils
25
26from cinder.brick.local_dev import lvm as brick_lvm
27from cinder import context
28from cinder import db
29from cinder import exception
30from cinder.image import image_utils
31from cinder import objects
32from cinder.objects import fields
33import cinder.policy
34from cinder import test
35from cinder.tests.unit import fake_constants as fake
36from cinder.tests.unit import fake_snapshot
37from cinder.tests.unit import fake_volume
38from cinder.tests.unit.image import fake as fake_image
39from cinder.tests.unit import utils as tests_utils
40from cinder import utils
41import cinder.volume
42from cinder.volume import configuration as conf
43from cinder.volume import driver
44from cinder.volume import manager
45from cinder.volume import rpcapi as volume_rpcapi
46import cinder.volume.targets.tgt
47from cinder.volume import utils as volutils
48
49
50CONF = cfg.CONF
51
52
def my_safe_get(self, value):
    """Replacement for ``Configuration.safe_get`` patched in by the tests.

    Only the ``replication_device`` option resolves to a value; every other
    option name yields ``None``.
    """
    return ['replication'] if value == 'replication_device' else None
57
58
@ddt.ddt
class DriverTestCase(test.TestCase):
    """Tests for replication feature detection and driver initialization."""

    @staticmethod
    def _get_driver(replicated, version):
        """Return a driver class exposing the requested replication level.

        :param replicated: falsy to get a driver class that defines no
                           failover methods at all.
        :param version: 'v2.1' to get a driver class defining only
                        ``failover_host``; any other value returns a class
                        that additionally defines ``failover_completed``.
        :returns: a ``driver.VolumeDriver`` subclass (not an instance).
        """
        class NonReplicatedDriver(driver.VolumeDriver):
            pass

        class V21Driver(driver.VolumeDriver):
            def failover_host(*args, **kwargs):
                pass

        class AADriver(V21Driver):
            def failover_completed(*args, **kwargs):
                pass

        if not replicated:
            return NonReplicatedDriver

        if version == 'v2.1':
            return V21Driver

        return AADriver

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_none(self, rep_version):
        """A driver without failover methods supports no feature."""
        my_driver = self._get_driver(False, None)
        self.assertFalse(my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_only_21(self, rep_version):
        """A driver with only failover_host supports just 'v2.1'."""
        version = 'v2.1'
        my_driver = self._get_driver(True, version)
        self.assertEqual(rep_version == version,
                         my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_aa(self, rep_version):
        """A driver with both failover methods supports 'v2.1' and 'a/a'."""
        my_driver = self._get_driver(True, 'a/a')
        self.assertEqual(rep_version in ('v2.1', 'a/a'),
                         my_driver.supports_replication_feature(rep_version))

    def test_init_non_replicated(self):
        """Instantiating a non replicated driver does not raise."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(False, None)(configuration=config)

    @ddt.data('v2.1', 'a/a')
    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_non_clustered(self, version):
        """Replicated drivers can be instantiated without a cluster name."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, version)(configuration=config)

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_not_supported(self):
        """A 'v2.1' only driver refuses to start replicated in a cluster."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # Raises exception because we are trying to run a replicated service
        # in clustered mode but the driver doesn't support it.
        self.assertRaises(exception.Invalid, self._get_driver(True, 'v2.1'),
                          configuration=config, cluster_name='mycluster')

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_supported(self):
        """An 'a/a' capable driver can start replicated in a cluster."""
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, 'a/a')(configuration=config,
                                      cluster_name='mycluster')

    def test_failover(self):
        """Test default failover behavior of calling failover_host."""
        my_driver = self._get_driver(True, 'a/a')()
        with mock.patch.object(my_driver, 'failover_host') as failover_mock:
            res = my_driver.failover(mock.sentinel.context,
                                     mock.sentinel.volumes,
                                     secondary_id=mock.sentinel.secondary_id,
                                     groups=[])
        # failover() must hand back whatever failover_host produced and pass
        # its arguments through positionally.
        self.assertEqual(failover_mock.return_value, res)
        failover_mock.assert_called_once_with(mock.sentinel.context,
                                              mock.sentinel.volumes,
                                              mock.sentinel.secondary_id,
                                              [])
151
152
class BaseDriverTestCase(test.TestCase):
    """Base Test class for Drivers.

    Subclasses select the driver under test through ``driver_name``; setUp
    then exposes the volume manager as ``self.volume`` with the driver's
    ``_execute`` replaced by a fake that returns ``self.output``.
    """
    driver_name = "cinder.volume.driver.FakeBaseDriver"

    def setUp(self):
        """Create the volume manager, fake command execution and temp dir."""
        super(BaseDriverTestCase, self).setUp()
        vol_tmpdir = tempfile.mkdtemp()
        # Register removal of the exact directory we created right away, so
        # it is deleted even if a later step of setUp raises and regardless
        # of what CONF.volumes_dir resolves to at cleanup time.
        self.addCleanup(shutil.rmtree, vol_tmpdir, ignore_errors=True)
        self.override_config('volume_driver', self.driver_name,
                             conf.SHARED_CONF_GROUP)
        self.override_config('volumes_dir', vol_tmpdir,
                             conf.SHARED_CONF_GROUP)
        self.volume = importutils.import_object(CONF.volume_manager)
        self.context = context.get_admin_context()
        # Value handed back as stdout by the fake _execute below; tests may
        # overwrite it to simulate command output.
        self.output = ""
        self.configuration = conf.Configuration(None)
        self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)

        def _fake_execute(_command, *_args, **_kwargs):
            """Fake _execute."""
            return self.output, None
        exec_patcher = mock.patch.object(self.volume.driver, '_execute',
                                         _fake_execute)
        exec_patcher.start()
        self.addCleanup(exec_patcher.stop)
        self.volume.driver.set_initialized()
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        """Best-effort removal of the configured volumes directory."""
        try:
            shutil.rmtree(CONF.volumes_dir)
        except OSError:
            # The directory may not exist (or may already be gone).
            pass

    def _attach_volume(self):
        """Attach volumes to an instance."""
        return []
189
190
@ddt.ddt
class GenericVolumeDriverTestCase(BaseDriverTestCase):
    """Test case for VolumeDriver."""
    driver_name = "cinder.tests.fake_driver.FakeLoggingVolumeDriver"

    def test_create_temp_cloned_volume(self):
        """Temp clones become 'available' with or without a model update."""
        with mock.patch.object(
                self.volume.driver,
                'create_cloned_volume') as mock_create_cloned_volume:
            # The driver returns a model update: it must show on the clone.
            model_update = {'provider_location': 'dummy'}
            mock_create_cloned_volume.return_value = model_update
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol)
            self.assertEqual('dummy', cloned_vol.provider_location)
            self.assertEqual('available', cloned_vol.status)

            # The driver returns no model update at all.
            mock_create_cloned_volume.return_value = None
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol)
            self.assertEqual('available', cloned_vol.status)

    def test_get_backup_device_available(self):
        """The volume itself is the backup device; no temp volume is set."""
        vol = tests_utils.create_volume(self.context)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        (backup_device, is_snapshot) = self.volume.driver.get_backup_device(
            self.context, backup_obj)
        volume = objects.Volume.get_by_id(self.context, vol.id)
        self.assertNotIn('temporary', backup_device.admin_metadata)
        self.assertEqual(volume, backup_device)
        self.assertFalse(is_snapshot)
        backup_obj.refresh()
        self.assertIsNone(backup_obj.temp_volume_id)

    def test_get_backup_device_in_use(self):
        """A previously in-use volume is backed up through a temp clone."""
        vol = tests_utils.create_volume(self.context,
                                        status='backing-up',
                                        previous_status='in-use')
        admin_meta = {'temporary': 'True'}
        temp_vol = tests_utils.create_volume(self.context,
                                             admin_metadata=admin_meta)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        with mock.patch.object(
                self.volume.driver,
                '_create_temp_cloned_volume') as mock_create_temp:
            mock_create_temp.return_value = temp_vol
            (backup_device, is_snapshot) = (
                self.volume.driver.get_backup_device(self.context,
                                                     backup_obj))
            self.assertEqual(temp_vol, backup_device)
            self.assertFalse(is_snapshot)
            # The id of the temporary volume must be recorded on the backup.
            backup_obj.refresh()
            self.assertEqual(temp_vol.id, backup_obj.temp_volume_id)

    def test_create_temp_volume_from_snapshot(self):
        """The temp volume is detached and keeps the source AZ and cluster."""
        volume_dict = {'id': fake.SNAPSHOT_ID,
                       'host': 'fakehost',
                       'cluster_name': 'fakecluster',
                       'availability_zone': 'fakezone',
                       'size': 1}
        vol = fake_volume.fake_volume_obj(self.context, **volume_dict)
        snapshot = fake_snapshot.fake_snapshot_obj(self.context)

        with mock.patch.object(
                self.volume.driver,
                'create_volume_from_snapshot'):
            temp_vol = self.volume.driver._create_temp_volume_from_snapshot(
                self.context,
                vol, snapshot)
            self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                             temp_vol.attach_status)
            self.assertEqual('fakezone', temp_vol.availability_zone)
            self.assertEqual('fakecluster', temp_vol.cluster_name)

    @mock.patch.object(utils, 'brick_get_connector_properties')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
    @mock.patch.object(volutils, 'copy_volume')
    @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
    @mock.patch.object(cinder.volume.volume_types,
                       'volume_types_encryption_changed')
    @ddt.data(False, True)
    def test_copy_volume_data_mgr(self,
                                  encryption_changed,
                                  mock_encryption_changed,
                                  mock_get_capabilities,
                                  mock_copy,
                                  mock_detach,
                                  mock_attach,
                                  mock_get_connector):
        """Test function of _copy_volume_data."""

        src_vol = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
        # Registered as cleanups so the volumes are destroyed even when one
        # of the assertions below fails.
        self.addCleanup(db.volume_destroy, self.context, src_vol['id'])
        dest_vol = tests_utils.create_volume(self.context, size=1,
                                             host=CONF.host)
        self.addCleanup(db.volume_destroy, self.context, dest_vol['id'])
        mock_get_connector.return_value = {}
        mock_encryption_changed.return_value = encryption_changed
        self.volume.driver._throttle = mock.MagicMock()

        # The destination is attached first, then the source.
        attach_expected = [
            mock.call(self.context, dest_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, src_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed)]

        detach_expected = [
            mock.call(self.context, {'device': {'path': 'bar'}},
                      dest_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, {'device': {'path': 'foo'}},
                      src_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed)]

        # Returned in attach order: 'bar' for dest_vol, 'foo' for src_vol.
        attach_volume_returns = [
            {'device': {'path': 'bar'}},
            {'device': {'path': 'foo'}}
        ]

        #  Test case for sparse_copy_volume = False
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {}
        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

        #  Test case for sparse_copy_volume = True
        mock_attach.reset_mock()
        mock_detach.reset_mock()
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {'sparse_copy_volume': True}
        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

    @mock.patch(driver_name + '.initialize_connection')
    @mock.patch(driver_name + '.create_export', return_value=None)
    @mock.patch(driver_name + '._connect_device')
    def test_attach_volume_encrypted(self, connect_mock, export_mock,
                                     initialize_mock):
        """_attach_volume flags the connection of an encrypted volume."""
        properties = {'host': 'myhost', 'ip': '192.168.1.43',
                      'initiator': u'iqn.1994-05.com.redhat:d9be887375',
                      'multipath': False, 'os_type': 'linux2',
                      'platform': 'x86_64'}

        data = {'target_discovered': True,
                'target_iqn': 'iqn.2010-10.org.openstack:volume-00000001',
                'target_portal': '127.0.0.1:3260',
                'volume_id': 1,
                'discard': False}

        passed_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        initialize_mock.return_value = passed_conn

        # _attach_volume adds the encrypted value based on the volume
        expected_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        expected_conn['data']['encrypted'] = True

        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)

        attach_info, vol = self.volume.driver._attach_volume(self.context,
                                                             volume,
                                                             properties)

        export_mock.assert_called_once_with(self.context, volume, properties)
        initialize_mock.assert_called_once_with(volume, properties)

        connect_mock.assert_called_once_with(expected_conn)

        self.assertEqual(connect_mock.return_value, attach_info)
        self.assertEqual(volume, vol)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume(self,
                                            mock_detach_encryptor,
                                            mock_attach_encryptor,
                                            mock_detach_volume,
                                            mock_attach_volume,
                                            mock_fetch_to_raw,
                                            mock_get_connector_properties):
        """Happy path: attach, add encryptor, fetch image, then undo both."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]

        self.volume.driver.copy_image_to_encrypted_volume(
            self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume_failed_attach_encryptor(
            self,
            mock_detach_encryptor,
            mock_attach_encryptor,
            mock_detach_volume,
            mock_attach_volume,
            mock_fetch_to_raw,
            mock_get_connector_properties):
        """Encryptor attach failure: no fetch, volume still detached."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        attach_info = {'device': {'path': 'dev/sda'},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        raised_exception = os_brick.exception.VolumeEncryptionNotSupported(
            volume_id="123",
            volume_type="abc")
        mock_attach_encryptor.side_effect = raised_exception

        self.assertRaises(os_brick.exception.VolumeEncryptionNotSupported,
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        # Nothing past the failed encryptor attach may run, except for the
        # detach of the volume itself.
        self.assertFalse(mock_fetch_to_raw.called)
        self.assertFalse(mock_detach_encryptor.called)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.utils, 'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.utils, 'brick_detach_volume_encryptor')
    @ddt.data(exception.ImageUnacceptable(
              reason='fake', image_id=fake.IMAGE_ID),
              exception.ImageTooBig(
              reason='fake image size exceeded', image_id=fake.IMAGE_ID))
    def test_copy_image_to_encrypted_volume_failed_fetch(
            self, excep,
            mock_detach_encryptor, mock_attach_encryptor,
            mock_detach_volume, mock_attach_volume, mock_fetch_to_raw,
            mock_get_connector_properties):
        """Image fetch failure: encryptor and volume are still detached."""
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        mock_fetch_to_raw.side_effect = excep

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        self.assertRaises(type(excep),
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch('cinder.volume.driver.brick_exception')
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'terminate_connection', side_effect=Exception)
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'remove_export', side_effect=Exception)
    def test_detach_volume_force(self, remove_mock, terminate_mock, exc_mock):
        """Test force parameter on _detach_volume.

        On the driver if we receive the force parameter we will do everything
        even with Exceptions on disconnect, terminate, and remove export.
        """
        connector = mock.Mock()
        connector.disconnect_volume.side_effect = Exception
        # TODO(geguileo): Remove this ExceptionChainer simulation once we
        # release OS-Brick version with it and bump min version.
        exc = exc_mock.ExceptionChainer.return_value
        exc.context.return_value.__enter__.return_value = exc
        exc.context.return_value.__exit__.return_value = True

        volume = {'id': fake.VOLUME_ID}
        attach_info = {'device': {},
                       'connector': connector,
                       'conn': {'data': {}, }}

        # TODO(geguileo): Change TypeError to ExceptionChainer once we release
        # OS-Brick version with it and bump min version.
        self.assertRaises(TypeError,
                          self.volume.driver._detach_volume, self.context,
                          attach_info, volume, {}, force=True)

        # All three steps were attempted even though each one raised.
        self.assertTrue(connector.disconnect_volume.called)
        self.assertTrue(remove_mock.called)
        self.assertTrue(terminate_mock.called)
        self.assertEqual(3, exc.context.call_count)

    @ddt.data({'cfg_value': '10', 'valid': True},
              {'cfg_value': 'auto', 'valid': True},
              {'cfg_value': '1', 'valid': True},
              {'cfg_value': '1.2', 'valid': True},
              {'cfg_value': '100', 'valid': True},
              {'cfg_value': '20.15', 'valid': True},
              {'cfg_value': 'True', 'valid': False},
              {'cfg_value': 'False', 'valid': False},
              {'cfg_value': '10.0.0', 'valid': False},
              {'cfg_value': '0.00', 'valid': True},
              {'cfg_value': 'anything', 'valid': False},)
    @ddt.unpack
    def test_auto_max_subscription_ratio_options(self, cfg_value, valid):
        """Valid ratio strings are stored as-is; invalid ones raise."""
        # This tests the max_over_subscription_ratio option as it is now
        # checked by a regex
        def _set_conf(config, value):
            config.set_override('max_over_subscription_ratio', value)

        config = conf.Configuration(None)
        config.append_config_values(driver.volume_opts)

        if valid:
            _set_conf(config, cfg_value)
            self.assertEqual(cfg_value, config.safe_get(
                'max_over_subscription_ratio'))
        else:
            self.assertRaises(ValueError, _set_conf, config, cfg_value)
595
596
class FibreChannelTestCase(BaseDriverTestCase):
    """Connector validation tests for the generic FibreChannelDriver."""
    driver_name = "cinder.volume.driver.FibreChannelDriver"

    def test_initialize_connection(self):
        """initialize_connection() raises NotImplementedError."""
        initialize = self.volume.driver.initialize_connection
        self.assertRaises(NotImplementedError, initialize, {}, {})

    def test_validate_connector(self):
        """validate_connector() accepts a fully populated connector.

        No exception is expected when both the wwpns and the wwnns entries
        are present and non-empty.
        """
        validate = self.volume.driver.validate_connector
        validate({'wwpns': ["not empty"], 'wwnns': ["not empty"]})

    def test_validate_connector_no_wwpns(self):
        """validate_connector() rejects a connector lacking wwpns."""
        validate = self.volume.driver.validate_connector
        self.assertRaises(exception.InvalidConnectorException,
                          validate, {'wwnns': ["not empty"]})

    def test_validate_connector_empty_wwpns(self):
        """validate_connector() rejects a connector whose wwpns is empty."""
        validate = self.volume.driver.validate_connector
        self.assertRaises(exception.InvalidConnectorException,
                          validate,
                          {'wwpns': [], 'wwnns': ["not empty"]})

    def test_validate_connector_no_wwnns(self):
        """validate_connector() rejects a connector lacking wwnns."""
        validate = self.volume.driver.validate_connector
        self.assertRaises(exception.InvalidConnectorException,
                          validate, {'wwpns': ["not empty"]})

    def test_validate_connector_empty_wwnns(self):
        """validate_connector() rejects a connector whose wwnns is empty."""
        validate = self.volume.driver.validate_connector
        self.assertRaises(exception.InvalidConnectorException,
                          validate,
                          {'wwnns': [], 'wwpns': ["not empty"]})
640