1# Copyright 2016 EMC Corporation
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15
16import ddt
17import mock
18from oslo_utils import strutils
19
20from cinder.api import microversions as mv
21from cinder.api.v3 import snapshots
22from cinder import context
23from cinder import exception
24from cinder.objects import fields
25from cinder.scheduler import rpcapi as scheduler_rpcapi
26from cinder import test
27from cinder.tests.unit.api import fakes
28from cinder.tests.unit import fake_constants as fake
29from cinder.tests.unit import fake_snapshot
30from cinder.tests.unit import fake_volume
31from cinder.tests.unit import utils as test_utils
32from cinder import volume
33
34UUID = '00000000-0000-0000-0000-000000000001'
35INVALID_UUID = '00000000-0000-0000-0000-000000000002'
36
37
def fake_get(self, context, *args, **kwargs):
    """Return a canned volume object, standing in for ``volume.api.API.get``.

    Extra positional and keyword arguments are accepted and ignored; the
    same fixed attribute set is used on every call.

    :param context: request context handed to the fake volume factory
    :returns: whatever ``fake_volume.fake_volume_obj`` builds from the
              canned attributes
    """
    volume_attrs = dict(
        id=fake.VOLUME_ID,
        size=100,
        name='fake',
        host='fake-host',
        status='available',
        encryption_key_id=None,
        volume_type_id=None,
        migration_status=None,
        availability_zone='fake-zone',
        attach_status='detached',
        metadata={},
    )
    return fake_volume.fake_volume_obj(context, **volume_attrs)
51
52
def create_snapshot_query_with_metadata(metadata_query_string,
                                        api_microversion):
    """Build a snapshot-list request that filters on metadata.

    :param metadata_query_string: text appended verbatim after
                                  ``metadata=`` in the query string
    :param api_microversion: microversion applied to both the request
                             headers and ``api_version_request``
    :returns: the request object produced by ``fakes.HTTPRequest.blank``
    """
    url = '/v3/snapshots?metadata=' + metadata_query_string
    request = fakes.HTTPRequest.blank(url)
    request.headers = mv.get_mv_header(api_microversion)
    request.api_version_request = mv.get_api_version(api_microversion)

    return request
62
63
64@ddt.ddt
65class SnapshotApiTest(test.TestCase):
66    def setUp(self):
67        super(SnapshotApiTest, self).setUp()
68        self.mock_object(volume.api.API, 'get', fake_get)
69        self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
70        self.controller = snapshots.SnapshotsController()
71        self.ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
72
73    @ddt.data(mv.GROUP_SNAPSHOTS,
74              mv.get_prior_version(mv.GROUP_SNAPSHOTS),
75              mv.SNAPSHOT_LIST_USER_ID)
76    @mock.patch('cinder.db.snapshot_metadata_get', return_value=dict())
77    @mock.patch('cinder.objects.Volume.get_by_id')
78    @mock.patch('cinder.objects.Snapshot.get_by_id')
79    def test_snapshot_show(self, max_ver, snapshot_get_by_id, volume_get_by_id,
80                           snapshot_metadata_get):
81        snapshot = {
82            'id': UUID,
83            'volume_id': fake.VOLUME_ID,
84            'status': fields.SnapshotStatus.AVAILABLE,
85            'volume_size': 100,
86            'display_name': 'Default name',
87            'display_description': 'Default description',
88            'expected_attrs': ['metadata'],
89            'group_snapshot_id': None,
90        }
91        ctx = context.RequestContext(fake.PROJECT_ID, fake.USER_ID, True)
92        snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
93        fake_volume_obj = fake_volume.fake_volume_obj(ctx)
94        snapshot_get_by_id.return_value = snapshot_obj
95        volume_get_by_id.return_value = fake_volume_obj
96        req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % UUID)
97        req.api_version_request = mv.get_api_version(max_ver)
98        resp_dict = self.controller.show(req, UUID)
99
100        self.assertIn('snapshot', resp_dict)
101        self.assertEqual(UUID, resp_dict['snapshot']['id'])
102        self.assertIn('updated_at', resp_dict['snapshot'])
103        if max_ver == mv.SNAPSHOT_LIST_USER_ID:
104            self.assertIn('user_id', resp_dict['snapshot'])
105        elif max_ver == mv.GROUP_SNAPSHOTS:
106            self.assertIn('group_snapshot_id', resp_dict['snapshot'])
107            self.assertNotIn('user_id', resp_dict['snapshot'])
108        else:
109            self.assertNotIn('group_snapshot_id', resp_dict['snapshot'])
110            self.assertNotIn('user_id', resp_dict['snapshot'])
111
112    def test_snapshot_show_invalid_id(self):
113        snapshot_id = INVALID_UUID
114        req = fakes.HTTPRequest.blank('/v3/snapshots/%s' % snapshot_id)
115        self.assertRaises(exception.SnapshotNotFound,
116                          self.controller.show, req, snapshot_id)
117
118    def _create_snapshot(self, name=None, metadata=None):
119        """Creates test snapshopt with provided metadata"""
120        req = fakes.HTTPRequest.blank('/v3/snapshots')
121        snap = {"volume_id": fake.VOLUME_ID,
122                "display_name": name or "Volume Test Name",
123                "description": "Volume Test Desc"
124                }
125        if metadata:
126            snap["metadata"] = metadata
127        body = {"snapshot": snap}
128        self.controller.create(req, body=body)
129
130    @ddt.data(('host', 'test_host1', True), ('cluster_name', 'cluster1', True),
131              ('availability_zone', 'nova1', False))
132    @ddt.unpack
133    def test_snapshot_list_with_filter(self, filter_name, filter_value,
134                                       is_admin_user):
135        volume1 = test_utils.create_volume(self.ctx, host='test_host1',
136                                           cluster_name='cluster1',
137                                           availability_zone='nova1')
138        volume2 = test_utils.create_volume(self.ctx, host='test_host2',
139                                           cluster_name='cluster2',
140                                           availability_zone='nova2')
141        snapshot1 = test_utils.create_snapshot(self.ctx, volume1.id)
142        test_utils.create_snapshot(self.ctx, volume2.id)
143
144        url = '/v3/snapshots?%s=%s' % (filter_name, filter_value)
145        # Generic filtering is introduced since '3,31' and we add
146        # 'availability_zone' support by using generic filtering.
147        req = fakes.HTTPRequest.blank(url, use_admin_context=is_admin_user,
148                                      version=mv.RESOURCE_FILTER)
149        res_dict = self.controller.detail(req)
150
151        self.assertEqual(1, len(res_dict['snapshots']))
152        self.assertEqual(snapshot1.id, res_dict['snapshots'][0]['id'])
153
154    def _create_multiple_snapshots_with_different_project(self):
155        volume1 = test_utils.create_volume(self.ctx,
156                                           project=fake.PROJECT_ID)
157        volume2 = test_utils.create_volume(self.ctx,
158                                           project=fake.PROJECT2_ID)
159        test_utils.create_snapshot(
160            context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True),
161            volume1.id)
162        test_utils.create_snapshot(
163            context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True),
164            volume1.id)
165        test_utils.create_snapshot(
166            context.RequestContext(fake.USER_ID, fake.PROJECT2_ID, True),
167            volume2.id)
168
169    @ddt.data('snapshots', 'snapshots/detail')
170    def test_list_snapshot_with_count_param_version_not_matched(self, action):
171        self._create_multiple_snapshots_with_different_project()
172
173        is_detail = True if 'detail' in action else False
174        req = fakes.HTTPRequest.blank("/v3/%s?with_count=True" % action)
175        req.headers = mv.get_mv_header(
176            mv.get_prior_version(mv.SUPPORT_COUNT_INFO))
177        req.api_version_request = mv.get_api_version(
178            mv.get_prior_version(mv.SUPPORT_COUNT_INFO))
179        ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
180        req.environ['cinder.context'] = ctxt
181        res_dict = self.controller._items(req, is_detail=is_detail)
182        self.assertNotIn('count', res_dict)
183
184    @ddt.data({'method': 'snapshots',
185               'display_param': 'True'},
186              {'method': 'snapshots',
187               'display_param': 'False'},
188              {'method': 'snapshots',
189               'display_param': '1'},
190              {'method': 'snapshots/detail',
191               'display_param': 'True'},
192              {'method': 'snapshots/detail',
193               'display_param': 'False'},
194              {'method': 'snapshots/detail',
195               'display_param': '1'}
196              )
197    @ddt.unpack
198    def test_list_snapshot_with_count_param(self, method, display_param):
199        self._create_multiple_snapshots_with_different_project()
200
201        is_detail = True if 'detail' in method else False
202        show_count = strutils.bool_from_string(display_param, strict=True)
203        # Request with 'with_count' and 'limit'
204        req = fakes.HTTPRequest.blank(
205            "/v3/%s?with_count=%s&limit=1" % (method, display_param))
206        req.headers = mv.get_mv_header(mv.SUPPORT_COUNT_INFO)
207        req.api_version_request = mv.get_api_version(mv.SUPPORT_COUNT_INFO)
208        ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, False)
209        req.environ['cinder.context'] = ctxt
210        res_dict = self.controller._items(req, is_detail=is_detail)
211        self.assertEqual(1, len(res_dict['snapshots']))
212        if show_count:
213            self.assertEqual(2, res_dict['count'])
214        else:
215            self.assertNotIn('count', res_dict)
216
217        # Request with 'with_count'
218        req = fakes.HTTPRequest.blank(
219            "/v3/%s?with_count=%s" % (method, display_param))
220        req.headers = mv.get_mv_header(mv.SUPPORT_COUNT_INFO)
221        req.api_version_request = mv.get_api_version(mv.SUPPORT_COUNT_INFO)
222        ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, False)
223        req.environ['cinder.context'] = ctxt
224        res_dict = self.controller._items(req, is_detail=is_detail)
225        self.assertEqual(2, len(res_dict['snapshots']))
226        if show_count:
227            self.assertEqual(2, res_dict['count'])
228        else:
229            self.assertNotIn('count', res_dict)
230
231        # Request with admin context and 'all_tenants'
232        req = fakes.HTTPRequest.blank(
233            "/v3/%s?with_count=%s&all_tenants=1" % (method, display_param))
234        req.headers = mv.get_mv_header(mv.SUPPORT_COUNT_INFO)
235        req.api_version_request = mv.get_api_version(mv.SUPPORT_COUNT_INFO)
236        ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
237        req.environ['cinder.context'] = ctxt
238        res_dict = self.controller._items(req, is_detail=is_detail)
239        self.assertEqual(3, len(res_dict['snapshots']))
240        if show_count:
241            self.assertEqual(3, res_dict['count'])
242        else:
243            self.assertNotIn('count', res_dict)
244
245    @mock.patch('cinder.objects.volume.Volume.refresh')
246    def test_snapshot_list_with_sort_name(self, mock_refresh):
247        self._create_snapshot(name='test1')
248        self._create_snapshot(name='test2')
249
250        req = fakes.HTTPRequest.blank(
251            '/v3/snapshots?sort_key=name',
252            version=mv.get_prior_version(mv.SNAPSHOT_SORT))
253        self.assertRaises(exception.InvalidInput, self.controller.detail, req)
254
255        req = fakes.HTTPRequest.blank('/v3/snapshots?sort_key=name',
256                                      version=mv.SNAPSHOT_SORT)
257        res_dict = self.controller.detail(req)
258        self.assertEqual(2, len(res_dict['snapshots']))
259        self.assertEqual('test2', res_dict['snapshots'][0]['name'])
260        self.assertEqual('test1', res_dict['snapshots'][1]['name'])
261
262    @mock.patch('cinder.objects.volume.Volume.refresh')
263    def test_snapshot_list_with_one_metadata_in_filter(self, mock_refresh):
264        # Create snapshot with metadata key1: value1
265        metadata = {"key1": "val1"}
266        self._create_snapshot(metadata=metadata)
267
268        # Create request with metadata filter key1: value1
269        req = create_snapshot_query_with_metadata(
270            '{"key1":"val1"}', mv.SNAPSHOT_LIST_METADATA_FILTER)
271
272        # query controller with above request
273        res_dict = self.controller.detail(req)
274
275        # verify 1 snapshot is returned
276        self.assertEqual(1, len(res_dict['snapshots']))
277
278        # verify if the medadata of the returned snapshot is key1: value1
279        self.assertDictEqual({"key1": "val1"}, res_dict['snapshots'][0][
280            'metadata'])
281
282        # Create request with metadata filter key2: value2
283        req = create_snapshot_query_with_metadata(
284            '{"key2":"val2"}', mv.SNAPSHOT_LIST_METADATA_FILTER)
285
286        # query controller with above request
287        res_dict = self.controller.detail(req)
288
289        # verify no snapshot is returned
290        self.assertEqual(0, len(res_dict['snapshots']))
291
292    @mock.patch('cinder.objects.volume.Volume.refresh')
293    def test_snapshot_list_with_multiple_metadata_in_filter(self,
294                                                            mock_refresh):
295        # Create snapshot with metadata key1: value1, key11: value11
296        metadata = {"key1": "val1", "key11": "val11"}
297        self._create_snapshot(metadata=metadata)
298
299        # Create request with metadata filter key1: value1, key11: value11
300        req = create_snapshot_query_with_metadata(
301            '{"key1":"val1", "key11":"val11"}',
302            mv.SNAPSHOT_LIST_METADATA_FILTER)
303
304        # query controller with above request
305        res_dict = self.controller.detail(req)
306
307        # verify 1 snapshot is returned
308        self.assertEqual(1, len(res_dict['snapshots']))
309
310        # verify if the medadata of the returned snapshot is key1: value1
311        self.assertDictEqual({"key1": "val1", "key11": "val11"}, res_dict[
312            'snapshots'][0]['metadata'])
313
314        # Create request with metadata filter key1: value1
315        req = create_snapshot_query_with_metadata(
316            '{"key1":"val1"}', mv.SNAPSHOT_LIST_METADATA_FILTER)
317
318        # query controller with above request
319        res_dict = self.controller.detail(req)
320
321        # verify 1 snapshot is returned
322        self.assertEqual(1, len(res_dict['snapshots']))
323
324        # verify if the medadata of the returned snapshot is key1: value1
325        self.assertDictEqual({"key1": "val1", "key11": "val11"}, res_dict[
326            'snapshots'][0]['metadata'])
327
328        # Create request with metadata filter key2: value2
329        req = create_snapshot_query_with_metadata(
330            '{"key2":"val2"}', mv.SNAPSHOT_LIST_METADATA_FILTER)
331
332        # query controller with above request
333        res_dict = self.controller.detail(req)
334
335        # verify no snapshot is returned
336        self.assertEqual(0, len(res_dict['snapshots']))
337
338    @ddt.data(mv.get_prior_version(mv.RESOURCE_FILTER),
339              mv.RESOURCE_FILTER,
340              mv.LIKE_FILTER)
341    @mock.patch('cinder.api.common.reject_invalid_filters')
342    def test_snapshot_list_with_general_filter(self, version, mock_update):
343        url = '/v3/%s/snapshots' % fake.PROJECT_ID
344        req = fakes.HTTPRequest.blank(url,
345                                      version=version,
346                                      use_admin_context=False)
347        self.controller.index(req)
348
349        if version != mv.get_prior_version(mv.RESOURCE_FILTER):
350            support_like = True if version == mv.LIKE_FILTER else False
351            mock_update.assert_called_once_with(req.environ['cinder.context'],
352                                                mock.ANY, 'snapshot',
353                                                support_like)
354
355    @mock.patch('cinder.objects.volume.Volume.refresh')
356    def test_snapshot_list_with_metadata_unsupported_microversion(
357            self, mock_refresh):
358        # Create snapshot with metadata key1: value1
359        metadata = {"key1": "val1"}
360        self._create_snapshot(metadata=metadata)
361
362        # Create request with metadata filter key2: value2
363        req = create_snapshot_query_with_metadata(
364            '{"key2":"val2"}',
365            mv.get_prior_version(mv.SNAPSHOT_LIST_METADATA_FILTER))
366
367        # query controller with above request
368        res_dict = self.controller.detail(req)
369
370        # verify some snapshot is returned
371        self.assertNotEqual(0, len(res_dict['snapshots']))
372