1#   Copyright 2012 OpenStack Foundation
2#
3#   Licensed under the Apache License, Version 2.0 (the "License"); you may
4#   not use this file except in compliance with the License. You may obtain
5#   a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#   License for the specific language governing permissions and limitations
13#   under the License.
14
15import mock
16import uuid
17
18from oslo_policy import policy as oslo_policy
19from oslo_serialization import jsonutils
20from oslo_utils import timeutils
21from six.moves import http_client
22import webob
23
24from cinder.api.contrib import volume_image_metadata
25from cinder import context
26from cinder import db
27from cinder import exception
28from cinder import objects
29from cinder.objects import fields
30from cinder.policies import base as base_policy
31from cinder.policies import volume_metadata as metadata_policy
32from cinder import policy
33from cinder import test
34from cinder.tests.unit.api import fakes
35from cinder.tests.unit import fake_constants as fake
36from cinder.tests.unit import fake_volume
37from cinder import volume
38
39
def fake_db_volume_get(*args, **kwargs):
    """Build a dict of volume fields for use as a stand-in DB row.

    Positional arguments are ignored.  The only keyword consulted is
    ``volume_id``; when it is absent or falsy, ``fake.VOLUME_ID`` is used
    for the ``id`` field instead.
    """
    volume_id = kwargs.get('volume_id')
    if not volume_id:
        volume_id = fake.VOLUME_ID

    return dict(
        id=volume_id,
        host='host001',
        status='available',
        size=5,
        availability_zone='somewhere',
        created_at=timeutils.utcnow(),
        display_name='anothervolume',
        display_description='Just another volume!',
        volume_type_id=None,
        snapshot_id=None,
        project_id=fake.PROJECT_ID,
        migration_status=None,
        _name_id=fake.VOLUME2_ID,
        attach_status=fields.VolumeAttachStatus.DETACHED,
    )
57
58
def fake_volume_api_get(*args, **kwargs):
    """Return a fake volume object built from ``fake_db_volume_get``.

    Only the ``volume_id`` keyword is forwarded; every other argument is
    ignored.
    """
    volume_fields = fake_db_volume_get(volume_id=kwargs.get('volume_id'))
    request_ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
    return fake_volume.fake_volume_obj(request_ctx, **volume_fields)
63
64
def fake_volume_get_all(*args, **kwargs):
    """Return a VolumeList holding two fake volumes.

    The first uses the default id of ``fake_volume_api_get``; the second
    is requested with ``fake.VOLUME2_ID``.  All arguments are ignored.
    """
    default_volume = fake_volume_api_get()
    second_volume = fake_volume_api_get(volume_id=fake.VOLUME2_ID)
    return objects.VolumeList(objects=[default_volume, second_volume])
69
70
def fake_volume_get_all_empty(*args, **kwargs):
    """Return a VolumeList that contains no volumes; arguments are ignored."""
    no_volumes = []
    return objects.VolumeList(objects=no_volumes)
73
74
# Canonical image metadata returned by the fakes below and compared against
# API responses in the tests.
fake_image_metadata = dict(
    image_id=fake.IMAGE_ID,
    image_name='fake',
    kernel_id='somekernel',
    ramdisk_id='someramdisk',
)
81
82
def fake_get_volume_image_metadata(*args, **kwargs):
    """Return the shared ``fake_image_metadata`` dict; arguments are ignored.

    The module-level dict itself is returned, not a copy.
    """
    image_metadata = fake_image_metadata
    return image_metadata
85
86
def fake_get_volumes_image_metadata(*args, **kwargs):
    """Return ``fake_image_metadata`` wrapped in a dict under the key 'fake'.

    All arguments are ignored.
    """
    return dict(fake=fake_image_metadata)
89
90
def return_empty_image_metadata(*args, **kwargs):
    """Return a new empty dict, whatever arguments are supplied."""
    return dict()
93
94
def volume_metadata_delete(context, volume_id, key, meta_type):
    """No-op replacement for ``db.volume_metadata_delete`` used by the tests.

    Accepts the four arguments and does nothing with them.
    """
    return None
97
98
def fake_create_volume_metadata(context, volume_id, metadata,
                                delete, meta_type):
    """Stand-in for ``db.volume_metadata_update`` used by the create tests.

    Every argument is ignored; the result is whatever
    ``fake_get_volume_image_metadata`` returns.
    """
    stored_metadata = fake_get_volume_image_metadata()
    return stored_metadata
102
103
def return_volume_nonexistent(*args, **kwargs):
    """Always raise ``exception.VolumeNotFound``; arguments are ignored."""
    not_found = exception.VolumeNotFound('bogus test message')
    raise not_found
106
107
108class VolumeImageMetadataTest(test.TestCase):
109    content_type = 'application/json'
110
111    def setUp(self):
112        super(VolumeImageMetadataTest, self).setUp()
113        self.mock_object(volume.api.API, 'get', fake_volume_api_get)
114        self.mock_object(volume.api.API, 'get_all', fake_volume_get_all)
115        self.mock_object(volume.api.API, 'get_volume_image_metadata',
116                         fake_get_volume_image_metadata)
117        self.mock_object(volume.api.API, 'get_volumes_image_metadata',
118                         fake_get_volumes_image_metadata)
119        self.UUID = uuid.uuid4()
120        self.controller = (volume_image_metadata.
121                           VolumeImageMetadataController())
122        self.user_ctxt = context.RequestContext(
123            fake.USER_ID, fake.PROJECT_ID, auth_token=True)
124
125    def _make_request(self, url):
126        req = webob.Request.blank(url)
127        req.accept = self.content_type
128        res = req.get_response(fakes.wsgi_app(
129            fake_auth_context=self.user_ctxt))
130        return res
131
132    def _get_image_metadata(self, body):
133        return jsonutils.loads(body)['volume']['volume_image_metadata']
134
135    def _get_image_metadata_list(self, body):
136        return [
137            volume['volume_image_metadata']
138            for volume in jsonutils.loads(body)['volumes']
139            if volume.get('volume_image_metadata')
140        ]
141
142    def _create_volume_and_glance_metadata(self):
143        ctxt = context.get_admin_context()
144        # create a bootable volume
145        db.volume_create(ctxt, {'id': fake.VOLUME_ID, 'status': 'available',
146                                'host': 'test', 'provider_location': '',
147                                'size': 1})
148        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID,
149                                         'image_id', fake.IMAGE_ID)
150        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID,
151                                         'image_name', 'fake')
152        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID, 'kernel_id',
153                                         'somekernel')
154        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID, 'ramdisk_id',
155                                         'someramdisk')
156
157        # create an unbootable volume
158        db.volume_create(ctxt, {'id': fake.VOLUME2_ID, 'status': 'available',
159                                'host': 'test', 'provider_location': '',
160                                'size': 1})
161
162    def test_get_volume(self):
163        self._create_volume_and_glance_metadata()
164        res = self._make_request('/v2/%s/volumes/%s' % (
165            fake.PROJECT_ID, self.UUID))
166        self.assertEqual(http_client.OK, res.status_int)
167        self.assertEqual(fake_image_metadata,
168                         self._get_image_metadata(res.body))
169
170    def test_list_detail_volumes(self):
171        self._create_volume_and_glance_metadata()
172        res = self._make_request('/v2/%s/volumes/detail' % fake.PROJECT_ID)
173        self.assertEqual(http_client.OK, res.status_int)
174        self.assertEqual(fake_image_metadata,
175                         self._get_image_metadata_list(res.body)[0])
176
177    def test_list_detail_empty_volumes(self):
178        def fake_dont_call_this(*args, **kwargs):
179            fake_dont_call_this.called = True
180        fake_dont_call_this.called = False
181        self.mock_object(volume.api.API, 'get_list_volumes_image_metadata',
182                         fake_dont_call_this)
183        self.mock_object(volume.api.API, 'get_all',
184                         fake_volume_get_all_empty)
185
186        res = self._make_request('/v2/%s/volumes/detail' % fake.PROJECT_ID)
187        self.assertEqual(http_client.OK, res.status_int)
188        self.assertFalse(fake_dont_call_this.called)
189
190    def test_list_detail_volumes_with_limit(self):
191        ctxt = context.get_admin_context()
192        db.volume_create(ctxt, {'id': fake.VOLUME_ID, 'status': 'available',
193                                'host': 'test', 'provider_location': '',
194                                'size': 1})
195        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID,
196                                         'key1', 'value1')
197        db.volume_glance_metadata_create(ctxt, fake.VOLUME_ID,
198                                         'key2', 'value2')
199        res = self._make_request('/v2/%s/volumes/detail?limit=1'
200                                 % fake.PROJECT_ID)
201        self.assertEqual(http_client.OK, res.status_int)
202        self.assertEqual({'key1': 'value1', 'key2': 'value2'},
203                         self._get_image_metadata_list(res.body)[0])
204
205    @mock.patch('cinder.objects.Volume.get_by_id')
206    def test_create_image_metadata(self, fake_get):
207        self.mock_object(volume.api.API, 'get_volume_image_metadata',
208                         return_empty_image_metadata)
209        self.mock_object(db, 'volume_metadata_update',
210                         fake_create_volume_metadata)
211
212        body = {"os-set_image_metadata": {"metadata": fake_image_metadata}}
213        req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
214            fake.PROJECT_ID, fake.VOLUME_ID))
215        req.method = "POST"
216        req.body = jsonutils.dump_as_bytes(body)
217        req.headers["content-type"] = "application/json"
218        fake_get.return_value = {}
219
220        res = req.get_response(fakes.wsgi_app(
221            fake_auth_context=self.user_ctxt))
222        self.assertEqual(http_client.OK, res.status_int)
223        self.assertEqual(fake_image_metadata,
224                         jsonutils.loads(res.body)["metadata"])
225
226    @mock.patch('cinder.objects.Volume.get_by_id')
227    def test_create_image_metadata_policy_not_authorized(self, fake_get):
228        rules = {
229            metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
230        }
231        policy.set_rules(oslo_policy.Rules.from_dict(rules))
232        self.addCleanup(policy.reset)
233        fake_get.return_value = {}
234
235        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
236            fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
237
238        req.method = 'POST'
239        req.content_type = "application/json"
240        body = {"os-set_image_metadata": {
241            "metadata": {"image_name": "fake"}}
242        }
243        req.body = jsonutils.dump_as_bytes(body)
244
245        self.assertRaises(exception.PolicyNotAuthorized,
246                          self.controller.create, req, fake.VOLUME_ID, None)
247
248    @mock.patch('cinder.objects.Volume.get_by_id')
249    def test_create_with_keys_case_insensitive(self, fake_get):
250        # If the keys in uppercase_and_lowercase, should return the one
251        # which server added
252        self.mock_object(volume.api.API, 'get_volume_image_metadata',
253                         return_empty_image_metadata)
254        self.mock_object(db, 'volume_metadata_update',
255                         fake_create_volume_metadata)
256        fake_get.return_value = {}
257
258        body = {
259            "os-set_image_metadata": {
260                "metadata": {
261                    "Image_Id": "someid",
262                    "image_name": "fake",
263                    "Kernel_id": "somekernel",
264                    "ramdisk_id": "someramdisk"
265                },
266            },
267        }
268
269        req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
270            fake.PROJECT_ID, fake.VOLUME_ID))
271        req.method = 'POST'
272        req.body = jsonutils.dump_as_bytes(body)
273        req.headers["content-type"] = "application/json"
274
275        res = req.get_response(fakes.wsgi_app(
276            fake_auth_context=self.user_ctxt))
277        self.assertEqual(http_client.OK, res.status_int)
278        self.assertEqual(fake_image_metadata,
279                         jsonutils.loads(res.body)["metadata"])
280
281    @mock.patch('cinder.objects.Volume.get_by_id')
282    def test_create_empty_body(self, fake_get):
283        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
284            fake.PROJECT_ID, fake.VOLUME_ID))
285        req.method = 'POST'
286        req.headers["content-type"] = "application/json"
287        fake_get.return_value = {}
288
289        self.assertRaises(webob.exc.HTTPBadRequest,
290                          self.controller.create, req, fake.VOLUME_ID, None)
291
292    def test_create_nonexistent_volume(self):
293        self.mock_object(volume.api.API, 'get', return_volume_nonexistent)
294
295        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
296            fake.PROJECT_ID, fake.VOLUME_ID))
297        req.method = 'POST'
298        req.content_type = "application/json"
299        body = {"os-set_image_metadata": {
300            "metadata": {"image_name": "fake"}}
301        }
302        req.body = jsonutils.dump_as_bytes(body)
303        self.assertRaises(exception.VolumeNotFound,
304                          self.controller.create, req, fake.VOLUME_ID, body)
305
306    @mock.patch('cinder.objects.Volume.get_by_id')
307    def test_invalid_metadata_items_on_create(self, fake_get):
308        self.mock_object(db, 'volume_metadata_update',
309                         fake_create_volume_metadata)
310        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
311            fake.PROJECT_ID, fake.VOLUME_ID))
312        req.method = 'POST'
313        req.headers["content-type"] = "application/json"
314
315        data = {"os-set_image_metadata": {
316            "metadata": {"a" * 260: "value1"}}
317        }
318        fake_get.return_value = {}
319
320        # Test for long key
321        req.body = jsonutils.dump_as_bytes(data)
322        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
323                          self.controller.create, req, fake.VOLUME_ID, data)
324
325        # Test for long value
326        data = {"os-set_image_metadata": {
327            "metadata": {"key": "v" * 260}}
328        }
329        req.body = jsonutils.dump_as_bytes(data)
330        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
331                          self.controller.create, req, fake.VOLUME_ID, data)
332
333        # Test for empty key.
334        data = {"os-set_image_metadata": {
335            "metadata": {"": "value1"}}
336        }
337        req.body = jsonutils.dump_as_bytes(data)
338        self.assertRaises(webob.exc.HTTPBadRequest,
339                          self.controller.create, req, fake.VOLUME_ID, data)
340
341    @mock.patch('cinder.objects.Volume.get_by_id')
342    def test_delete(self, fake_get):
343        self.mock_object(db, 'volume_metadata_delete',
344                         volume_metadata_delete)
345
346        body = {"os-unset_image_metadata": {
347            "key": "ramdisk_id"}
348        }
349        req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
350            fake.PROJECT_ID, fake.VOLUME_ID))
351        req.method = 'POST'
352        req.body = jsonutils.dump_as_bytes(body)
353        req.headers["content-type"] = "application/json"
354        fake_get.return_value = {}
355
356        res = req.get_response(fakes.wsgi_app(
357            fake_auth_context=self.user_ctxt))
358        self.assertEqual(http_client.OK, res.status_int)
359
360    @mock.patch('cinder.objects.Volume.get_by_id')
361    def test_delete_image_metadata_policy_not_authorized(self, fake_get):
362        rules = {
363            metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
364        }
365        policy.set_rules(oslo_policy.Rules.from_dict(rules))
366        self.addCleanup(policy.reset)
367        fake_get.return_value = {}
368
369        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
370            fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
371
372        req.method = 'POST'
373        req.content_type = "application/json"
374        body = {"os-unset_image_metadata": {
375            "metadata": {"image_name": "fake"}}
376        }
377        req.body = jsonutils.dump_as_bytes(body)
378
379        self.assertRaises(exception.PolicyNotAuthorized,
380                          self.controller.delete, req, fake.VOLUME_ID, None)
381
382    @mock.patch('cinder.objects.Volume.get_by_id')
383    def test_delete_meta_not_found(self, fake_get):
384        data = {"os-unset_image_metadata": {
385            "key": "invalid_id"}
386        }
387        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
388            fake.PROJECT_ID, fake.VOLUME_ID))
389        req.method = 'POST'
390        req.body = jsonutils.dump_as_bytes(data)
391        req.headers["content-type"] = "application/json"
392        fake_get.return_value = {}
393
394        self.assertRaises(exception.GlanceMetadataNotFound,
395                          self.controller.delete, req, fake.VOLUME_ID, data)
396
397    @mock.patch('cinder.objects.Volume.get_by_id')
398    def test_delete_nonexistent_volume(self, fake_get):
399        self.mock_object(db, 'volume_metadata_delete',
400                         return_volume_nonexistent)
401
402        body = {"os-unset_image_metadata": {
403            "key": "fake"}
404        }
405        req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
406            fake.PROJECT_ID, fake.VOLUME_ID))
407        req.method = 'POST'
408        req.body = jsonutils.dump_as_bytes(body)
409        req.headers["content-type"] = "application/json"
410        fake_get.return_value = {}
411
412        self.assertRaises(exception.GlanceMetadataNotFound,
413                          self.controller.delete, req, fake.VOLUME_ID, body)
414
415    def test_show_image_metadata(self):
416        body = {"os-show_image_metadata": None}
417        req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
418            fake.PROJECT_ID, fake.VOLUME_ID))
419        req.method = 'POST'
420        req.body = jsonutils.dump_as_bytes(body)
421        req.headers["content-type"] = "application/json"
422
423        res = req.get_response(fakes.wsgi_app(
424            fake_auth_context=self.user_ctxt))
425        self.assertEqual(http_client.OK, res.status_int)
426        self.assertEqual(fake_image_metadata,
427                         jsonutils.loads(res.body)["metadata"])
428