1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16"""Tests for Volume Code."""
17
18import ddt
19import time
20
21import mock
22import os_brick
23from oslo_concurrency import processutils
24from oslo_config import cfg
25from oslo_utils import imageutils
26
27from cinder.common import constants
28from cinder import context
29from cinder import db
30from cinder import exception
31from cinder import objects
32from cinder.objects import fields
33from cinder import quota
34from cinder.tests.unit.api import fakes
35from cinder.tests.unit import fake_constants as fake
36from cinder.tests.unit import fake_volume
37from cinder.tests.unit import utils as tests_utils
38from cinder.tests.unit import volume as base
39import cinder.volume
40from cinder.volume import api as volume_api
41from cinder.volume.flows.manager import create_volume as create_volume_manager
42from cinder.volume import rpcapi as volume_rpcapi
43from cinder.volume import utils as volutils
44from cinder.volume import volume_types
45
46
# Module-level alias for the QUOTAS object exported by cinder.quota.
QUOTAS = quota.QUOTAS

# oslo.config's global configuration object; the tests read CONF.host from it
# and override options on it.
CONF = cfg.CONF
50
51
def create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
                    **kwargs):
    """Build a Snapshot object for ``volume_id`` and call create() on it.

    :param volume_id: value stored in the snapshot's ``volume_id`` field
    :param size: value stored in the snapshot's ``volume_size`` field
    :param metadata: snapshot metadata; any falsy value is replaced by ``{}``
    :param ctxt: context handed to the Snapshot object; an admin context is
                 used when this is falsy
    :param kwargs: extra field values; ``user_id`` and ``project_id`` replace
                   the fake defaults, and everything is applied through
                   ``update()`` after the defaults have been set
    :returns: the snapshot object after ``create()`` has been called
    """
    snapshot = objects.Snapshot(ctxt or context.get_admin_context())

    # Defaults first, in a fixed order; kwargs are layered on top afterwards.
    defaults = (
        ('volume_size', size),
        ('user_id', kwargs.get('user_id', fake.USER_ID)),
        ('project_id', kwargs.get('project_id', fake.PROJECT_ID)),
        ('volume_id', volume_id),
        ('status', fields.SnapshotStatus.CREATING),
        ('metadata', metadata or {}),
    )
    for field_name, value in defaults:
        setattr(snapshot, field_name, value)

    snapshot.update(kwargs)
    snapshot.create()
    return snapshot
68
69
70@ddt.ddt
71class VolumeMigrationTestCase(base.BaseVolumeTestCase):
72
73    def setUp(self):
74        super(VolumeMigrationTestCase, self).setUp()
75        self._clear_patch = mock.patch('cinder.volume.utils.clear_volume',
76                                       autospec=True)
77        self._clear_patch.start()
78        self.expected_status = 'available'
79        self._service = tests_utils.create_service(
80            self.context,
81            values={'host': 'newhost', 'binary': constants.VOLUME_BINARY})
82
83    def tearDown(self):
84        super(VolumeMigrationTestCase, self).tearDown()
85        self._clear_patch.stop()
86
87    def test_migrate_volume_driver(self):
88        """Test volume migration done by driver."""
89        # Mock driver and rpc functions
90        self.mock_object(self.volume.driver, 'migrate_volume',
91                         lambda x, y, z, new_type_id=None: (
92                             True, {'user_id': fake.USER_ID}))
93
94        volume = tests_utils.create_volume(self.context, size=0,
95                                           host=CONF.host,
96                                           migration_status='migrating')
97        host_obj = {'host': 'newhost', 'capabilities': {}}
98        self.volume.migrate_volume(self.context, volume, host_obj, False)
99
100        # check volume properties
101        volume = objects.Volume.get_by_id(context.get_admin_context(),
102                                          volume.id)
103        self.assertEqual('newhost', volume.host)
104        self.assertEqual('success', volume.migration_status)
105
106    def test_migrate_volume_driver_cross_az(self):
107        """Test volume migration done by driver."""
108        # Mock driver and rpc functions
109        self.mock_object(self.volume.driver, 'migrate_volume',
110                         lambda x, y, z, new_type_id=None: (
111                             True, {'user_id': fake.USER_ID}))
112        dst_az = 'AZ2'
113        db.service_update(self.context, self._service.id,
114                          {'availability_zone': dst_az})
115
116        volume = tests_utils.create_volume(self.context, size=0,
117                                           host=CONF.host,
118                                           migration_status='migrating')
119        host_obj = {'host': 'newhost', 'capabilities': {}}
120        self.volume.migrate_volume(self.context, volume, host_obj, False)
121
122        # check volume properties
123        volume.refresh()
124        self.assertEqual('newhost', volume.host)
125        self.assertEqual('success', volume.migration_status)
126        self.assertEqual(dst_az, volume.availability_zone)
127
128    def _fake_create_volume(self, ctxt, volume, req_spec, filters,
129                            allow_reschedule=True):
130        return db.volume_update(ctxt, volume['id'],
131                                {'status': self.expected_status})
132
133    def test_migrate_volume_error(self):
134        with mock.patch.object(self.volume.driver, 'migrate_volume') as \
135                mock_migrate,\
136                mock.patch.object(self.volume.driver, 'create_export') as \
137                mock_create_export:
138
139            # Exception case at self.driver.migrate_volume and create_export
140            mock_migrate.side_effect = processutils.ProcessExecutionError
141            mock_create_export.side_effect = processutils.ProcessExecutionError
142            volume = tests_utils.create_volume(self.context, size=0,
143                                               host=CONF.host)
144            host_obj = {'host': 'newhost', 'capabilities': {}}
145            self.assertRaises(processutils.ProcessExecutionError,
146                              self.volume.migrate_volume,
147                              self.context,
148                              volume,
149                              host_obj,
150                              False)
151            volume = objects.Volume.get_by_id(context.get_admin_context(),
152                                              volume.id)
153            self.assertEqual('error', volume.migration_status)
154            self.assertEqual('available', volume.status)
155
156    @mock.patch('cinder.compute.API')
157    @mock.patch('cinder.volume.manager.VolumeManager.'
158                'migrate_volume_completion')
159    @mock.patch('cinder.db.sqlalchemy.api.volume_get')
160    def test_migrate_volume_generic(self, volume_get,
161                                    migrate_volume_completion,
162                                    nova_api):
163        fake_db_new_volume = {'status': 'available', 'id': fake.VOLUME_ID}
164        fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
165        new_volume_obj = fake_volume.fake_volume_obj(self.context,
166                                                     **fake_new_volume)
167        host_obj = {'host': 'newhost', 'capabilities': {}}
168        volume_get.return_value = fake_new_volume
169        update_server_volume = nova_api.return_value.update_server_volume
170        volume = tests_utils.create_volume(self.context, size=1,
171                                           host=CONF.host)
172        with mock.patch.object(self.volume, '_copy_volume_data') as \
173                mock_copy_volume:
174            self.volume._migrate_volume_generic(self.context, volume,
175                                                host_obj, None)
176            mock_copy_volume.assert_called_with(self.context, volume,
177                                                new_volume_obj,
178                                                remote='dest')
179            migrate_volume_completion.assert_called_with(
180                self.context, volume, new_volume_obj, error=False)
181            self.assertFalse(update_server_volume.called)
182
183    @mock.patch('cinder.compute.API')
184    @mock.patch('cinder.volume.manager.VolumeManager.'
185                'migrate_volume_completion')
186    def test_migrate_volume_generic_cross_az(self, migrate_volume_completion,
187                                             nova_api):
188        """Test that we set the right AZ in cross AZ migrations."""
189        original_create = objects.Volume.create
190        dst_az = 'AZ2'
191        db.service_update(self.context, self._service.id,
192                          {'availability_zone': dst_az})
193
194        def my_create(self, *args, **kwargs):
195            self.status = 'available'
196            original_create(self, *args, **kwargs)
197
198        volume = tests_utils.create_volume(self.context, size=1,
199                                           host=CONF.host)
200
201        host_obj = {'host': 'newhost', 'capabilities': {}}
202        create_vol = self.patch('cinder.objects.Volume.create',
203                                side_effect=my_create, autospec=True)
204
205        with mock.patch.object(self.volume, '_copy_volume_data') as copy_mock:
206            self.volume._migrate_volume_generic(self.context, volume, host_obj,
207                                                None)
208            copy_mock.assert_called_with(self.context, volume, mock.ANY,
209                                         remote='dest')
210        migrate_volume_completion.assert_called_with(
211            self.context, volume, mock.ANY, error=False)
212
213        nova_api.return_value.update_server_volume.assert_not_called()
214
215        self.assertEqual(dst_az,
216                         create_vol.call_args[0][0]['availability_zone'])
217
218    @mock.patch('cinder.compute.API')
219    @mock.patch('cinder.volume.manager.VolumeManager.'
220                'migrate_volume_completion')
221    @mock.patch('cinder.db.sqlalchemy.api.volume_get')
222    def test_migrate_volume_generic_attached_volume(self, volume_get,
223                                                    migrate_volume_completion,
224                                                    nova_api):
225        attached_host = 'some-host'
226        fake_volume_id = fake.VOLUME_ID
227        fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
228        fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
229        host_obj = {'host': 'newhost', 'capabilities': {}}
230        fake_uuid = fakes.get_fake_uuid()
231        update_server_volume = nova_api.return_value.update_server_volume
232        volume_get.return_value = fake_new_volume
233        volume = tests_utils.create_volume(self.context, size=1,
234                                           host=CONF.host)
235        volume_attach = tests_utils.attach_volume(
236            self.context, volume['id'], fake_uuid, attached_host, '/dev/vda')
237        self.assertIsNotNone(volume_attach['volume_attachment'][0]['id'])
238        self.assertEqual(
239            fake_uuid, volume_attach['volume_attachment'][0]['instance_uuid'])
240        self.assertEqual('in-use', volume_attach['status'])
241        self.volume._migrate_volume_generic(self.context, volume,
242                                            host_obj, None)
243        self.assertFalse(migrate_volume_completion.called)
244        update_server_volume.assert_called_with(self.context, fake_uuid,
245                                                volume['id'], fake_volume_id)
246
247    @mock.patch('cinder.objects.volume.Volume.save')
248    @mock.patch('cinder.volume.rpcapi.VolumeAPI.create_volume')
249    @mock.patch('cinder.compute.API')
250    @mock.patch('cinder.volume.manager.VolumeManager.'
251                'migrate_volume_completion')
252    @mock.patch('cinder.db.sqlalchemy.api.volume_get')
253    def test_migrate_volume_generic_volume_from_snap(self, volume_get,
254                                                     migrate_volume_completion,
255                                                     nova_api, create_volume,
256                                                     save):
257        def fake_create_volume(*args, **kwargs):
258            context, volume, request_spec, filter_properties = args
259            fake_db = mock.Mock()
260            task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
261            specs = task.execute(context, volume, {})
262            self.assertEqual('raw', specs['type'])
263
264        def fake_copy_volume_data_with_chk_param(*args, **kwargs):
265            context, src, dest = args
266            self.assertEqual(src['snapshot_id'], dest['snapshot_id'])
267
268        fake_db_new_volume = {'status': 'available', 'id': fake.VOLUME_ID}
269        fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
270        host_obj = {'host': 'newhost', 'capabilities': {}}
271        volume_get.return_value = fake_new_volume
272
273        volume_from_snap = tests_utils.create_volume(self.context, size=1,
274                                                     host=CONF.host)
275        volume_from_snap['snapshot_id'] = fake.SNAPSHOT_ID
276        create_volume.side_effect = fake_create_volume
277
278        with mock.patch.object(self.volume, '_copy_volume_data') as \
279                mock_copy_volume:
280            mock_copy_volume.side_effect = fake_copy_volume_data_with_chk_param
281            self.volume._migrate_volume_generic(self.context, volume_from_snap,
282                                                host_obj, None)
283
284    @mock.patch('cinder.objects.volume.Volume.save')
285    @mock.patch('cinder.volume.rpcapi.VolumeAPI.create_volume')
286    @mock.patch('cinder.compute.API')
287    @mock.patch('cinder.volume.manager.VolumeManager.'
288                'migrate_volume_completion')
289    @mock.patch('cinder.db.sqlalchemy.api.volume_get')
290    def test_migrate_volume_generic_for_clone(self, volume_get,
291                                              migrate_volume_completion,
292                                              nova_api, create_volume, save):
293        def fake_create_volume(*args, **kwargs):
294            context, volume, request_spec, filter_properties = args
295            fake_db = mock.Mock()
296            task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
297            specs = task.execute(context, volume, {})
298            self.assertEqual('raw', specs['type'])
299
300        def fake_copy_volume_data_with_chk_param(*args, **kwargs):
301            context, src, dest = args
302            self.assertEqual(src['source_volid'], dest['source_volid'])
303
304        fake_db_new_volume = {'status': 'available', 'id': fake.VOLUME_ID}
305        fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
306        host_obj = {'host': 'newhost', 'capabilities': {}}
307        volume_get.return_value = fake_new_volume
308
309        clone = tests_utils.create_volume(self.context, size=1,
310                                          host=CONF.host)
311        clone['source_volid'] = fake.VOLUME2_ID
312        create_volume.side_effect = fake_create_volume
313
314        with mock.patch.object(self.volume, '_copy_volume_data') as \
315                mock_copy_volume:
316            mock_copy_volume.side_effect = fake_copy_volume_data_with_chk_param
317            self.volume._migrate_volume_generic(self.context, clone,
318                                                host_obj, None)
319
320    @mock.patch.object(volume_rpcapi.VolumeAPI, 'update_migrated_volume')
321    @mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
322    @mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')
323    def test_migrate_volume_for_volume_generic(self, create_volume,
324                                               rpc_delete_volume,
325                                               update_migrated_volume):
326        fake_volume = tests_utils.create_volume(self.context, size=1,
327                                                previous_status='available',
328                                                host=CONF.host)
329
330        host_obj = {'host': 'newhost', 'capabilities': {}}
331        with mock.patch.object(self.volume.driver, 'migrate_volume') as \
332                mock_migrate_volume,\
333                mock.patch.object(self.volume, '_copy_volume_data'),\
334                mock.patch.object(self.volume.driver, 'delete_volume') as \
335                delete_volume:
336            create_volume.side_effect = self._fake_create_volume
337            self.volume.migrate_volume(self.context, fake_volume, host_obj,
338                                       True)
339            volume = objects.Volume.get_by_id(context.get_admin_context(),
340                                              fake_volume.id)
341            self.assertEqual('newhost', volume.host)
342            self.assertEqual('success', volume.migration_status)
343            self.assertFalse(mock_migrate_volume.called)
344            self.assertFalse(delete_volume.called)
345            self.assertTrue(rpc_delete_volume.called)
346            self.assertTrue(update_migrated_volume.called)
347
348    def test_migrate_volume_generic_copy_error(self):
349        with mock.patch.object(self.volume.driver, 'migrate_volume'),\
350                mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
351                as mock_create_volume,\
352                mock.patch.object(self.volume, '_copy_volume_data') as \
353                mock_copy_volume,\
354                mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
355                mock.patch.object(self.volume, 'migrate_volume_completion'),\
356                mock.patch.object(self.volume.driver, 'create_export'):
357
358            # Exception case at migrate_volume_generic
359            # source_volume['migration_status'] is 'migrating'
360            mock_create_volume.side_effect = self._fake_create_volume
361            mock_copy_volume.side_effect = processutils.ProcessExecutionError
362            volume = tests_utils.create_volume(self.context, size=0,
363                                               host=CONF.host)
364            host_obj = {'host': 'newhost', 'capabilities': {}}
365            self.assertRaises(processutils.ProcessExecutionError,
366                              self.volume.migrate_volume,
367                              self.context,
368                              volume,
369                              host_obj,
370                              True)
371            volume = objects.Volume.get_by_id(context.get_admin_context(),
372                                              volume.id)
373            self.assertEqual('error', volume.migration_status)
374            self.assertEqual('available', volume.status)
375
376    @mock.patch('cinder.image.image_utils.qemu_img_info')
377    def test_migrate_volume_with_glance_metadata(self, mock_qemu_info):
378        volume = self._create_volume_from_image(clone_image_volume=True)
379        glance_metadata = volume.glance_metadata
380
381        # We imitate the behavior of rpcapi, by serializing and then
382        # deserializing the volume object we created earlier.
383        serializer = objects.base.CinderObjectSerializer()
384        serialized_volume = serializer.serialize_entity(self.context, volume)
385        volume = serializer.deserialize_entity(self.context, serialized_volume)
386
387        image_info = imageutils.QemuImgInfo()
388        image_info.virtual_size = '1073741824'
389        mock_qemu_info.return_value = image_info
390
391        host_obj = {'host': 'newhost', 'capabilities': {}}
392        with mock.patch.object(self.volume.driver,
393                               'migrate_volume') as mock_migrate_volume:
394            mock_migrate_volume.side_effect = (
395                lambda x, y, z, new_type_id=None: (
396                    True, {'user_id': fake.USER_ID}))
397            self.volume.migrate_volume(self.context, volume, host_obj,
398                                       False)
399        self.assertEqual('newhost', volume.host)
400        self.assertEqual('success', volume.migration_status)
401        self.assertEqual(glance_metadata, volume.glance_metadata)
402
403    @mock.patch('cinder.db.volume_update')
404    def test_update_migrated_volume(self, volume_update):
405        fake_host = 'fake_host'
406        fake_new_host = 'fake_new_host'
407        fake_update = {'_name_id': fake.VOLUME2_NAME_ID,
408                       'provider_location': 'updated_location'}
409        fake_elevated = context.RequestContext(fake.USER_ID, self.project_id,
410                                               is_admin=True)
411        volume = tests_utils.create_volume(self.context, size=1,
412                                           status='available',
413                                           host=fake_host)
414        new_volume = tests_utils.create_volume(
415            self.context, size=1,
416            status='available',
417            provider_location='fake_provider_location',
418            _name_id=fake.VOLUME_NAME_ID,
419            host=fake_new_host)
420        new_volume._name_id = fake.VOLUME_NAME_ID
421        new_volume.provider_location = 'fake_provider_location'
422        fake_update_error = {'_name_id': new_volume._name_id,
423                             'provider_location':
424                             new_volume.provider_location}
425        expected_update = {'_name_id': volume._name_id,
426                           'provider_location': volume.provider_location}
427        with mock.patch.object(self.volume.driver,
428                               'update_migrated_volume') as migrate_update,\
429                mock.patch.object(self.context, 'elevated') as elevated:
430            migrate_update.return_value = fake_update
431            elevated.return_value = fake_elevated
432            self.volume.update_migrated_volume(self.context, volume,
433                                               new_volume, 'available')
434            volume_update.assert_has_calls((
435                mock.call(fake_elevated, new_volume.id, expected_update),
436                mock.call(fake_elevated, volume.id, fake_update)))
437
438            # Test the case for update_migrated_volume not implemented
439            # for the driver.
440            migrate_update.reset_mock()
441            volume_update.reset_mock()
442            # Reset the volume objects to their original value, since they
443            # were changed in the last call.
444            new_volume._name_id = fake.VOLUME_NAME_ID
445            new_volume.provider_location = 'fake_provider_location'
446            migrate_update.side_effect = NotImplementedError
447            self.volume.update_migrated_volume(self.context, volume,
448                                               new_volume, 'available')
449            volume_update.assert_has_calls((
450                mock.call(fake_elevated, new_volume.id, fake_update),
451                mock.call(fake_elevated, volume.id, fake_update_error)))
452
453    def test_migrate_volume_generic_create_volume_error(self):
454        self.expected_status = 'error'
455
456        with mock.patch.object(self.volume.driver, 'migrate_volume'), \
457                mock.patch.object(volume_rpcapi.VolumeAPI,
458                                  'create_volume') as mock_create_volume, \
459                mock.patch.object(self.volume, '_clean_temporary_volume') as \
460                clean_temporary_volume:
461
462            # Exception case at the creation of the new temporary volume
463            mock_create_volume.side_effect = self._fake_create_volume
464            volume = tests_utils.create_volume(self.context, size=0,
465                                               host=CONF.host)
466            host_obj = {'host': 'newhost', 'capabilities': {}}
467            self.assertRaises(exception.VolumeMigrationFailed,
468                              self.volume.migrate_volume,
469                              self.context,
470                              volume,
471                              host_obj,
472                              True)
473            volume = objects.Volume.get_by_id(context.get_admin_context(),
474                                              volume.id)
475            self.assertEqual('error', volume['migration_status'])
476            self.assertEqual('available', volume['status'])
477            self.assertTrue(clean_temporary_volume.called)
478        self.expected_status = 'available'
479
480    def test_migrate_volume_generic_timeout_error(self):
481        CONF.set_override("migration_create_volume_timeout_secs", 2)
482
483        with mock.patch.object(self.volume.driver, 'migrate_volume'), \
484                mock.patch.object(volume_rpcapi.VolumeAPI,
485                                  'create_volume') as mock_create_volume, \
486                mock.patch.object(self.volume, '_clean_temporary_volume') as \
487                clean_temporary_volume, \
488                mock.patch.object(time, 'sleep'):
489
490            # Exception case at the timeout of the volume creation
491            self.expected_status = 'creating'
492            mock_create_volume.side_effect = self._fake_create_volume
493            volume = tests_utils.create_volume(self.context, size=0,
494                                               host=CONF.host)
495            host_obj = {'host': 'newhost', 'capabilities': {}}
496            self.assertRaises(exception.VolumeMigrationFailed,
497                              self.volume.migrate_volume,
498                              self.context,
499                              volume,
500                              host_obj,
501                              True)
502            volume = objects.Volume.get_by_id(context.get_admin_context(),
503                                              volume.id)
504            self.assertEqual('error', volume['migration_status'])
505            self.assertEqual('available', volume['status'])
506            self.assertTrue(clean_temporary_volume.called)
507        self.expected_status = 'available'
508
509    def test_migrate_volume_generic_create_export_error(self):
510        with mock.patch.object(self.volume.driver, 'migrate_volume'),\
511                mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
512                as mock_create_volume,\
513                mock.patch.object(self.volume, '_copy_volume_data') as \
514                mock_copy_volume,\
515                mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
516                mock.patch.object(self.volume, 'migrate_volume_completion'),\
517                mock.patch.object(self.volume.driver, 'create_export') as \
518                mock_create_export:
519
520            # Exception case at create_export
521            mock_create_volume.side_effect = self._fake_create_volume
522            mock_copy_volume.side_effect = processutils.ProcessExecutionError
523            mock_create_export.side_effect = processutils.ProcessExecutionError
524            volume = tests_utils.create_volume(self.context, size=0,
525                                               host=CONF.host)
526            host_obj = {'host': 'newhost', 'capabilities': {}}
527            self.assertRaises(processutils.ProcessExecutionError,
528                              self.volume.migrate_volume,
529                              self.context,
530                              volume,
531                              host_obj,
532                              True)
533            volume = objects.Volume.get_by_id(context.get_admin_context(),
534                                              volume.id)
535            self.assertEqual('error', volume['migration_status'])
536            self.assertEqual('available', volume['status'])
537
538    def test_migrate_volume_generic_migrate_volume_completion_error(self):
539        def fake_migrate_volume_completion(ctxt, volume, new_volume,
540                                           error=False):
541            db.volume_update(ctxt, volume['id'],
542                             {'migration_status': 'completing'})
543            raise processutils.ProcessExecutionError
544
545        with mock.patch.object(self.volume.driver, 'migrate_volume'),\
546                mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
547                as mock_create_volume,\
548                mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
549                mock.patch.object(self.volume, 'migrate_volume_completion')\
550                as mock_migrate_compl,\
551                mock.patch.object(self.volume.driver, 'create_export'), \
552                mock.patch.object(self.volume, '_attach_volume') \
553                as mock_attach, \
554                mock.patch.object(self.volume, '_detach_volume'), \
555                mock.patch.object(os_brick.initiator.connector,
556                                  'get_connector_properties') \
557                as mock_get_connector_properties, \
558                mock.patch.object(volutils, 'copy_volume') as mock_copy, \
559                mock.patch.object(volume_rpcapi.VolumeAPI,
560                                  'get_capabilities') \
561                as mock_get_capabilities:
562
563            # Exception case at delete_volume
564            # source_volume['migration_status'] is 'completing'
565            mock_create_volume.side_effect = self._fake_create_volume
566            mock_migrate_compl.side_effect = fake_migrate_volume_completion
567            mock_get_connector_properties.return_value = {}
568            mock_attach.side_effect = [{'device': {'path': 'bar'}},
569                                       {'device': {'path': 'foo'}}]
570            mock_get_capabilities.return_value = {'sparse_copy_volume': True}
571            volume = tests_utils.create_volume(self.context, size=0,
572                                               host=CONF.host)
573            host_obj = {'host': 'newhost', 'capabilities': {}}
574            self.assertRaises(processutils.ProcessExecutionError,
575                              self.volume.migrate_volume,
576                              self.context,
577                              volume,
578                              host_obj,
579                              True)
580            volume = db.volume_get(context.get_admin_context(), volume['id'])
581            self.assertEqual('error', volume['migration_status'])
582            self.assertEqual('available', volume['status'])
583            mock_copy.assert_called_once_with('foo', 'bar', 0, '1M',
584                                              sparse=True)
585
586    def fake_attach_volume(self, ctxt, volume, instance_uuid, host_name,
587                           mountpoint, mode):
588            tests_utils.attach_volume(ctxt, volume.id,
589                                      instance_uuid, host_name,
590                                      '/dev/vda')
591
592    def _test_migrate_volume_completion(self, status='available',
593                                        instance_uuid=None, attached_host=None,
594                                        retyping=False,
595                                        previous_status='available'):
596
597        initial_status = retyping and 'retyping' or status
598        old_volume = tests_utils.create_volume(self.context, size=0,
599                                               host=CONF.host,
600                                               status=initial_status,
601                                               migration_status='migrating',
602                                               previous_status=previous_status)
603        attachment = None
604        if status == 'in-use':
605            vol = tests_utils.attach_volume(self.context, old_volume.id,
606                                            instance_uuid, attached_host,
607                                            '/dev/vda')
608            self.assertEqual('in-use', vol['status'])
609            attachment = vol['volume_attachment'][0]
610        target_status = 'target:%s' % old_volume.id
611        new_host = CONF.host + 'new'
612        new_volume = tests_utils.create_volume(self.context, size=0,
613                                               host=new_host,
614                                               migration_status=target_status)
615        with mock.patch.object(self.volume, 'detach_volume') as \
616                mock_detach_volume,\
617                mock.patch.object(volume_rpcapi.VolumeAPI,
618                                  'delete_volume') as mock_delete_volume,\
619                mock.patch.object(volume_rpcapi.VolumeAPI,
620                                  'attach_volume') as mock_attach_volume,\
621                mock.patch.object(volume_rpcapi.VolumeAPI,
622                                  'update_migrated_volume'),\
623                mock.patch.object(self.volume.driver, 'attach_volume'):
624            mock_attach_volume.side_effect = self.fake_attach_volume
625            old_volume_host = old_volume.host
626            new_volume_host = new_volume.host
627            self.volume.migrate_volume_completion(self.context, old_volume,
628                                                  new_volume)
629            after_new_volume = objects.Volume.get_by_id(self.context,
630                                                        new_volume.id)
631            after_old_volume = objects.Volume.get_by_id(self.context,
632                                                        old_volume.id)
633            if status == 'in-use':
634                mock_detach_volume.assert_called_with(self.context,
635                                                      old_volume.id,
636                                                      attachment['id'])
637                attachments = db.volume_attachment_get_all_by_instance_uuid(
638                    self.context, instance_uuid)
639                mock_attach_volume.assert_called_once_with(
640                    self.context,
641                    old_volume,
642                    attachment['instance_uuid'],
643                    attachment['attached_host'],
644                    attachment['mountpoint'],
645                    attachment.get('attach_mode', 'rw'),
646                )
647                self.assertIsNotNone(attachments)
648                self.assertEqual(attached_host,
649                                 attachments[0]['attached_host'])
650                self.assertEqual(instance_uuid,
651                                 attachments[0]['instance_uuid'])
652            else:
653                self.assertFalse(mock_detach_volume.called)
654            self.assertTrue(mock_delete_volume.called)
655            # NOTE(sborkows): the migrate_volume_completion method alters
656            # old and new volume objects, so we need to check the equality
657            # between the former host value and the actual one.
658            self.assertEqual(old_volume_host, after_new_volume.host)
659            self.assertEqual(new_volume_host, after_old_volume.host)
660
661    def test_migrate_volume_completion_retype_available(self):
662        self._test_migrate_volume_completion('available', retyping=True)
663
664    def test_migrate_volume_completion_retype_in_use(self):
665        self._test_migrate_volume_completion(
666            'in-use',
667            '83c969d5-065e-4c9c-907d-5394bc2e98e2',
668            'some-host',
669            retyping=True,
670            previous_status='in-use')
671
672    def test_migrate_volume_completion_migrate_available(self):
673        self._test_migrate_volume_completion()
674
675    def test_migrate_volume_completion_migrate_in_use(self):
676        self._test_migrate_volume_completion(
677            'in-use',
678            '83c969d5-065e-4c9c-907d-5394bc2e98e2',
679            'some-host',
680            retyping=False,
681            previous_status='in-use')
682
683    @ddt.data(False, True)
684    def test_api_migrate_volume_completion_from_swap_with_no_migration(
685            self, swap_error):
686        # This test validates that Cinder properly finishes the swap volume
687        # status updates for the case that no migration has occurred
688        instance_uuid = '83c969d5-065e-4c9c-907d-5394bc2e98e2'
689        attached_host = 'attached-host'
690        orig_attached_vol = tests_utils.create_volume(self.context, size=0)
691        orig_attached_vol = tests_utils.attach_volume(
692            self.context, orig_attached_vol['id'], instance_uuid,
693            attached_host, '/dev/vda')
694        new_volume = tests_utils.create_volume(self.context, size=0)
695
696        @mock.patch.object(volume_rpcapi.VolumeAPI, 'detach_volume')
697        @mock.patch.object(volume_rpcapi.VolumeAPI, 'attach_volume')
698        def _run_migration_completion(rpc_attach_volume,
699                                      rpc_detach_volume):
700            attachment = orig_attached_vol['volume_attachment'][0]
701            attachment_id = attachment['id']
702            rpc_attach_volume.side_effect = self.fake_attach_volume
703            vol_id = volume_api.API().migrate_volume_completion(
704                self.context, orig_attached_vol, new_volume, swap_error)
705            if swap_error:
706                # When swap failed, we don't want to finish attachment
707                self.assertFalse(rpc_detach_volume.called)
708                self.assertFalse(rpc_attach_volume.called)
709            else:
710                # When no error, we should be finishing the attachment
711                rpc_detach_volume.assert_called_with(self.context,
712                                                     orig_attached_vol,
713                                                     attachment_id)
714                rpc_attach_volume.assert_called_with(
715                    self.context, new_volume, attachment['instance_uuid'],
716                    attachment['attached_host'], attachment['mountpoint'],
717                    'rw')
718            self.assertEqual(new_volume['id'], vol_id)
719
720        _run_migration_completion()
721
722    @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
723    def test_retype_setup_fail_volume_is_available(self, mock_notify):
724        """Verify volume is still available if retype prepare failed."""
725        elevated = context.get_admin_context()
726        project_id = self.context.project_id
727
728        db.volume_type_create(elevated, {'name': 'old', 'extra_specs': {}})
729        old_vol_type = db.volume_type_get_by_name(elevated, 'old')
730        db.volume_type_create(elevated, {'name': 'new', 'extra_specs': {}})
731        new_vol_type = db.volume_type_get_by_name(elevated, 'new')
732        db.quota_create(elevated, project_id, 'volumes_new', 0)
733
734        volume = tests_utils.create_volume(self.context, size=1,
735                                           host=CONF.host, status='available',
736                                           volume_type_id=old_vol_type['id'])
737
738        api = cinder.volume.api.API()
739        self.assertRaises(exception.VolumeLimitExceeded, api.retype,
740                          self.context, volume, new_vol_type['id'])
741
742        volume = db.volume_get(elevated, volume.id)
743        mock_notify.assert_not_called()
744        self.assertEqual('available', volume['status'])
745
    @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
    def _retype_volume_exec(self, driver, mock_notify,
                            snap=False, policy='on-demand',
                            migrate_exc=False, exc=None, diff_equal=False,
                            replica=False, reserve_vol_type_only=False,
                            encryption_changed=False,
                            replica_new=None):
        """Run ``self.volume.retype`` under mocks and verify the outcome.

        Creates an 'old' and a 'new' volume type, a volume of the old type in
        status 'retyping', and the quota reservations for both types, then
        calls ``self.volume.retype`` with the driver's ``retype``,
        ``volume_types_diff``, ``migrate_volume``, ``volume_get`` and
        ``RequestContext.elevated`` mocked. Afterwards the volume and quota
        usage are re-read and checked.

        NOTE: ``mock_notify`` is appended positionally by the ``mock.patch``
        decorator, so callers must pass ``driver`` positionally.

        :param driver: return value of the mocked driver ``retype``; when
                       truthy (or when ``diff_equal`` is true) the target
                       host is ``CONF.host``, otherwise 'newhost'.
        :param mock_notify: notifier mock injected by the decorator.
        :param snap: create a snapshot of the volume before retyping.
        :param policy: value passed to retype as ``migration_policy``.
        :param migrate_exc: make the mocked ``migrate_volume`` raise KeyError
                            instead of returning True.
        :param exc: exception class retype is expected to raise; when falsy
                    retype is called directly.
        :param diff_equal: second element of the mocked
                           ``volume_types_diff`` return value.
        :param replica: give the old type the ``replication_enabled`` extra
                        spec and the volume replication status 'enabled'
                        (otherwise 'disabled').
        :param reserve_vol_type_only: reserve only the type-specific quota
                                      keys and assert that 'volumes' and
                                      'gigabytes' usage is unchanged.
        :param encryption_changed: report a non-empty encryption diff and
                                   assert ``migrate_volume`` was called.
        :param replica_new: whether the new type has the
                            ``replication_enabled`` extra spec; defaults to
                            ``replica``.
        """
        elevated = context.get_admin_context()
        project_id = self.context.project_id

        if replica:
            rep_status = 'enabled'
            extra_specs = {'replication_enabled': '<is> True'}
        else:
            rep_status = 'disabled'
            extra_specs = {}

        # Unless told otherwise the new type mirrors the old type's
        # replication setting.
        if replica_new is None:
            replica_new = replica
        new_specs = {'replication_enabled': '<is> True'} if replica_new else {}

        db.volume_type_create(elevated, {'name': 'old',
                                         'extra_specs': extra_specs})
        old_vol_type = db.volume_type_get_by_name(elevated, 'old')

        db.volume_type_create(elevated, {'name': 'new',
                                         'extra_specs': new_specs})
        vol_type = db.volume_type_get_by_name(elevated, 'new')
        db.quota_create(elevated, project_id, 'volumes_new', 10)

        volume = tests_utils.create_volume(self.context, size=1,
                                           host=CONF.host, status='retyping',
                                           volume_type_id=old_vol_type['id'],
                                           replication_status=rep_status)
        volume.previous_status = 'available'
        volume.save()
        if snap:
            create_snapshot(volume.id, size=volume.size,
                            user_id=self.user_context.user_id,
                            project_id=self.user_context.project_id,
                            ctxt=self.user_context)
        # Same host when the driver handles the retype or the types are
        # reported equal; a different host otherwise.
        if driver or diff_equal:
            host_obj = {'host': CONF.host, 'capabilities': {}}
        else:
            host_obj = {'host': 'newhost', 'capabilities': {}}

        # Reserve quota for the new type.
        reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
        QUOTAS.add_volume_type_opts(self.context,
                                    reserve_opts,
                                    vol_type['id'])
        if reserve_vol_type_only:
            # Drop the generic keys so only the type-specific ones are
            # reserved, and record current generic usage for comparison
            # after the retype.
            reserve_opts.pop('volumes')
            reserve_opts.pop('gigabytes')
            try:
                usage = db.quota_usage_get(elevated, project_id, 'volumes')
                total_volumes_in_use = usage.in_use
                usage = db.quota_usage_get(elevated, project_id, 'gigabytes')
                total_gigabytes_in_use = usage.in_use
            except exception.QuotaUsageNotFound:
                total_volumes_in_use = 0
                total_gigabytes_in_use = 0
        reservations = QUOTAS.reserve(self.context,
                                      project_id=project_id,
                                      **reserve_opts)

        # Negative reservation releasing the old type's quota.
        old_reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
        QUOTAS.add_volume_type_opts(self.context,
                                    old_reserve_opts,
                                    old_vol_type['id'])
        old_reservations = QUOTAS.reserve(self.context,
                                          project_id=project_id,
                                          **old_reserve_opts)

        with mock.patch.object(self.volume.driver, 'retype') as _retype,\
                mock.patch.object(volume_types, 'volume_types_diff') as _diff,\
                mock.patch.object(self.volume, 'migrate_volume') as _mig,\
                mock.patch.object(db.sqlalchemy.api, 'volume_get') as _vget,\
                mock.patch.object(context.RequestContext, 'elevated') as _ctx:
            _vget.return_value = volume
            _retype.return_value = driver
            _ctx.return_value = self.context
            returned_diff = {
                'encryption': {},
                'qos_specs': {},
                'extra_specs': {},
            }
            # Only report a replication diff when the setting really differs
            # between the old and the new type.
            if replica != replica_new:
                returned_diff['extra_specs']['replication_enabled'] = (
                    extra_specs.get('replication_enabled'),
                    new_specs.get('replication_enabled'))
            expected_replica_status = 'enabled' if replica_new else 'disabled'

            if encryption_changed:
                returned_diff['encryption'] = 'fake'
            _diff.return_value = (returned_diff, diff_equal)
            if migrate_exc:
                _mig.side_effect = KeyError
            else:
                _mig.return_value = True

            if not exc:
                self.volume.retype(self.context, volume,
                                   vol_type['id'], host_obj,
                                   migration_policy=policy,
                                   reservations=reservations,
                                   old_reservations=old_reservations)
            else:
                self.assertRaises(exc, self.volume.retype,
                                  self.context, volume,
                                  vol_type['id'], host_obj,
                                  migration_policy=policy,
                                  reservations=reservations,
                                  old_reservations=old_reservations)
            # The driver's retype must not be consulted when the target is
            # a different host.
            if host_obj['host'] != CONF.host:
                _retype.assert_not_called()

        # get volume/quota properties
        volume = objects.Volume.get_by_id(elevated, volume.id)
        try:
            usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
            volumes_in_use = usage.in_use
        except exception.QuotaUsageNotFound:
            volumes_in_use = 0

        # Get new in_use after retype, it should not be changed.
        if reserve_vol_type_only:
            try:
                usage = db.quota_usage_get(elevated, project_id, 'volumes')
                new_total_volumes_in_use = usage.in_use
                usage = db.quota_usage_get(elevated, project_id, 'gigabytes')
                new_total_gigabytes_in_use = usage.in_use
            except exception.QuotaUsageNotFound:
                new_total_volumes_in_use = 0
                new_total_gigabytes_in_use = 0
            self.assertEqual(total_volumes_in_use, new_total_volumes_in_use)
            self.assertEqual(total_gigabytes_in_use,
                             new_total_gigabytes_in_use)

        # check properties
        if driver or diff_equal:
            # Retype finished in place: new type, volume available again.
            self.assertEqual(vol_type['id'], volume.volume_type_id)
            self.assertEqual('available', volume.status)
            self.assertEqual(CONF.host, volume.host)
            self.assertEqual(1, volumes_in_use)
            self.assert_notify_called(mock_notify,
                                      (['INFO', 'volume.retype'],),
                                      any_order=True)
        elif not exc:
            # Retype handed over to the (mocked) migration: the volume still
            # has the old type and the 'retyping' status.
            self.assertEqual(old_vol_type['id'], volume.volume_type_id)
            self.assertEqual('retyping', volume.status)
            self.assertEqual(CONF.host, volume.host)
            self.assertEqual(1, volumes_in_use)
            self.assert_notify_called(mock_notify,
                                      (['INFO', 'volume.retype'],),
                                      any_order=True)
        else:
            # Retype failed: old type kept, status 'available', and no
            # 'volumes_new' quota in use.
            self.assertEqual(old_vol_type['id'], volume.volume_type_id)
            self.assertEqual('available', volume.status)
            self.assertEqual(CONF.host, volume.host)
            self.assertEqual(0, volumes_in_use)
        if encryption_changed:
            self.assertTrue(_mig.called)
        self.assertEqual(expected_replica_status, volume.replication_status)
909
910    def test_retype_volume_driver_success(self):
911        self._retype_volume_exec(True)
912
913    @ddt.data((False, False), (False, True), (True, False), (True, True))
914    @ddt.unpack
915    def test_retype_volume_replica(self, replica, replica_new):
916        self._retype_volume_exec(True, replica=replica,
917                                 replica_new=replica_new)
918
919    def test_retype_volume_migration_bad_policy(self):
920        # Test volume retype that requires migration by not allowed
921        self._retype_volume_exec(False, policy='never',
922                                 exc=exception.VolumeMigrationFailed)
923
924    def test_retype_volume_migration_with_replica(self):
925        self._retype_volume_exec(False,
926                                 replica=True,
927                                 exc=exception.InvalidVolume)
928
929    def test_retype_volume_migration_with_snaps(self):
930        self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)
931
932    def test_retype_volume_migration_failed(self):
933        self._retype_volume_exec(False, migrate_exc=True, exc=KeyError)
934
935    def test_retype_volume_migration_success(self):
936        self._retype_volume_exec(False, migrate_exc=False, exc=None)
937
938    def test_retype_volume_migration_equal_types(self):
939        self._retype_volume_exec(False, diff_equal=True)
940
941    def test_retype_volume_with_type_only(self):
942        self._retype_volume_exec(True, reserve_vol_type_only=True)
943
944    def test_retype_volume_migration_encryption(self):
945        self._retype_volume_exec(False, encryption_changed=True)
946
947    def test_migrate_driver_not_initialized(self):
948        volume = tests_utils.create_volume(self.context, size=0,
949                                           host=CONF.host)
950        host_obj = {'host': 'newhost', 'capabilities': {}}
951
952        self.volume.driver._initialized = False
953        self.assertRaises(exception.DriverNotInitialized,
954                          self.volume.migrate_volume,
955                          self.context, volume, host_obj, True)
956
957        volume = objects.Volume.get_by_id(context.get_admin_context(),
958                                          volume.id)
959        self.assertEqual('error', volume.migration_status)
960
961        # lets cleanup the mess.
962        self.volume.driver._initialized = True
963        self.volume.delete_volume(self.context, volume)
964
965    def test_delete_source_volume_in_migration(self):
966        """Test deleting a source volume that is in migration."""
967        self._test_delete_volume_in_migration('migrating')
968
969    def test_delete_destination_volume_in_migration(self):
970        """Test deleting a destination volume that is in migration."""
971        self._test_delete_volume_in_migration('target:vol-id')
972
973    def _test_delete_volume_in_migration(self, migration_status):
974        """Test deleting a volume that is in migration."""
975        volume = tests_utils.create_volume(self.context, host=CONF.host,
976                                           migration_status=migration_status)
977        self.volume.delete_volume(self.context, volume=volume)
978
979        # The volume is successfully removed during the volume delete
980        # and won't exist in the database any more.
981        self.assertRaises(exception.VolumeNotFound, volume.refresh)
982
983    def test_retype_volume_not_capable_to_replica(self):
984        elevated = context.get_admin_context()
985        db.volume_type_create(elevated, {'name': 'old', 'extra_specs': {}})
986        old_vol_type = db.volume_type_get_by_name(elevated, 'old')
987        new_extra_specs = {'replication_enabled': '<is> True'}
988        db.volume_type_create(elevated, {'name': 'new',
989                                         'extra_specs': new_extra_specs})
990        new_vol_type = db.volume_type_get_by_name(elevated, 'new')
991        volume = tests_utils.create_volume(self.context, size=1,
992                                           host=CONF.host, status='available',
993                                           volume_type_id=old_vol_type['id'],
994                                           replication_status='not-capable')
995        host_obj = {'host': 'newhost', 'capabilities': {}}
996        with mock.patch.object(self.volume,
997                               'migrate_volume') as migrate_volume:
998            migrate_volume.return_value = True
999            self.volume.retype(self.context, volume, new_vol_type['id'],
1000                               host_obj, migration_policy='on-demand')
1001