1# Copyright 2010 United States Government as represented by the 2# Administrator of the National Aeronautics and Space Administration. 3# All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16"""Tests for global snapshot cases.""" 17 18import ddt 19import os 20import sys 21 22import mock 23from oslo_config import cfg 24from oslo_utils import imageutils 25 26from cinder import context 27from cinder import db 28from cinder import exception 29from cinder import objects 30from cinder.objects import fields 31from cinder import quota 32from cinder import test 33from cinder.tests.unit.brick import fake_lvm 34from cinder.tests.unit import fake_constants as fake 35from cinder.tests.unit import utils as tests_utils 36from cinder.tests.unit import volume as base 37import cinder.volume 38 39QUOTAS = quota.QUOTAS 40 41CONF = cfg.CONF 42 43OVER_SNAPSHOT_QUOTA_EXCEPTION = exception.OverQuota( 44 overs=['snapshots'], 45 usages = {'snapshots': {'reserved': 1, 'in_use': 9}}, 46 quotas = {'gigabytes': 10, 'snapshots': 10}) 47 48 49def create_snapshot(volume_id, size=1, metadata=None, ctxt=None, 50 **kwargs): 51 """Create a snapshot object.""" 52 metadata = metadata or {} 53 snap = objects.Snapshot(ctxt or context.get_admin_context()) 54 snap.volume_size = size 55 snap.user_id = fake.USER_ID 56 snap.project_id = fake.PROJECT_ID 57 snap.volume_id = volume_id 58 snap.status = fields.SnapshotStatus.CREATING 59 if metadata is not None: 60 snap.metadata = metadata 61 snap.update(kwargs) 62 63 snap.create() 64 return snap 65 66 67@ddt.ddt 68class SnapshotTestCase(base.BaseVolumeTestCase): 69 def test_delete_snapshot_frozen(self): 70 service = tests_utils.create_service(self.context, {'frozen': True}) 71 volume = tests_utils.create_volume(self.context, host=service.host) 72 snapshot = tests_utils.create_snapshot(self.context, volume.id) 73 self.assertRaises(exception.InvalidInput, 74 self.volume_api.delete_snapshot, self.context, 75 snapshot) 76 77 @ddt.data('create_snapshot', 'create_snapshot_force') 78 def test_create_snapshot_frozen(self, method): 79 service = tests_utils.create_service(self.context, {'frozen': True}) 80 volume = tests_utils.create_volume(self.context, host=service.host) 81 method = getattr(self.volume_api, method) 82 self.assertRaises(exception.InvalidInput, 83 method, self.context, volume, 'name', 'desc') 84 85 def test_create_snapshot_driver_not_initialized(self): 86 volume_src = tests_utils.create_volume(self.context, 87 **self.volume_params) 88 self.volume.create_volume(self.context, volume_src) 89 snapshot_id = create_snapshot(volume_src['id'], 90 size=volume_src['size'])['id'] 91 snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot_id) 92 93 self.volume.driver._initialized = False 94 95 self.assertRaises(exception.DriverNotInitialized, 96 self.volume.create_snapshot, 97 self.context, snapshot_obj) 98 99 # NOTE(flaper87): The volume status should be error. 100 self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status) 101 102 # lets cleanup the mess 103 self.volume.driver._initialized = True 104 self.volume.delete_snapshot(self.context, snapshot_obj) 105 self.volume.delete_volume(self.context, volume_src) 106 107 @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify') 108 def test_create_delete_snapshot(self, mock_notify): 109 """Test snapshot can be created and deleted.""" 110 volume = tests_utils.create_volume( 111 self.context, 112 availability_zone=CONF.storage_availability_zone, 113 **self.volume_params) 114 115 mock_notify.assert_not_called() 116 117 self.volume.create_volume(self.context, volume) 118 119 self.assert_notify_called(mock_notify, 120 (['INFO', 'volume.create.start'], 121 ['INFO', 'volume.create.end'])) 122 123 snapshot = create_snapshot(volume['id'], size=volume['size']) 124 snapshot_id = snapshot.id 125 self.volume.create_snapshot(self.context, snapshot) 126 self.assertEqual( 127 snapshot_id, objects.Snapshot.get_by_id(self.context, 128 snapshot_id).id) 129 130 self.assert_notify_called(mock_notify, 131 (['INFO', 'volume.create.start'], 132 ['INFO', 'volume.create.end'], 133 ['INFO', 'snapshot.create.start'], 134 ['INFO', 'snapshot.create.end'])) 135 136 self.volume.delete_snapshot(self.context, snapshot) 137 self.assert_notify_called(mock_notify, 138 (['INFO', 'volume.create.start'], 139 ['INFO', 'volume.create.end'], 140 ['INFO', 'snapshot.create.start'], 141 ['INFO', 'snapshot.create.end'], 142 ['INFO', 'snapshot.delete.start'], 143 ['INFO', 'snapshot.delete.end'])) 144 145 snap = objects.Snapshot.get_by_id(context.get_admin_context( 146 read_deleted='yes'), snapshot_id) 147 self.assertEqual(fields.SnapshotStatus.DELETED, snap.status) 148 self.assertRaises(exception.NotFound, 149 db.snapshot_get, 150 self.context, 151 snapshot_id) 152 self.volume.delete_volume(self.context, volume) 153 154 def test_create_delete_snapshot_with_metadata(self): 155 """Test snapshot can be created with metadata and deleted.""" 156 test_meta = {'fake_key': 'fake_value'} 157 volume = tests_utils.create_volume(self.context, **self.volume_params) 158 snapshot = create_snapshot(volume['id'], size=volume['size'], 159 metadata=test_meta) 160 snapshot_id = snapshot.id 161 162 result_dict = snapshot.metadata 163 164 self.assertEqual(test_meta, result_dict) 165 self.volume.delete_snapshot(self.context, snapshot) 166 self.assertRaises(exception.NotFound, 167 db.snapshot_get, 168 self.context, 169 snapshot_id) 170 171 def test_delete_snapshot_another_cluster_fails(self): 172 """Test delete of snapshot from another cluster fails.""" 173 self.volume.cluster = 'mycluster' 174 volume = tests_utils.create_volume(self.context, status='available', 175 size=1, host=CONF.host + 'fake', 176 cluster_name=self.volume.cluster) 177 snapshot = create_snapshot(volume.id, size=volume.size) 178 179 self.volume.delete_snapshot(self.context, snapshot) 180 self.assertRaises(exception.NotFound, 181 db.snapshot_get, 182 self.context, 183 snapshot.id) 184 185 @mock.patch.object(db, 'snapshot_create', 186 side_effect=exception.InvalidSnapshot( 187 'Create snapshot in db failed!')) 188 def test_create_snapshot_failed_db_snapshot(self, mock_snapshot): 189 """Test exception handling when create snapshot in db failed.""" 190 test_volume = tests_utils.create_volume( 191 self.context, 192 status='available', 193 host=CONF.host) 194 volume_api = cinder.volume.api.API() 195 self.assertRaises(exception.InvalidSnapshot, 196 volume_api.create_snapshot, 197 self.context, 198 test_volume, 199 'fake_name', 200 'fake_description') 201 202 @mock.patch('cinder.objects.volume.Volume.get_by_id') 203 def test_create_snapshot_in_db_invalid_volume_status(self, mock_get): 204 test_volume1 = tests_utils.create_volume( 205 self.context, 206 status='available', 207 host=CONF.host) 208 test_volume2 = tests_utils.create_volume( 209 self.context, 210 status='deleting', 211 host=CONF.host) 212 mock_get.return_value = test_volume2 213 volume_api = cinder.volume.api.API() 214 215 self.assertRaises(exception.InvalidVolume, 216 volume_api.create_snapshot_in_db, 217 self.context, test_volume1, "fake_snapshot_name", 218 "fake_description", False, {}, None, 219 commit_quota=False) 220 221 @mock.patch('cinder.objects.volume.Volume.get_by_id') 222 def test_create_snapshot_in_db_invalid_metadata(self, mock_get): 223 test_volume = tests_utils.create_volume( 224 self.context, 225 status='available', 226 host=CONF.host) 227 mock_get.return_value = test_volume 228 volume_api = cinder.volume.api.API() 229 230 with mock.patch.object(QUOTAS, 'add_volume_type_opts'),\ 231 mock.patch.object(QUOTAS, 'reserve') as mock_reserve,\ 232 mock.patch.object(QUOTAS, 'commit') as mock_commit: 233 self.assertRaises(exception.InvalidInput, 234 volume_api.create_snapshot_in_db, 235 self.context, test_volume, "fake_snapshot_name", 236 "fake_description", False, "fake_metadata", None, 237 commit_quota=True) 238 mock_reserve.assert_not_called() 239 mock_commit.assert_not_called() 240 241 def test_create_snapshot_failed_maintenance(self): 242 """Test exception handling when create snapshot in maintenance.""" 243 test_volume = tests_utils.create_volume( 244 self.context, 245 status='maintenance', 246 host=CONF.host) 247 volume_api = cinder.volume.api.API() 248 self.assertRaises(exception.InvalidVolume, 249 volume_api.create_snapshot, 250 self.context, 251 test_volume, 252 'fake_name', 253 'fake_description') 254 255 @mock.patch.object(QUOTAS, 'commit', 256 side_effect=exception.QuotaError( 257 'Snapshot quota commit failed!')) 258 def test_create_snapshot_failed_quota_commit(self, mock_snapshot): 259 """Test exception handling when snapshot quota commit failed.""" 260 test_volume = tests_utils.create_volume( 261 self.context, 262 status='available', 263 host=CONF.host) 264 volume_api = cinder.volume.api.API() 265 self.assertRaises(exception.QuotaError, 266 volume_api.create_snapshot, 267 self.context, 268 test_volume, 269 'fake_name', 270 'fake_description') 271 272 @mock.patch.object(QUOTAS, 'reserve', 273 side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION) 274 def test_create_snapshot_failed_quota_reserve(self, mock_reserve): 275 """Test exception handling when snapshot quota reserve failed.""" 276 test_volume = tests_utils.create_volume( 277 self.context, 278 status='available', 279 host=CONF.host) 280 volume_api = cinder.volume.api.API() 281 self.assertRaises(exception.SnapshotLimitExceeded, 282 volume_api.create_snapshot, 283 self.context, 284 test_volume, 285 'fake_name', 286 'fake_description') 287 288 @mock.patch.object(QUOTAS, 'reserve', 289 side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION) 290 def test_create_snapshots_in_db_failed_quota_reserve(self, mock_reserve): 291 """Test exception handling when snapshot quota reserve failed.""" 292 test_volume = tests_utils.create_volume( 293 self.context, 294 status='available', 295 host=CONF.host) 296 volume_api = cinder.volume.api.API() 297 self.assertRaises(exception.SnapshotLimitExceeded, 298 volume_api.create_snapshots_in_db, 299 self.context, 300 [test_volume], 301 'fake_name', 302 'fake_description', 303 fake.CONSISTENCY_GROUP_ID) 304 305 def test_create_snapshot_failed_host_is_None(self): 306 """Test exception handling when create snapshot and host is None.""" 307 test_volume = tests_utils.create_volume( 308 self.context, 309 host=None) 310 volume_api = cinder.volume.api.API() 311 self.assertRaises(exception.InvalidVolume, 312 volume_api.create_snapshot, 313 self.context, 314 test_volume, 315 'fake_name', 316 'fake_description') 317 318 def test_create_snapshot_force(self): 319 """Test snapshot in use can be created forcibly.""" 320 321 instance_uuid = '12345678-1234-5678-1234-567812345678' 322 # create volume and attach to the instance 323 volume = tests_utils.create_volume(self.context, **self.volume_params) 324 self.volume.create_volume(self.context, volume) 325 values = {'volume_id': volume['id'], 326 'instance_uuid': instance_uuid, 327 'attach_status': fields.VolumeAttachStatus.ATTACHING, } 328 attachment = db.volume_attach(self.context, values) 329 db.volume_attached(self.context, attachment['id'], instance_uuid, 330 None, '/dev/sda1') 331 332 volume_api = cinder.volume.api.API() 333 volume = volume_api.get(self.context, volume['id']) 334 self.assertRaises(exception.InvalidVolume, 335 volume_api.create_snapshot, 336 self.context, volume, 337 'fake_name', 'fake_description') 338 snapshot_ref = volume_api.create_snapshot_force(self.context, 339 volume, 340 'fake_name', 341 'fake_description') 342 snapshot_ref.destroy() 343 db.volume_destroy(self.context, volume['id']) 344 345 # create volume and attach to the host 346 volume = tests_utils.create_volume(self.context, **self.volume_params) 347 self.volume.create_volume(self.context, volume) 348 values = {'volume_id': volume['id'], 349 'attached_host': 'fake_host', 350 'attach_status': fields.VolumeAttachStatus.ATTACHING, } 351 attachment = db.volume_attach(self.context, values) 352 db.volume_attached(self.context, attachment['id'], None, 353 'fake_host', '/dev/sda1') 354 355 volume_api = cinder.volume.api.API() 356 volume = volume_api.get(self.context, volume['id']) 357 self.assertRaises(exception.InvalidVolume, 358 volume_api.create_snapshot, 359 self.context, volume, 360 'fake_name', 'fake_description') 361 snapshot_ref = volume_api.create_snapshot_force(self.context, 362 volume, 363 'fake_name', 364 'fake_description') 365 snapshot_ref.destroy() 366 db.volume_destroy(self.context, volume['id']) 367 368 @mock.patch('cinder.image.image_utils.qemu_img_info') 369 def test_create_snapshot_from_bootable_volume(self, mock_qemu_info): 370 """Test create snapshot from bootable volume.""" 371 # create bootable volume from image 372 volume = self._create_volume_from_image() 373 volume_id = volume['id'] 374 self.assertEqual('available', volume['status']) 375 self.assertTrue(volume['bootable']) 376 377 image_info = imageutils.QemuImgInfo() 378 image_info.virtual_size = '1073741824' 379 mock_qemu_info.return_value = image_info 380 381 # get volume's volume_glance_metadata 382 ctxt = context.get_admin_context() 383 vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id) 384 self.assertTrue(bool(vol_glance_meta)) 385 386 # create snapshot from bootable volume 387 snap = create_snapshot(volume_id) 388 self.volume.create_snapshot(ctxt, snap) 389 390 # get snapshot's volume_glance_metadata 391 snap_glance_meta = db.volume_snapshot_glance_metadata_get( 392 ctxt, snap.id) 393 self.assertTrue(bool(snap_glance_meta)) 394 395 # ensure that volume's glance metadata is copied 396 # to snapshot's glance metadata 397 self.assertEqual(len(vol_glance_meta), len(snap_glance_meta)) 398 vol_glance_dict = {x.key: x.value for x in vol_glance_meta} 399 snap_glance_dict = {x.key: x.value for x in snap_glance_meta} 400 self.assertDictEqual(vol_glance_dict, snap_glance_dict) 401 402 # ensure that snapshot's status is changed to 'available' 403 self.assertEqual(fields.SnapshotStatus.AVAILABLE, snap.status) 404 405 # cleanup resource 406 snap.destroy() 407 db.volume_destroy(ctxt, volume_id) 408 409 @mock.patch('cinder.image.image_utils.qemu_img_info') 410 def test_create_snapshot_from_bootable_volume_fail(self, mock_qemu_info): 411 """Test create snapshot from bootable volume. 412 413 But it fails to volume_glance_metadata_copy_to_snapshot. 414 As a result, status of snapshot is changed to ERROR. 415 """ 416 # create bootable volume from image 417 volume = self._create_volume_from_image() 418 volume_id = volume['id'] 419 self.assertEqual('available', volume['status']) 420 self.assertTrue(volume['bootable']) 421 422 image_info = imageutils.QemuImgInfo() 423 image_info.virtual_size = '1073741824' 424 mock_qemu_info.return_value = image_info 425 426 # get volume's volume_glance_metadata 427 ctxt = context.get_admin_context() 428 vol_glance_meta = db.volume_glance_metadata_get(ctxt, volume_id) 429 self.assertTrue(bool(vol_glance_meta)) 430 snap = create_snapshot(volume_id) 431 self.assertEqual(36, len(snap.id)) # dynamically-generated UUID 432 self.assertEqual('creating', snap.status) 433 434 # set to return DB exception 435 with mock.patch.object(db, 'volume_glance_metadata_copy_to_snapshot')\ 436 as mock_db: 437 mock_db.side_effect = exception.MetadataCopyFailure( 438 reason="Because of DB service down.") 439 # create snapshot from bootable volume 440 self.assertRaises(exception.MetadataCopyFailure, 441 self.volume.create_snapshot, 442 ctxt, 443 snap) 444 445 # get snapshot's volume_glance_metadata 446 self.assertRaises(exception.GlanceMetadataNotFound, 447 db.volume_snapshot_glance_metadata_get, 448 ctxt, snap.id) 449 450 # ensure that status of snapshot is 'error' 451 self.assertEqual(fields.SnapshotStatus.ERROR, snap.status) 452 453 # cleanup resource 454 snap.destroy() 455 db.volume_destroy(ctxt, volume_id) 456 457 def test_create_snapshot_from_bootable_volume_with_volume_metadata_none( 458 self): 459 volume = tests_utils.create_volume(self.context, **self.volume_params) 460 volume_id = volume['id'] 461 462 self.volume.create_volume(self.context, volume) 463 # set bootable flag of volume to True 464 db.volume_update(self.context, volume_id, {'bootable': True}) 465 466 snapshot = create_snapshot(volume['id']) 467 self.volume.create_snapshot(self.context, snapshot) 468 self.assertRaises(exception.GlanceMetadataNotFound, 469 db.volume_snapshot_glance_metadata_get, 470 self.context, snapshot.id) 471 472 # ensure that status of snapshot is 'available' 473 self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot.status) 474 475 # cleanup resource 476 snapshot.destroy() 477 db.volume_destroy(self.context, volume_id) 478 479 def test_create_snapshot_during_encryption_key_migration(self): 480 fixed_key_id = '00000000-0000-0000-0000-000000000000' 481 volume = tests_utils.create_volume(self.context, **self.volume_params) 482 volume['encryption_key_id'] = fixed_key_id 483 volume_id = volume['id'] 484 485 self.volume.create_volume(self.context, volume) 486 487 kwargs = {'encryption_key_id': fixed_key_id} 488 snapshot = create_snapshot(volume['id'], **kwargs) 489 490 self.assertEqual(fixed_key_id, snapshot.encryption_key_id) 491 db.volume_update(self.context, 492 volume_id, 493 {'encryption_key_id': fake.ENCRYPTION_KEY_ID}) 494 495 self.volume.create_snapshot(self.context, snapshot) 496 497 snap_db = db.snapshot_get(self.context, snapshot.id) 498 self.assertEqual(fake.ENCRYPTION_KEY_ID, snap_db.encryption_key_id) 499 500 # cleanup resource 501 snapshot.destroy() 502 db.volume_destroy(self.context, volume_id) 503 504 def test_delete_busy_snapshot(self): 505 """Test snapshot can be created and deleted.""" 506 507 self.volume.driver.vg = fake_lvm.FakeBrickLVM('cinder-volumes', 508 False, 509 None, 510 'default') 511 512 volume = tests_utils.create_volume(self.context, **self.volume_params) 513 volume_id = volume['id'] 514 self.volume.create_volume(self.context, volume) 515 snapshot = create_snapshot(volume_id, size=volume['size']) 516 self.volume.create_snapshot(self.context, snapshot) 517 518 with mock.patch.object(self.volume.driver, 'delete_snapshot', 519 side_effect=exception.SnapshotIsBusy( 520 snapshot_name='fake') 521 ) as mock_del_snap: 522 snapshot_id = snapshot.id 523 self.volume.delete_snapshot(self.context, snapshot) 524 snapshot_ref = objects.Snapshot.get_by_id(self.context, 525 snapshot_id) 526 self.assertEqual(snapshot_id, snapshot_ref.id) 527 self.assertEqual(fields.SnapshotStatus.AVAILABLE, 528 snapshot_ref.status) 529 mock_del_snap.assert_called_once_with(snapshot) 530 531 @test.testtools.skipIf(sys.platform == "darwin", "SKIP on OSX") 532 def test_delete_no_dev_fails(self): 533 """Test delete snapshot with no dev file fails.""" 534 self.mock_object(os.path, 'exists', lambda x: False) 535 self.volume.driver.vg = fake_lvm.FakeBrickLVM('cinder-volumes', 536 False, 537 None, 538 'default') 539 540 volume = tests_utils.create_volume(self.context, **self.volume_params) 541 volume_id = volume['id'] 542 self.volume.create_volume(self.context, volume) 543 snapshot = create_snapshot(volume_id) 544 snapshot_id = snapshot.id 545 self.volume.create_snapshot(self.context, snapshot) 546 547 with mock.patch.object(self.volume.driver, 'delete_snapshot', 548 side_effect=exception.SnapshotIsBusy( 549 snapshot_name='fake')) as mock_del_snap: 550 self.volume.delete_snapshot(self.context, snapshot) 551 snapshot_ref = objects.Snapshot.get_by_id(self.context, 552 snapshot_id) 553 self.assertEqual(snapshot_id, snapshot_ref.id) 554 self.assertEqual(fields.SnapshotStatus.AVAILABLE, 555 snapshot_ref.status) 556 mock_del_snap.assert_called_once_with(snapshot) 557 558 def test_volume_api_update_snapshot(self): 559 # create raw snapshot 560 volume = tests_utils.create_volume(self.context, **self.volume_params) 561 snapshot = create_snapshot(volume['id']) 562 snapshot_id = snapshot.id 563 self.assertIsNone(snapshot.display_name) 564 # use volume.api to update name 565 volume_api = cinder.volume.api.API() 566 update_dict = {'display_name': 'test update name'} 567 volume_api.update_snapshot(self.context, snapshot, update_dict) 568 # read changes from db 569 snap = objects.Snapshot.get_by_id(context.get_admin_context(), 570 snapshot_id) 571 self.assertEqual('test update name', snap.display_name) 572 573 @mock.patch.object(QUOTAS, 'reserve', 574 side_effect = OVER_SNAPSHOT_QUOTA_EXCEPTION) 575 def test_existing_snapshot_failed_quota_reserve(self, mock_reserve): 576 vol = tests_utils.create_volume(self.context) 577 snap = tests_utils.create_snapshot(self.context, vol.id) 578 with mock.patch.object( 579 self.volume.driver, 580 'manage_existing_snapshot_get_size') as mock_get_size: 581 mock_get_size.return_value = 1 582 self.assertRaises(exception.SnapshotLimitExceeded, 583 self.volume.manage_existing_snapshot, 584 self.context, 585 snap) 586