1#    Copyright 2015 SimpliVity Corp.
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15import copy
16
17import ddt
18import mock
19from oslo_utils import timeutils
20import pytz
21import six
22
23from cinder.db.sqlalchemy import models
24from cinder import exception
25from cinder import objects
26from cinder.objects import fields
27from cinder.tests.unit import fake_constants as fake
28from cinder.tests.unit import fake_snapshot
29from cinder.tests.unit import fake_volume
30from cinder.tests.unit import objects as test_objects
31
32
33fake_db_snapshot = fake_snapshot.fake_db_snapshot(
34    cgsnapshot_id=fake.CGSNAPSHOT_ID)
35del fake_db_snapshot['metadata']
36del fake_db_snapshot['volume']
37
38
39# NOTE(andrey-mp): make Snapshot object here to check object algorithms
40fake_snapshot_obj = {
41    'id': fake.SNAPSHOT_ID,
42    'volume_id': fake.VOLUME_ID,
43    'status': fields.SnapshotStatus.CREATING,
44    'progress': '0%',
45    'volume_size': 1,
46    'display_name': 'fake_name',
47    'display_description': 'fake_description',
48    'metadata': {},
49}
50
51
52@ddt.ddt
53class TestSnapshot(test_objects.BaseObjectsTestCase):
54
55    @mock.patch('cinder.db.get_by_id', return_value=fake_db_snapshot)
56    def test_get_by_id(self, snapshot_get):
57        snapshot = objects.Snapshot.get_by_id(self.context, 1)
58        self._compare(self, fake_snapshot_obj, snapshot)
59        snapshot_get.assert_called_once_with(self.context, models.Snapshot, 1)
60
61    @mock.patch('cinder.db.sqlalchemy.api.model_query')
62    def test_get_by_id_no_existing_id(self, model_query):
63        query = model_query().options().options().filter_by().first
64        query.return_value = None
65        self.assertRaises(exception.SnapshotNotFound,
66                          objects.Snapshot.get_by_id, self.context, 123)
67
68    def test_reset_changes(self):
69        snapshot = objects.Snapshot()
70        snapshot.metadata = {'key1': 'value1'}
71        self.assertEqual({}, snapshot._orig_metadata)
72        snapshot.obj_reset_changes(['metadata'])
73        self.assertEqual({'key1': 'value1'}, snapshot._orig_metadata)
74
75    @mock.patch('cinder.db.snapshot_create', return_value=fake_db_snapshot)
76    def test_create(self, snapshot_create):
77        snapshot = objects.Snapshot(context=self.context)
78        snapshot.create()
79        self.assertEqual(fake_snapshot_obj['id'], snapshot.id)
80        self.assertEqual(fake_snapshot_obj['volume_id'], snapshot.volume_id)
81
82    @mock.patch('cinder.db.snapshot_create')
83    def test_create_with_provider_id(self, snapshot_create):
84        snapshot_create.return_value = copy.deepcopy(fake_db_snapshot)
85        snapshot_create.return_value['provider_id'] = fake.PROVIDER_ID
86
87        snapshot = objects.Snapshot(context=self.context)
88        snapshot.create()
89        self.assertEqual(fake.PROVIDER_ID, snapshot.provider_id)
90
91    @mock.patch('cinder.db.snapshot_update')
92    def test_save(self, snapshot_update):
93        snapshot = objects.Snapshot._from_db_object(
94            self.context, objects.Snapshot(), fake_db_snapshot)
95        snapshot.display_name = 'foobar'
96        snapshot.save()
97        snapshot_update.assert_called_once_with(self.context, snapshot.id,
98                                                {'display_name': 'foobar'})
99
100    @mock.patch('cinder.db.snapshot_metadata_update',
101                return_value={'key1': 'value1'})
102    @mock.patch('cinder.db.snapshot_update')
103    def test_save_with_metadata(self, snapshot_update,
104                                snapshot_metadata_update):
105        snapshot = objects.Snapshot._from_db_object(
106            self.context, objects.Snapshot(), fake_db_snapshot)
107        snapshot.display_name = 'foobar'
108        snapshot.metadata = {'key1': 'value1'}
109        self.assertEqual({'display_name': 'foobar',
110                          'metadata': {'key1': 'value1'}},
111                         snapshot.obj_get_changes())
112        snapshot.save()
113        snapshot_update.assert_called_once_with(self.context, snapshot.id,
114                                                {'display_name': 'foobar'})
115        snapshot_metadata_update.assert_called_once_with(self.context,
116                                                         fake.SNAPSHOT_ID,
117                                                         {'key1': 'value1'},
118                                                         True)
119
120    @mock.patch('oslo_utils.timeutils.utcnow', return_value=timeutils.utcnow())
121    @mock.patch('cinder.db.sqlalchemy.api.snapshot_destroy')
122    def test_destroy(self, snapshot_destroy, utcnow_mock):
123        snapshot_destroy.return_value = {
124            'status': 'deleted',
125            'deleted': True,
126            'deleted_at': utcnow_mock.return_value}
127        snapshot = objects.Snapshot(context=self.context,
128                                    id=fake.SNAPSHOT_ID)
129        snapshot.destroy()
130        self.assertTrue(snapshot_destroy.called)
131        admin_context = snapshot_destroy.call_args[0][0]
132        self.assertTrue(admin_context.is_admin)
133        self.assertTrue(snapshot.deleted)
134        self.assertEqual(fields.SnapshotStatus.DELETED, snapshot.status)
135        self.assertEqual(utcnow_mock.return_value.replace(tzinfo=pytz.UTC),
136                         snapshot.deleted_at)
137
138    @mock.patch('cinder.db.snapshot_metadata_delete')
139    def test_delete_metadata_key(self, snapshot_metadata_delete):
140        snapshot = objects.Snapshot(self.context, id=fake.SNAPSHOT_ID)
141        snapshot.metadata = {'key1': 'value1', 'key2': 'value2'}
142        self.assertEqual({}, snapshot._orig_metadata)
143        snapshot.delete_metadata_key(self.context, 'key2')
144        self.assertEqual({'key1': 'value1'}, snapshot.metadata)
145        snapshot_metadata_delete.assert_called_once_with(self.context,
146                                                         fake.SNAPSHOT_ID,
147                                                         'key2')
148
149    def test_obj_fields(self):
150        volume = objects.Volume(context=self.context, id=fake.VOLUME_ID,
151                                _name_id=fake.VOLUME_NAME_ID)
152        snapshot = objects.Snapshot(context=self.context, id=fake.VOLUME_ID,
153                                    volume=volume)
154        self.assertEqual(['name', 'volume_name'], snapshot.obj_extra_fields)
155        self.assertEqual('snapshot-%s' % fake.VOLUME_ID, snapshot.name)
156        self.assertEqual('volume-%s' % fake.VOLUME_NAME_ID,
157                         snapshot.volume_name)
158
159    @mock.patch('cinder.objects.volume.Volume.get_by_id')
160    @mock.patch('cinder.objects.cgsnapshot.CGSnapshot.get_by_id')
161    def test_obj_load_attr(self, cgsnapshot_get_by_id, volume_get_by_id):
162        snapshot = objects.Snapshot._from_db_object(
163            self.context, objects.Snapshot(), fake_db_snapshot)
164        # Test volume lazy-loaded field
165        volume = objects.Volume(context=self.context, id=fake.VOLUME_ID)
166        volume_get_by_id.return_value = volume
167        self.assertEqual(volume, snapshot.volume)
168        volume_get_by_id.assert_called_once_with(self.context,
169                                                 snapshot.volume_id)
170        self.assertEqual(snapshot.metadata, {})
171        # Test cgsnapshot lazy-loaded field
172        cgsnapshot = objects.CGSnapshot(context=self.context,
173                                        id=fake.CGSNAPSHOT_ID)
174        cgsnapshot_get_by_id.return_value = cgsnapshot
175        self.assertEqual(cgsnapshot, snapshot.cgsnapshot)
176        cgsnapshot_get_by_id.assert_called_once_with(self.context,
177                                                     snapshot.cgsnapshot_id)
178
179    @mock.patch('cinder.objects.cgsnapshot.CGSnapshot.get_by_id')
180    def test_obj_load_attr_cgroup_not_exist(self, cgsnapshot_get_by_id):
181        fake_non_cg_db_snapshot = fake_snapshot.fake_db_snapshot(
182            cgsnapshot_id=None)
183        snapshot = objects.Snapshot._from_db_object(
184            self.context, objects.Snapshot(), fake_non_cg_db_snapshot)
185        self.assertIsNone(snapshot.cgsnapshot)
186        cgsnapshot_get_by_id.assert_not_called()
187
188    @mock.patch('cinder.objects.group_snapshot.GroupSnapshot.get_by_id')
189    def test_obj_load_attr_group_not_exist(self, group_snapshot_get_by_id):
190        fake_non_cg_db_snapshot = fake_snapshot.fake_db_snapshot(
191            group_snapshot_id=None)
192        snapshot = objects.Snapshot._from_db_object(
193            self.context, objects.Snapshot(), fake_non_cg_db_snapshot)
194        self.assertIsNone(snapshot.group_snapshot)
195        group_snapshot_get_by_id.assert_not_called()
196
197    @mock.patch('cinder.db.snapshot_data_get_for_project')
198    def test_snapshot_data_get_for_project(self, snapshot_data_get):
199        snapshot = objects.Snapshot._from_db_object(
200            self.context, objects.Snapshot(), fake_db_snapshot)
201        volume_type_id = mock.sentinel.volume_type_id
202        snapshot.snapshot_data_get_for_project(self.context,
203                                               self.project_id,
204                                               volume_type_id)
205        snapshot_data_get.assert_called_once_with(self.context,
206                                                  self.project_id,
207                                                  volume_type_id,
208                                                  host=None)
209
210    @mock.patch('cinder.db.sqlalchemy.api.snapshot_get')
211    def test_refresh(self, snapshot_get):
212        db_snapshot1 = fake_snapshot.fake_db_snapshot()
213        db_snapshot2 = db_snapshot1.copy()
214        db_snapshot2['display_name'] = 'foobar'
215
216        # On the second snapshot_get, return the snapshot with an updated
217        # display_name
218        snapshot_get.side_effect = [db_snapshot1, db_snapshot2]
219        snapshot = objects.Snapshot.get_by_id(self.context, fake.SNAPSHOT_ID)
220        self._compare(self, db_snapshot1, snapshot)
221
222        # display_name was updated, so a snapshot refresh should have a new
223        # value for that field
224        snapshot.refresh()
225        self._compare(self, db_snapshot2, snapshot)
226        if six.PY3:
227            call_bool = mock.call.__bool__()
228        else:
229            call_bool = mock.call.__nonzero__()
230        snapshot_get.assert_has_calls([
231            mock.call(self.context,
232                      fake.SNAPSHOT_ID),
233            call_bool,
234            mock.call(self.context,
235                      fake.SNAPSHOT_ID)])
236
237    @ddt.data('1.1', '1.3')
238    def test_obj_make_compatible_1_3(self, version):
239        snapshot = objects.Snapshot(context=self.context)
240        snapshot.status = fields.SnapshotStatus.UNMANAGING
241        primitive = snapshot.obj_to_primitive(version)
242        snapshot = objects.Snapshot.obj_from_primitive(primitive)
243        if version == '1.3':
244            status = fields.SnapshotStatus.UNMANAGING
245        else:
246            status = fields.SnapshotStatus.DELETING
247        self.assertEqual(status, snapshot.status)
248
249    @ddt.data('1.3', '1.4')
250    def test_obj_make_compatible_1_4(self, version):
251        snapshot = objects.Snapshot(context=self.context)
252        snapshot.status = fields.SnapshotStatus.BACKING_UP
253        primitive = snapshot.obj_to_primitive(version)
254        snapshot = objects.Snapshot.obj_from_primitive(primitive)
255        if version == '1.4':
256            status = fields.SnapshotStatus.BACKING_UP
257        else:
258            status = fields.SnapshotStatus.AVAILABLE
259        self.assertEqual(status, snapshot.status)
260
261
262class TestSnapshotList(test_objects.BaseObjectsTestCase):
263    @mock.patch('cinder.objects.volume.Volume.get_by_id')
264    @mock.patch('cinder.db.snapshot_get_all', return_value=[fake_db_snapshot])
265    def test_get_all(self, snapshot_get_all, volume_get_by_id):
266        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
267        volume_get_by_id.return_value = fake_volume_obj
268
269        search_opts = mock.sentinel.search_opts
270        snapshots = objects.SnapshotList.get_all(
271            self.context, search_opts)
272        self.assertEqual(1, len(snapshots))
273        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
274        snapshot_get_all.assert_called_once_with(self.context, search_opts,
275                                                 None, None, None, None, None)
276
277    @mock.patch('cinder.objects.Volume.get_by_id')
278    @mock.patch('cinder.db.snapshot_get_all_by_host',
279                return_value=[fake_db_snapshot])
280    def test_get_by_host(self, get_by_host, volume_get_by_id):
281        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
282        volume_get_by_id.return_value = fake_volume_obj
283
284        snapshots = objects.SnapshotList.get_by_host(
285            self.context, 'fake-host')
286        self.assertEqual(1, len(snapshots))
287        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
288
289    @mock.patch('cinder.objects.volume.Volume.get_by_id')
290    @mock.patch('cinder.db.snapshot_get_all_by_project',
291                return_value=[fake_db_snapshot])
292    def test_get_all_by_project(self, get_all_by_project, volume_get_by_id):
293        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
294        volume_get_by_id.return_value = fake_volume_obj
295
296        search_opts = mock.sentinel.search_opts
297        snapshots = objects.SnapshotList.get_all_by_project(
298            self.context, self.project_id, search_opts)
299        self.assertEqual(1, len(snapshots))
300        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
301        get_all_by_project.assert_called_once_with(self.context,
302                                                   self.project_id,
303                                                   search_opts, None, None,
304                                                   None, None, None)
305
306    @mock.patch('cinder.objects.volume.Volume.get_by_id')
307    @mock.patch('cinder.db.snapshot_get_all_for_volume',
308                return_value=[fake_db_snapshot])
309    def test_get_all_for_volume(self, get_all_for_volume, volume_get_by_id):
310        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
311        volume_get_by_id.return_value = fake_volume_obj
312
313        snapshots = objects.SnapshotList.get_all_for_volume(
314            self.context, fake_volume_obj.id)
315        self.assertEqual(1, len(snapshots))
316        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
317
318    @mock.patch('cinder.objects.volume.Volume.get_by_id')
319    @mock.patch('cinder.db.snapshot_get_all_active_by_window',
320                return_value=[fake_db_snapshot])
321    def test_get_all_active_by_window(self, get_all_active_by_window,
322                                      volume_get_by_id):
323        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
324        volume_get_by_id.return_value = fake_volume_obj
325
326        snapshots = objects.SnapshotList.get_all_active_by_window(
327            self.context, mock.sentinel.begin, mock.sentinel.end)
328        self.assertEqual(1, len(snapshots))
329        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
330
331    @mock.patch('cinder.objects.volume.Volume.get_by_id')
332    @mock.patch('cinder.db.snapshot_get_all_for_cgsnapshot',
333                return_value=[fake_db_snapshot])
334    def test_get_all_for_cgsnapshot(self, get_all_for_cgsnapshot,
335                                    volume_get_by_id):
336        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
337        volume_get_by_id.return_value = fake_volume_obj
338
339        snapshots = objects.SnapshotList.get_all_for_cgsnapshot(
340            self.context, mock.sentinel.cgsnapshot_id)
341        self.assertEqual(1, len(snapshots))
342        TestSnapshot._compare(self, fake_snapshot_obj, snapshots[0])
343
344    @mock.patch('cinder.objects.volume.Volume.get_by_id')
345    @mock.patch('cinder.db.snapshot_get_all')
346    def test_get_all_without_metadata(self, snapshot_get_all,
347                                      volume_get_by_id):
348        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
349        volume_get_by_id.return_value = fake_volume_obj
350
351        snapshot = copy.deepcopy(fake_db_snapshot)
352        del snapshot['snapshot_metadata']
353        snapshot_get_all.return_value = [snapshot]
354
355        search_opts = mock.sentinel.search_opts
356        self.assertRaises(exception.MetadataAbsent,
357                          objects.SnapshotList.get_all,
358                          self.context, search_opts)
359
360    @mock.patch('cinder.objects.volume.Volume.get_by_id')
361    @mock.patch('cinder.db.snapshot_get_all')
362    def test_get_all_with_metadata(self, snapshot_get_all, volume_get_by_id):
363        fake_volume_obj = fake_volume.fake_volume_obj(self.context)
364        volume_get_by_id.return_value = fake_volume_obj
365
366        db_snapshot = copy.deepcopy(fake_db_snapshot)
367        db_snapshot['snapshot_metadata'] = [{'key': 'fake_key',
368                                             'value': 'fake_value'}]
369        snapshot_get_all.return_value = [db_snapshot]
370
371        search_opts = mock.sentinel.search_opts
372        snapshots = objects.SnapshotList.get_all(
373            self.context, search_opts)
374        self.assertEqual(1, len(snapshots))
375
376        snapshot_obj = copy.deepcopy(fake_snapshot_obj)
377        snapshot_obj['metadata'] = {'fake_key': 'fake_value'}
378        TestSnapshot._compare(self, snapshot_obj, snapshots[0])
379        snapshot_get_all.assert_called_once_with(self.context, search_opts,
380                                                 None, None, None, None, None)
381