1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16"""Tests for global snapshot cases."""
17
18import ddt
19import os
20import sys
21
22import mock
23from oslo_config import cfg
24from oslo_utils import imageutils
25
26from cinder import context
27from cinder import db
28from cinder import exception
29from cinder import objects
30from cinder.objects import fields
31from cinder import quota
32from cinder import test
33from cinder.tests.unit.brick import fake_lvm
34from cinder.tests.unit import fake_constants as fake
35from cinder.tests.unit import utils as tests_utils
36from cinder.tests.unit import volume as base
37import cinder.volume
38
39QUOTAS = quota.QUOTAS
40
41CONF = cfg.CONF
42
43OVER_SNAPSHOT_QUOTA_EXCEPTION = exception.OverQuota(
44    overs=['snapshots'],
45    usages = {'snapshots': {'reserved': 1, 'in_use': 9}},
46    quotas = {'gigabytes': 10, 'snapshots': 10})
47
48
49def create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
50                    **kwargs):
51    """Create a snapshot object."""
52    metadata = metadata or {}
53    snap = objects.Snapshot(ctxt or context.get_admin_context())
54    snap.volume_size = size
55    snap.user_id = fake.USER_ID
56    snap.project_id = fake.PROJECT_ID
57    snap.volume_id = volume_id
58    snap.status = fields.SnapshotStatus.CREATING
59    if metadata is not None:
60        snap.metadata = metadata
61    snap.update(kwargs)
62
63    snap.create()
64    return snap
65
66
67@ddt.ddt
68class SnapshotTestCase(base.BaseVolumeTestCase):
69    def test_delete_snapshot_frozen(self):
70        service = tests_utils.create_service(self.context, {'frozen': True})
71        volume = tests_utils.create_volume(self.context, host=service.host)
72        snapshot = tests_utils.create_snapshot(self.context, volume.id)
73        self.assertRaises(exception.InvalidInput,
74                          self.volume_api.delete_snapshot, self.context,
75                          snapshot)
76
77    @ddt.data('create_snapshot', 'create_snapshot_force')
78    def test_create_snapshot_frozen(self, method):
79        service = tests_utils.create_service(self.context, {'frozen': True})
80        volume = tests_utils.create_volume(self.context, host=service.host)
81        method = getattr(self.volume_api, method)
82        self.assertRaises(exception.InvalidInput,
83                          method, self.context, volume, 'name', 'desc')
84
85    def test_create_snapshot_driver_not_initialized(self):
86        volume_src = tests_utils.create_volume(self.context,
87                                               **self.volume_params)
88        self.volume.create_volume(self.context, volume_src)
89        snapshot_id = create_snapshot(volume_src['id'],
90                                      size=volume_src['size'])['id']
91        snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id)
92
93        self.volume.driver._initialized = False
94
95        self.assertRaises(exception.DriverNotInitialized,
96                          self.volume.create_snapshot,
97                          self.context, snapshot_obj)
98
99        # NOTE(flaper87): The volume status should be error.
100        self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status)
101
102        # lets cleanup the mess
103        self.volume.driver._initialized = True
104        self.volume.delete_snapshot(self.context, snapshot_obj)
105        self.volume.delete_volume(self.context, volume_src)
106
107    @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
108    def test_create_delete_snapshot(self, mock_notify):
109        """Test snapshot can be created and deleted."""
110        volume = tests_utils.create_volume(
111            self.context,
112            availability_zone=CONF.storage_availability_zone,
113            **self.volume_params)
114
115        mock_notify.assert_not_called()
116
117        self.volume.create_volume(self.context, volume)
118
119        self.assert_notify_called(mock_notify,
120                                  (['INFO', 'volume.create.start'],
121                                   ['INFO', 'volume.create.end']))
122
123        snapshot = create_snapshot(volume['id'], size=volume['size'])
124        snapshot_id = snapshot.id
125        self.volume.create_snapshot(self.context, snapshot)
126        self.assertEqual(
127            snapshot_id, objects.Snapshot.get_by_id(self.context,
128                                                    snapshot_id).id)
129
130        self.assert_notify_called(mock_notify,
131                                  (['INFO', 'volume.create.start'],
132                                   ['INFO', 'volume.create.end'],
133                                   ['INFO', 'snapshot.create.start'],
134                                   ['INFO', 'snapshot.create.end']))
135
136        self.volume.delete_snapshot(self.context, snapshot)
137        self.assert_notify_called(mock_notify,
138                                  (['INFO', 'volume.create.start'],
139                                   ['INFO', 'volume.create.end'],
140                                   ['INFO', 'snapshot.create.start'],
141                                   ['INFO', 'snapshot.create.end'],
142                                   ['INFO', 'snapshot.delete.start'],
143                                   ['INFO', 'snapshot.delete.end']))
144
145        snap = objects.Snapshot.get_by_id(context.get_admin_context(
146            read_deleted='yes'), snapshot_id)
147        self.assertEqual(fields.SnapshotStatus.DELETED, snap.status)
148        self.assertRaises(exception.NotFound,
149                          db.snapshot_get,
150                          self.context,
151                          snapshot_id)
152        self.volume.delete_volume(self.context, volume)
153
154    def test_create_delete_snapshot_with_metadata(self):
155        """Test snapshot can be created with metadata and deleted."""
156        test_meta = {'fake_key': 'fake_value'}
157        volume = tests_utils.create_volume(self.context, **self.volume_params)
158        snapshot = create_snapshot(volume['id'], size=volume['size'],
159                                   metadata=test_meta)
160        snapshot_id = snapshot.id
161
162        result_dict = snapshot.metadata
163
164        self.assertEqual(test_meta, result_dict)
165        self.volume.delete_snapshot(self.context, snapshot)
166        self.assertRaises(exception.NotFound,
167                          db.snapshot_get,
168                          self.context,
169                          snapshot_id)
170
171    def test_delete_snapshot_another_cluster_fails(self):
172        """Test delete of snapshot from another cluster fails."""
173        self.volume.cluster = 'mycluster'
174        volume = tests_utils.create_volume(self.context, status='available',
175                                           size=1, host=CONF.host + 'fake',
176                                           cluster_name=self.volume.cluster)
177        snapshot = create_snapshot(volume.id, size=volume.size)
178
179        self.volume.delete_snapshot(self.context, snapshot)
180        self.assertRaises(exception.NotFound,
181                          db.snapshot_get,
182                          self.context,
183                          snapshot.id)
184
185    @mock.patch.object(db, 'snapshot_create',
186                       side_effect=exception.InvalidSnapshot(
187                           'Create snapshot in db failed!'))
188    def test_create_snapshot_failed_db_snapshot(self, mock_snapshot):
189        """Test exception handling when create snapshot in db failed."""
190        test_volume = tests_utils.create_volume(
191            self.context,
192            status='available',
193            host=CONF.host)
194        volume_api = cinder.volume.api.API()
195        self.assertRaises(exception.InvalidSnapshot,
196                          volume_api.create_snapshot,
197                          self.context,
198                          test_volume,
199                          'fake_name',
200                          'fake_description')
201
202    @mock.patch('cinder.objects.volume.Volume.get_by_id')
203    def test_create_snapshot_in_db_invalid_volume_status(self, mock_get):
204        test_volume1 = tests_utils.create_volume(
205            self.context,
206            status='available',
207            host=CONF.host)
208        test_volume2 = tests_utils.create_volume(
209            self.context,
210            status='deleting',
211            host=CONF.host)
212        mock_get.return_value = test_volume2
213        volume_api = cinder.volume.api.API()
214
215        self.assertRaises(exception.InvalidVolume,
216                          volume_api.create_snapshot_in_db,
217                          self.context, test_volume1, "fake_snapshot_name",
218                          "fake_description", False, {}, None,
219                          commit_quota=False)
220
221    @mock.patch('cinder.objects.volume.Volume.get_by_id')
222    def test_create_snapshot_in_db_invalid_metadata(self, mock_get):
223        test_volume = tests_utils.create_volume(
224            self.context,
225            status='available',
226            host=CONF.host)
227        mock_get.return_value = test_volume
228        volume_api = cinder.volume.api.API()
229
230        with mock.patch.object(QUOTAS, 'add_volume_type_opts'),\
231            mock.patch.object(QUOTAS, 'reserve') as mock_reserve,\
232                mock.patch.object(QUOTAS, 'commit') as mock_commit:
233            self.assertRaises(exception.InvalidInput,
234                              volume_api.create_snapshot_in_db,
235                              self.context, test_volume, "fake_snapshot_name",
236                              "fake_description", False, "fake_metadata", None,
237                              commit_quota=True)
238            mock_reserve.assert_not_called()
239            mock_commit.assert_not_called()
240
241    def test_create_snapshot_failed_maintenance(self):
242        """Test exception handling when create snapshot in maintenance."""
243        test_volume = tests_utils.create_volume(
244            self.context,
245            status='maintenance',
246            host=CONF.host)
247        volume_api = cinder.volume.api.API()
248        self.assertRaises(exception.InvalidVolume,
249                          volume_api.create_snapshot,
250                          self.context,
251                          test_volume,
252                          'fake_name',
253                          'fake_description')
254
255    @mock.patch.object(QUOTAS, 'commit',
256                       side_effect=exception.QuotaError(
257                           'Snapshot quota commit failed!'))
258    def test_create_snapshot_failed_quota_commit(self, mock_snapshot):
259        """Test exception handling when snapshot quota commit failed."""
260        test_volume = tests_utils.create_volume(
261            self.context,
262            status='available',
263            host=CONF.host)
264        volume_api = cinder.volume.api.API()
265        self.assertRaises(exception.QuotaError,
266                          volume_api.create_snapshot,
267                          self.context,
268                          test_volume,
269                          'fake_name',
270                          'fake_description')
271
272    @mock.patch.object(QUOTAS, 'reserve',
273                       side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION)
274    def test_create_snapshot_failed_quota_reserve(self, mock_reserve):
275        """Test exception handling when snapshot quota reserve failed."""
276        test_volume = tests_utils.create_volume(
277            self.context,
278            status='available',
279            host=CONF.host)
280        volume_api = cinder.volume.api.API()
281        self.assertRaises(exception.SnapshotLimitExceeded,
282                          volume_api.create_snapshot,
283                          self.context,
284                          test_volume,
285                          'fake_name',
286                          'fake_description')
287
288    @mock.patch.object(QUOTAS, 'reserve',
289                       side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION)
290    def test_create_snapshots_in_db_failed_quota_reserve(self, mock_reserve):
291        """Test exception handling when snapshot quota reserve failed."""
292        test_volume = tests_utils.create_volume(
293            self.context,
294            status='available',
295            host=CONF.host)
296        volume_api = cinder.volume.api.API()
297        self.assertRaises(exception.SnapshotLimitExceeded,
298                          volume_api.create_snapshots_in_db,
299                          self.context,
300                          [test_volume],
301                          'fake_name',
302                          'fake_description',
303                          fake.CONSISTENCY_GROUP_ID)
304
305    def test_create_snapshot_failed_host_is_None(self):
306        """Test exception handling when create snapshot and host is None."""
307        test_volume = tests_utils.create_volume(
308            self.context,
309            host=None)
310        volume_api = cinder.volume.api.API()
311        self.assertRaises(exception.InvalidVolume,
312                          volume_api.create_snapshot,
313                          self.context,
314                          test_volume,
315                          'fake_name',
316                          'fake_description')
317
318    def test_create_snapshot_force(self):
319        """Test snapshot in use can be created forcibly."""
320
321        instance_uuid = '12345678-1234-5678-1234-567812345678'
322        # create volume and attach to the instance
323        volume = tests_utils.create_volume(self.context, **self.volume_params)
324        self.volume.create_volume(self.context, volume)
325        values = {'volume_id': volume['id'],
326                  'instance_uuid': instance_uuid,
327                  'attach_status': fields.VolumeAttachStatus.ATTACHING, }
328        attachment = db.volume_attach(self.context, values)
329        db.volume_attached(self.context, attachment['id'], instance_uuid,
330                           None, '/dev/sda1')
331
332        volume_api = cinder.volume.api.API()
333        volume = volume_api.get(self.context, volume['id'])
334        self.assertRaises(exception.InvalidVolume,
335                          volume_api.create_snapshot,
336                          self.context, volume,
337                          'fake_name', 'fake_description')
338        snapshot_ref = volume_api.create_snapshot_force(self.context,
339                                                        volume,
340                                                        'fake_name',
341                                                        'fake_description')
342        snapshot_ref.destroy()
343        db.volume_destroy(self.context, volume['id'])
344
345        # create volume and attach to the host
346        volume = tests_utils.create_volume(self.context, **self.volume_params)
347        self.volume.create_volume(self.context, volume)
348        values = {'volume_id': volume['id'],
349                  'attached_host': 'fake_host',
350                  'attach_status': fields.VolumeAttachStatus.ATTACHING, }
351        attachment = db.volume_attach(self.context, values)
352        db.volume_attached(self.context, attachment['id'], None,
353                           'fake_host', '/dev/sda1')
354
355        volume_api = cinder.volume.api.API()
356        volume = volume_api.get(self.context, volume['id'])
357        self.assertRaises(exception.InvalidVolume,
358                          volume_api.create_snapshot,
359                          self.context, volume,
360                          'fake_name', 'fake_description')
361        snapshot_ref = volume_api.create_snapshot_force(self.context,
362                                                        volume,
363                                                        'fake_name',
364                                                        'fake_description')
365        snapshot_ref.destroy()
366        db.volume_destroy(self.context, volume['id'])
367
368    @mock.patch('cinder.image.image_utils.qemu_img_info')
369    def test_create_snapshot_from_bootable_volume(self, mock_qemu_info):
370        """Test create snapshot from bootable volume."""
371        # create bootable volume from image
372        volume = self._create_volume_from_image()
373        volume_id = volume['id']
374        self.assertEqual('available', volume['status'])
375        self.assertTrue(volume['bootable'])
376
377        image_info = imageutils.QemuImgInfo()
378        image_info.virtual_size = '1073741824'
379        mock_qemu_info.return_value = image_info
380
381        # get volume's volume_glance_metadata
382        ctxt = context.get_admin_context()
383        vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
384        self.assertTrue(bool(vol_glance_meta))
385
386        # create snapshot from bootable volume
387        snap = create_snapshot(volume_id)
388        self.volume.create_snapshot(ctxt, snap)
389
390        # get snapshot's volume_glance_metadata
391        snap_glance_meta = db.volume_snapshot_glance_metadata_get(
392            ctxt, snap.id)
393        self.assertTrue(bool(snap_glance_meta))
394
395        # ensure that volume's glance metadata is copied
396        # to snapshot's glance metadata
397        self.assertEqual(len(vol_glance_meta), len(snap_glance_meta))
398        vol_glance_dict = {x.key: x.value for x in vol_glance_meta}
399        snap_glance_dict = {x.key: x.value for x in snap_glance_meta}
400        self.assertDictEqual(vol_glance_dict, snap_glance_dict)
401
402        # ensure that snapshot's status is changed to 'available'
403        self.assertEqual(fields.SnapshotStatus.AVAILABLE, snap.status)
404
405        # cleanup resource
406        snap.destroy()
407        db.volume_destroy(ctxt, volume_id)
408
409    @mock.patch('cinder.image.image_utils.qemu_img_info')
410    def test_create_snapshot_from_bootable_volume_fail(self, mock_qemu_info):
411        """Test create snapshot from bootable volume.
412
413        But it fails to volume_glance_metadata_copy_to_snapshot.
414        As a result, status of snapshot is changed to ERROR.
415        """
416        # create bootable volume from image
417        volume = self._create_volume_from_image()
418        volume_id = volume['id']
419        self.assertEqual('available', volume['status'])
420        self.assertTrue(volume['bootable'])
421
422        image_info = imageutils.QemuImgInfo()
423        image_info.virtual_size = '1073741824'
424        mock_qemu_info.return_value = image_info
425
426        # get volume's volume_glance_metadata
427        ctxt = context.get_admin_context()
428        vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id)
429        self.assertTrue(bool(vol_glance_meta))
430        snap = create_snapshot(volume_id)
431        self.assertEqual(36, len(snap.id))  # dynamically-generated UUID
432        self.assertEqual('creating', snap.status)
433
434        # set to return DB exception
435        with mock.patch.object(db, 'volume_glance_metadata_copy_to_snapshot')\
436                as mock_db:
437            mock_db.side_effect = exception.MetadataCopyFailure(
438                reason="Because of DB service down.")
439            # create snapshot from bootable volume
440            self.assertRaises(exception.MetadataCopyFailure,
441                              self.volume.create_snapshot,
442                              ctxt,
443                              snap)
444
445        # get snapshot's volume_glance_metadata
446        self.assertRaises(exception.GlanceMetadataNotFound,
447                          db.volume_snapshot_glance_metadata_get,
448                          ctxt, snap.id)
449
450        # ensure that status of snapshot is 'error'
451        self.assertEqual(fields.SnapshotStatus.ERROR, snap.status)
452
453        # cleanup resource
454        snap.destroy()
455        db.volume_destroy(ctxt, volume_id)
456
457    def test_create_snapshot_from_bootable_volume_with_volume_metadata_none(
458            self):
459        volume = tests_utils.create_volume(self.context, **self.volume_params)
460        volume_id = volume['id']
461
462        self.volume.create_volume(self.context, volume)
463        # set bootable flag of volume to True
464        db.volume_update(self.context, volume_id, {'bootable': True})
465
466        snapshot = create_snapshot(volume['id'])
467        self.volume.create_snapshot(self.context, snapshot)
468        self.assertRaises(exception.GlanceMetadataNotFound,
469                          db.volume_snapshot_glance_metadata_get,
470                          self.context, snapshot.id)
471
472        # ensure that status of snapshot is 'available'
473        self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot.status)
474
475        # cleanup resource
476        snapshot.destroy()
477        db.volume_destroy(self.context, volume_id)
478
479    def test_create_snapshot_during_encryption_key_migration(self):
480        fixed_key_id = '00000000-0000-0000-0000-000000000000'
481        volume = tests_utils.create_volume(self.context, **self.volume_params)
482        volume['encryption_key_id'] = fixed_key_id
483        volume_id = volume['id']
484
485        self.volume.create_volume(self.context, volume)
486
487        kwargs = {'encryption_key_id': fixed_key_id}
488        snapshot = create_snapshot(volume['id'], **kwargs)
489
490        self.assertEqual(fixed_key_id, snapshot.encryption_key_id)
491        db.volume_update(self.context,
492                         volume_id,
493                         {'encryption_key_id': fake.ENCRYPTION_KEY_ID})
494
495        self.volume.create_snapshot(self.context, snapshot)
496
497        snap_db = db.snapshot_get(self.context, snapshot.id)
498        self.assertEqual(fake.ENCRYPTION_KEY_ID, snap_db.encryption_key_id)
499
500        # cleanup resource
501        snapshot.destroy()
502        db.volume_destroy(self.context, volume_id)
503
504    def test_delete_busy_snapshot(self):
505        """Test snapshot can be created and deleted."""
506
507        self.volume.driver.vg = fake_lvm.FakeBrickLVM('cinder-volumes',
508                                                      False,
509                                                      None,
510                                                      'default')
511
512        volume = tests_utils.create_volume(self.context, **self.volume_params)
513        volume_id = volume['id']
514        self.volume.create_volume(self.context, volume)
515        snapshot = create_snapshot(volume_id, size=volume['size'])
516        self.volume.create_snapshot(self.context, snapshot)
517
518        with mock.patch.object(self.volume.driver, 'delete_snapshot',
519                               side_effect=exception.SnapshotIsBusy(
520                                   snapshot_name='fake')
521                               ) as mock_del_snap:
522            snapshot_id = snapshot.id
523            self.volume.delete_snapshot(self.context, snapshot)
524            snapshot_ref = objects.Snapshot.get_by_id(self.context,
525                                                      snapshot_id)
526            self.assertEqual(snapshot_id, snapshot_ref.id)
527            self.assertEqual(fields.SnapshotStatus.AVAILABLE,
528                             snapshot_ref.status)
529            mock_del_snap.assert_called_once_with(snapshot)
530
531    @test.testtools.skipIf(sys.platform == "darwin", "SKIP on OSX")
532    def test_delete_no_dev_fails(self):
533        """Test delete snapshot with no dev file fails."""
534        self.mock_object(os.path, 'exists', lambda x: False)
535        self.volume.driver.vg = fake_lvm.FakeBrickLVM('cinder-volumes',
536                                                      False,
537                                                      None,
538                                                      'default')
539
540        volume = tests_utils.create_volume(self.context, **self.volume_params)
541        volume_id = volume['id']
542        self.volume.create_volume(self.context, volume)
543        snapshot = create_snapshot(volume_id)
544        snapshot_id = snapshot.id
545        self.volume.create_snapshot(self.context, snapshot)
546
547        with mock.patch.object(self.volume.driver, 'delete_snapshot',
548                               side_effect=exception.SnapshotIsBusy(
549                                   snapshot_name='fake')) as mock_del_snap:
550            self.volume.delete_snapshot(self.context, snapshot)
551            snapshot_ref = objects.Snapshot.get_by_id(self.context,
552                                                      snapshot_id)
553            self.assertEqual(snapshot_id, snapshot_ref.id)
554            self.assertEqual(fields.SnapshotStatus.AVAILABLE,
555                             snapshot_ref.status)
556            mock_del_snap.assert_called_once_with(snapshot)
557
558    def test_volume_api_update_snapshot(self):
559        # create raw snapshot
560        volume = tests_utils.create_volume(self.context, **self.volume_params)
561        snapshot = create_snapshot(volume['id'])
562        snapshot_id = snapshot.id
563        self.assertIsNone(snapshot.display_name)
564        # use volume.api to update name
565        volume_api = cinder.volume.api.API()
566        update_dict = {'display_name': 'test update name'}
567        volume_api.update_snapshot(self.context, snapshot, update_dict)
568        # read changes from db
569        snap = objects.Snapshot.get_by_id(context.get_admin_context(),
570                                          snapshot_id)
571        self.assertEqual('test update name', snap.display_name)
572
573    @mock.patch.object(QUOTAS, 'reserve',
574                       side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION)
575    def test_existing_snapshot_failed_quota_reserve(self, mock_reserve):
576        vol = tests_utils.create_volume(self.context)
577        snap = tests_utils.create_snapshot(self.context, vol.id)
578        with mock.patch.object(
579                self.volume.driver,
580                'manage_existing_snapshot_get_size') as mock_get_size:
581            mock_get_size.return_value = 1
582            self.assertRaises(exception.SnapshotLimitExceeded,
583                              self.volume.manage_existing_snapshot,
584                              self.context,
585                              snap)
586