# Copyright 2016 EMC Corporation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Unit tests for the v3 snapshots API controller."""

import ddt
import mock
from oslo_utils import strutils

from cinder.api import microversions as mv
from cinder.api.v3 import snapshots
from cinder import context
from cinder import exception
from cinder.objects import fields
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit import utils as test_utils
from cinder import volume

UUID = '00000000-0000-0000-0000-000000000001'
INVALID_UUID = '00000000-0000-0000-0000-000000000002'


def fake_get(self, context, *args, **kwargs):
    """Replacement for ``volume.api.API.get`` returning a fixed fake volume.

    Extra positional and keyword arguments are accepted and ignored, so the
    same volume object is built whatever volume is asked for.
    """
    vol = {'id': fake.VOLUME_ID,
           'size': 100,
           'name': 'fake',
           'host': 'fake-host',
           'status': 'available',
           'encryption_key_id': None,
           'volume_type_id': None,
           'migration_status': None,
           'availability_zone': 'fake-zone',
           'attach_status': 'detached',
           'metadata': {}}
    return fake_volume.fake_volume_obj(context, **vol)


def create_snapshot_query_with_metadata(metadata_query_string,
                                        api_microversion):
    """Helper to create metadata querystring with microversion.

    :param metadata_query_string: value appended verbatim after
                                  ``?metadata=`` in the request URL
    :param api_microversion: microversion used for both the request headers
                             and ``api_version_request``
    :returns: the prepared fake HTTP request
    """
    req = fakes.HTTPRequest.blank('/v3/snapshots?metadata=' +
                                  metadata_query_string)
    req.headers = mv.get_mv_header(api_microversion)
    req.api_version_request = mv.get_api_version(api_microversion)

    return req


@ddt.ddt
class SnapshotApiTest(test.TestCase):
    """Tests for ``snapshots.SnapshotsController`` (show/list behaviour)."""

    def setUp(self):
        super(SnapshotApiTest, self).setUp()
        self.mock_object(volume.api.API, 'get', fake_get)
        self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
        self.controller = snapshots.SnapshotsController()
        self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)

    @ddt.data(mv.GROUP_SNAPSHOTS,
              mv.get_prior_version(mv.GROUP_SNAPSHOTS),
              mv.SNAPSHOT_LIST_USER_ID)
    @mock.patch('cinder.db.snapshot_metadata_get', return_value={})
    @mock.patch('cinder.objects.Volume.get_by_id')
    @mock.patch('cinder.objects.Snapshot.get_by_id')
    def test_snapshot_show(self, max_ver, snapshot_get_by_id, volume_get_by_id,
                           snapshot_metadata_get):
        """Check which optional fields ``show`` returns per microversion."""
        snapshot = {
            'id': UUID,
            'volume_id': fake.VOLUME_ID,
            'status': fields.SnapshotStatus.AVAILABLE,
            'volume_size': 100,
            'display_name': 'Default name',
            'display_description': 'Default description',
            'expected_attrs': ['metadata'],
            'group_snapshot_id': None,
        }
        # Reuse the context from setUp: it passes (user_id, project_id) in
        # the same order as every other context built in this module.
        snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
        fake_volume_obj = fake_volume.fake_volume_obj(self.ctx)
        snapshot_get_by_id.return_value = snapshot_obj
        volume_get_by_id.return_value = fake_volume_obj
        req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
        req.api_version_request = mv.get_api_version(max_ver)
        resp_dict = self.controller.show(req, UUID)

        self.assertIn('snapshot', resp_dict)
        self.assertEqual(UUID, resp_dict['snapshot']['id'])
        self.assertIn('updated_at', resp_dict['snapshot'])
        if max_ver == mv.SNAPSHOT_LIST_USER_ID:
            self.assertIn('user_id', resp_dict['snapshot'])
        elif max_ver == mv.GROUP_SNAPSHOTS:
            self.assertIn('group_snapshot_id', resp_dict['snapshot'])
            self.assertNotIn('user_id', resp_dict['snapshot'])
        else:
            self.assertNotIn('group_snapshot_id', resp_dict['snapshot'])
            self.assertNotIn('user_id', resp_dict['snapshot'])

    def test_snapshot_show_invalid_id(self):
        """``show`` raises SnapshotNotFound for an unknown snapshot id."""
        snapshot_id = INVALID_UUID
        req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
        self.assertRaises(exception.SnapshotNotFound,
                          self.controller.show, req, snapshot_id)

    def _create_snapshot(self, name=None, metadata=None):
        """Creates test snapshot with provided metadata.

        :param name: display name; a default name is used when falsy
        :param metadata: optional metadata dict, only sent when non-empty
        """
        req = fakes.HTTPRequest.blank('/v3/snapshots')
        snap = {"volume_id": fake.VOLUME_ID,
                "display_name": name or "Volume Test Name",
                "description": "Volume Test Desc"
                }
        if metadata:
            snap["metadata"] = metadata
        body = {"snapshot": snap}
        self.controller.create(req, body=body)

    def _list_snapshots(self, url, version, is_admin, is_detail):
        """Call the controller's ``_items`` for ``url`` and return the result.

        :param url: request URL, including any query string
        :param version: microversion set on both headers and
                        ``api_version_request``
        :param is_admin: admin flag of the request context
        :param is_detail: forwarded to ``_items`` as ``is_detail``
        """
        req = fakes.HTTPRequest.blank(url)
        req.headers = mv.get_mv_header(version)
        req.api_version_request = mv.get_api_version(version)
        req.environ['cinder.context'] = context.RequestContext(
            fake.USER_ID, fake.PROJECT_ID, is_admin)
        return self.controller._items(req, is_detail=is_detail)

    def _check_count(self, res_dict, show_count, expected_count):
        """Assert 'count' equals ``expected_count``, or is absent."""
        if show_count:
            self.assertEqual(expected_count, res_dict['count'])
        else:
            self.assertNotIn('count', res_dict)

    @ddt.data(('host', 'test_host1', True), ('cluster_name', 'cluster1', True),
              ('availability_zone', 'nova1', False))
    @ddt.unpack
    def test_snapshot_list_with_filter(self, filter_name, filter_value,
                                       is_admin_user):
        """Only the snapshot of the volume matching the filter is listed."""
        volume1 = test_utils.create_volume(self.ctx, host='test_host1',
                                           cluster_name='cluster1',
                                           availability_zone='nova1')
        volume2 = test_utils.create_volume(self.ctx, host='test_host2',
                                           cluster_name='cluster2',
                                           availability_zone='nova2')
        snapshot1 = test_utils.create_snapshot(self.ctx, volume1.id)
        test_utils.create_snapshot(self.ctx, volume2.id)

        url = '/v3/snapshots?%s=%s' % (filter_name, filter_value)
        # Generic filtering is introduced since '3.31' and we add
        # 'availability_zone' support by using generic filtering.
        req = fakes.HTTPRequest.blank(url, use_admin_context=is_admin_user,
                                      version=mv.RESOURCE_FILTER)
        res_dict = self.controller.detail(req)

        self.assertEqual(1, len(res_dict['snapshots']))
        self.assertEqual(snapshot1.id, res_dict['snapshots'][0]['id'])

    def _create_multiple_snapshots_with_different_project(self):
        """Create two snapshots under PROJECT_ID and one under PROJECT2_ID."""
        volume1 = test_utils.create_volume(self.ctx,
                                           project=fake.PROJECT_ID)
        volume2 = test_utils.create_volume(self.ctx,
                                           project=fake.PROJECT2_ID)
        snapshot_specs = ((fake.PROJECT_ID, volume1),
                          (fake.PROJECT_ID, volume1),
                          (fake.PROJECT2_ID, volume2))
        for project_id, vol in snapshot_specs:
            # A fresh admin context per snapshot, as each one is created
            # under the project given in its context.
            test_utils.create_snapshot(
                context.RequestContext(fake.USER_ID, project_id, True),
                vol.id)

    @ddt.data('snapshots', 'snapshots/detail')
    def test_list_snapshot_with_count_param_version_not_matched(self, action):
        """'count' is not returned before the SUPPORT_COUNT_INFO version."""
        self._create_multiple_snapshots_with_different_project()

        res_dict = self._list_snapshots(
            "/v3/%s?with_count=True" % action,
            mv.get_prior_version(mv.SUPPORT_COUNT_INFO),
            is_admin=True,
            is_detail='detail' in action)
        self.assertNotIn('count', res_dict)

    @ddt.data({'method': 'snapshots',
               'display_param': 'True'},
              {'method': 'snapshots',
               'display_param': 'False'},
              {'method': 'snapshots',
               'display_param': '1'},
              {'method': 'snapshots/detail',
               'display_param': 'True'},
              {'method': 'snapshots/detail',
               'display_param': 'False'},
              {'method': 'snapshots/detail',
               'display_param': '1'}
              )
    @ddt.unpack
    def test_list_snapshot_with_count_param(self, method, display_param):
        """'count' follows ``with_count`` and ignores the ``limit`` param."""
        self._create_multiple_snapshots_with_different_project()

        is_detail = 'detail' in method
        show_count = strutils.bool_from_string(display_param, strict=True)

        # Request with 'with_count' and 'limit'
        res_dict = self._list_snapshots(
            "/v3/%s?with_count=%s&limit=1" % (method, display_param),
            mv.SUPPORT_COUNT_INFO, is_admin=False, is_detail=is_detail)
        self.assertEqual(1, len(res_dict['snapshots']))
        self._check_count(res_dict, show_count, 2)

        # Request with 'with_count'
        res_dict = self._list_snapshots(
            "/v3/%s?with_count=%s" % (method, display_param),
            mv.SUPPORT_COUNT_INFO, is_admin=False, is_detail=is_detail)
        self.assertEqual(2, len(res_dict['snapshots']))
        self._check_count(res_dict, show_count, 2)

        # Request with admin context and 'all_tenants'
        res_dict = self._list_snapshots(
            "/v3/%s?with_count=%s&all_tenants=1" % (method, display_param),
            mv.SUPPORT_COUNT_INFO, is_admin=True, is_detail=is_detail)
        self.assertEqual(3, len(res_dict['snapshots']))
        self._check_count(res_dict, show_count, 3)

    @mock.patch('cinder.objects.volume.Volume.refresh')
    def test_snapshot_list_with_sort_name(self, mock_refresh):
        """Sorting by name is rejected before SNAPSHOT_SORT, allowed after."""
        self._create_snapshot(name='test1')
        self._create_snapshot(name='test2')

        req = fakes.HTTPRequest.blank(
            '/v3/snapshots?sort_key=name',
            version=mv.get_prior_version(mv.SNAPSHOT_SORT))
        self.assertRaises(exception.InvalidInput, self.controller.detail, req)

        req = fakes.HTTPRequest.blank('/v3/snapshots?sort_key=name',
                                      version=mv.SNAPSHOT_SORT)
        res_dict = self.controller.detail(req)
        self.assertEqual(2, len(res_dict['snapshots']))
        self.assertEqual('test2', res_dict['snapshots'][0]['name'])
        self.assertEqual('test1', res_dict['snapshots'][1]['name'])

    @mock.patch('cinder.objects.volume.Volume.refresh')
    def test_snapshot_list_with_one_metadata_in_filter(self, mock_refresh):
        """A single-key metadata filter matches only snapshots having it."""
        # Create snapshot with metadata key1: val1
        metadata = {"key1": "val1"}
        self._create_snapshot(metadata=metadata)

        # Create request with metadata filter key1: val1
        req = create_snapshot_query_with_metadata(
            '{"key1":"val1"}', mv.SNAPSHOT_LIST_METADATA_FILTER)

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify 1 snapshot is returned
        self.assertEqual(1, len(res_dict['snapshots']))

        # verify the metadata of the returned snapshot is key1: val1
        self.assertDictEqual({"key1": "val1"}, res_dict['snapshots'][0][
            'metadata'])

        # Create request with metadata filter key2: val2
        req = create_snapshot_query_with_metadata(
            '{"key2":"val2"}', mv.SNAPSHOT_LIST_METADATA_FILTER)

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify no snapshot is returned
        self.assertEqual(0, len(res_dict['snapshots']))

    @mock.patch('cinder.objects.volume.Volume.refresh')
    def test_snapshot_list_with_multiple_metadata_in_filter(self,
                                                            mock_refresh):
        """Full and partial metadata filters both match the snapshot."""
        # Create snapshot with metadata key1: val1, key11: val11
        metadata = {"key1": "val1", "key11": "val11"}
        self._create_snapshot(metadata=metadata)

        # Create request with metadata filter key1: val1, key11: val11
        req = create_snapshot_query_with_metadata(
            '{"key1":"val1", "key11":"val11"}',
            mv.SNAPSHOT_LIST_METADATA_FILTER)

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify 1 snapshot is returned
        self.assertEqual(1, len(res_dict['snapshots']))

        # verify the returned snapshot carries both metadata items
        self.assertDictEqual({"key1": "val1", "key11": "val11"}, res_dict[
            'snapshots'][0]['metadata'])

        # Create request with metadata filter key1: val1
        req = create_snapshot_query_with_metadata(
            '{"key1":"val1"}', mv.SNAPSHOT_LIST_METADATA_FILTER)

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify 1 snapshot is returned
        self.assertEqual(1, len(res_dict['snapshots']))

        # verify the returned snapshot still carries both metadata items
        self.assertDictEqual({"key1": "val1", "key11": "val11"}, res_dict[
            'snapshots'][0]['metadata'])

        # Create request with metadata filter key2: val2
        req = create_snapshot_query_with_metadata(
            '{"key2":"val2"}', mv.SNAPSHOT_LIST_METADATA_FILTER)

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify no snapshot is returned
        self.assertEqual(0, len(res_dict['snapshots']))

    @ddt.data(mv.get_prior_version(mv.RESOURCE_FILTER),
              mv.RESOURCE_FILTER,
              mv.LIKE_FILTER)
    @mock.patch('cinder.api.common.reject_invalid_filters')
    def test_snapshot_list_with_general_filter(self, version, mock_update):
        """Check the ``reject_invalid_filters`` call made by ``index``."""
        url = '/v3/%s/snapshots' % fake.PROJECT_ID
        req = fakes.HTTPRequest.blank(url,
                                      version=version,
                                      use_admin_context=False)
        self.controller.index(req)

        # The call is only verified from RESOURCE_FILTER onwards; nothing is
        # asserted for the prior microversion.
        if version != mv.get_prior_version(mv.RESOURCE_FILTER):
            support_like = version == mv.LIKE_FILTER
            mock_update.assert_called_once_with(req.environ['cinder.context'],
                                                mock.ANY, 'snapshot',
                                                support_like)

    @mock.patch('cinder.objects.volume.Volume.refresh')
    def test_snapshot_list_with_metadata_unsupported_microversion(
            self, mock_refresh):
        """A non-matching metadata filter still returns snapshots here.

        The request uses the microversion just before
        SNAPSHOT_LIST_METADATA_FILTER.
        """
        # Create snapshot with metadata key1: val1
        metadata = {"key1": "val1"}
        self._create_snapshot(metadata=metadata)

        # Create request with metadata filter key2: val2
        req = create_snapshot_query_with_metadata(
            '{"key2":"val2"}',
            mv.get_prior_version(mv.SNAPSHOT_LIST_METADATA_FILTER))

        # query controller with above request
        res_dict = self.controller.detail(req)

        # verify some snapshot is returned
        self.assertNotEqual(0, len(res_dict['snapshots']))