1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
13"""
14test_compute
15----------------------------------
16
17Functional tests for `shade` compute methods.
18"""
19
20import datetime
21
22from fixtures import TimeoutException
23
24from openstack.cloud import exc
25from openstack.tests.functional import base
26from openstack.tests.functional.cloud.util import pick_flavor
27from openstack import utils
28
29
30class TestCompute(base.BaseFunctionalTest):
31    def setUp(self):
32        # OS_TEST_TIMEOUT is 90 sec by default
33        # but on a bad day, test_attach_detach_volume can take more time.
34        self.TIMEOUT_SCALING_FACTOR = 1.5
35
36        super(TestCompute, self).setUp()
37        self.flavor = pick_flavor(
38            self.user_cloud.list_flavors(get_extra=False))
39        if self.flavor is None:
40            self.assertFalse('no sensible flavor available')
41        self.image = self.pick_image()
42        self.server_name = self.getUniqueString()
43
44    def _cleanup_servers_and_volumes(self, server_name):
45        """Delete the named server and any attached volumes.
46
47        Adding separate cleanup calls for servers and volumes can be tricky
48        since they need to be done in the proper order. And sometimes deleting
49        a server can start the process of deleting a volume if it is booted
50        from that volume. This encapsulates that logic.
51        """
52        server = self.user_cloud.get_server(server_name)
53        if not server:
54            return
55        volumes = self.user_cloud.get_volumes(server)
56        try:
57            self.user_cloud.delete_server(server.name, wait=True)
58            for volume in volumes:
59                if volume.status != 'deleting':
60                    self.user_cloud.delete_volume(volume.id, wait=True)
61        except (exc.OpenStackCloudTimeout, TimeoutException):
62            # Ups, some timeout occured during process of deletion server
63            # or volumes, so now we will try to call delete each of them
64            # once again and we will try to live with it
65            self.user_cloud.delete_server(server.name)
66            for volume in volumes:
67                self.operator_cloud.delete_volume(
68                    volume.id, wait=False, force=True)
69
70    def test_create_and_delete_server(self):
71        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
72        server = self.user_cloud.create_server(
73            name=self.server_name,
74            image=self.image,
75            flavor=self.flavor,
76            wait=True)
77        self.assertEqual(self.server_name, server['name'])
78        self.assertEqual(self.image.id, server['image']['id'])
79        self.assertEqual(self.flavor.id, server['flavor']['id'])
80        self.assertIsNotNone(server['adminPass'])
81        self.assertTrue(
82            self.user_cloud.delete_server(self.server_name, wait=True))
83        self.assertIsNone(self.user_cloud.get_server(self.server_name))
84
85    def test_create_and_delete_server_auto_ip_delete_ips(self):
86        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
87        server = self.user_cloud.create_server(
88            name=self.server_name,
89            image=self.image,
90            flavor=self.flavor,
91            auto_ip=True,
92            wait=True)
93        self.assertEqual(self.server_name, server['name'])
94        self.assertEqual(self.image.id, server['image']['id'])
95        self.assertEqual(self.flavor.id, server['flavor']['id'])
96        self.assertIsNotNone(server['adminPass'])
97        self.assertTrue(
98            self.user_cloud.delete_server(
99                self.server_name, wait=True, delete_ips=True))
100        self.assertIsNone(self.user_cloud.get_server(self.server_name))
101
102    def test_attach_detach_volume(self):
103        self.skipTest('Volume functional tests temporarily disabled')
104        server_name = self.getUniqueString()
105        self.addCleanup(self._cleanup_servers_and_volumes, server_name)
106        server = self.user_cloud.create_server(
107            name=server_name, image=self.image, flavor=self.flavor,
108            wait=True)
109        volume = self.user_cloud.create_volume(1)
110        vol_attachment = self.user_cloud.attach_volume(server, volume)
111        for key in ('device', 'serverId', 'volumeId'):
112            self.assertIn(key, vol_attachment)
113            self.assertTrue(vol_attachment[key])  # assert string is not empty
114        self.assertIsNone(self.user_cloud.detach_volume(server, volume))
115
116    def test_create_and_delete_server_with_config_drive(self):
117        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
118        server = self.user_cloud.create_server(
119            name=self.server_name,
120            image=self.image,
121            flavor=self.flavor,
122            config_drive=True,
123            wait=True)
124        self.assertEqual(self.server_name, server['name'])
125        self.assertEqual(self.image.id, server['image']['id'])
126        self.assertEqual(self.flavor.id, server['flavor']['id'])
127        self.assertTrue(server['has_config_drive'])
128        self.assertIsNotNone(server['adminPass'])
129        self.assertTrue(
130            self.user_cloud.delete_server(self.server_name, wait=True))
131        self.assertIsNone(self.user_cloud.get_server(self.server_name))
132
133    def test_create_and_delete_server_with_config_drive_none(self):
134        # check that we're not sending invalid values for config_drive
135        # if it's passed in explicitly as None - which nodepool does if it's
136        # not set in the config
137        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
138        server = self.user_cloud.create_server(
139            name=self.server_name,
140            image=self.image,
141            flavor=self.flavor,
142            config_drive=None,
143            wait=True)
144        self.assertEqual(self.server_name, server['name'])
145        self.assertEqual(self.image.id, server['image']['id'])
146        self.assertEqual(self.flavor.id, server['flavor']['id'])
147        self.assertFalse(server['has_config_drive'])
148        self.assertIsNotNone(server['adminPass'])
149        self.assertTrue(
150            self.user_cloud.delete_server(
151                self.server_name, wait=True))
152        self.assertIsNone(self.user_cloud.get_server(self.server_name))
153
154    def test_list_all_servers(self):
155        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
156        server = self.user_cloud.create_server(
157            name=self.server_name,
158            image=self.image,
159            flavor=self.flavor,
160            wait=True)
161        # We're going to get servers from other tests, but that's ok, as long
162        # as we get the server we created with the demo user.
163        found_server = False
164        for s in self.operator_cloud.list_servers(all_projects=True):
165            if s.name == server.name:
166                found_server = True
167        self.assertTrue(found_server)
168
169    def test_list_all_servers_bad_permissions(self):
170        # Normal users are not allowed to pass all_projects=True
171        self.assertRaises(
172            exc.OpenStackCloudException,
173            self.user_cloud.list_servers,
174            all_projects=True)
175
176    def test_create_server_image_flavor_dict(self):
177        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
178        server = self.user_cloud.create_server(
179            name=self.server_name,
180            image={'id': self.image.id},
181            flavor={'id': self.flavor.id},
182            wait=True)
183        self.assertEqual(self.server_name, server['name'])
184        self.assertEqual(self.image.id, server['image']['id'])
185        self.assertEqual(self.flavor.id, server['flavor']['id'])
186        self.assertIsNotNone(server['adminPass'])
187        self.assertTrue(
188            self.user_cloud.delete_server(self.server_name, wait=True))
189        self.assertIsNone(self.user_cloud.get_server(self.server_name))
190
191    def test_get_server_console(self):
192        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
193        server = self.user_cloud.create_server(
194            name=self.server_name,
195            image=self.image,
196            flavor=self.flavor,
197            wait=True)
198        # _get_server_console_output does not trap HTTP exceptions, so this
199        # returning a string tests that the call is correct. Testing that
200        # the cloud returns actual data in the output is out of scope.
201        log = self.user_cloud._get_server_console_output(server_id=server.id)
202        self.assertTrue(isinstance(log, str))
203
204    def test_get_server_console_name_or_id(self):
205        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
206        self.user_cloud.create_server(
207            name=self.server_name,
208            image=self.image,
209            flavor=self.flavor,
210            wait=True)
211        log = self.user_cloud.get_server_console(server=self.server_name)
212        self.assertTrue(isinstance(log, str))
213
214    def test_list_availability_zone_names(self):
215        self.assertEqual(
216            ['nova'], self.user_cloud.list_availability_zone_names())
217
218    def test_get_server_console_bad_server(self):
219        self.assertRaises(
220            exc.OpenStackCloudException,
221            self.user_cloud.get_server_console,
222            server=self.server_name)
223
224    def test_create_and_delete_server_with_admin_pass(self):
225        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
226        server = self.user_cloud.create_server(
227            name=self.server_name,
228            image=self.image,
229            flavor=self.flavor,
230            admin_pass='sheiqu9loegahSh',
231            wait=True)
232        self.assertEqual(self.server_name, server['name'])
233        self.assertEqual(self.image.id, server['image']['id'])
234        self.assertEqual(self.flavor.id, server['flavor']['id'])
235        self.assertEqual(server['adminPass'], 'sheiqu9loegahSh')
236        self.assertTrue(
237            self.user_cloud.delete_server(self.server_name, wait=True))
238        self.assertIsNone(self.user_cloud.get_server(self.server_name))
239
240    def test_get_image_id(self):
241        self.assertEqual(
242            self.image.id, self.user_cloud.get_image_id(self.image.id))
243        self.assertEqual(
244            self.image.id, self.user_cloud.get_image_id(self.image.name))
245
246    def test_get_image_name(self):
247        self.assertEqual(
248            self.image.name, self.user_cloud.get_image_name(self.image.id))
249        self.assertEqual(
250            self.image.name, self.user_cloud.get_image_name(self.image.name))
251
252    def _assert_volume_attach(self, server, volume_id=None, image=''):
253        self.assertEqual(self.server_name, server['name'])
254        self.assertEqual(image, server['image'])
255        self.assertEqual(self.flavor.id, server['flavor']['id'])
256        volumes = self.user_cloud.get_volumes(server)
257        self.assertEqual(1, len(volumes))
258        volume = volumes[0]
259        if volume_id:
260            self.assertEqual(volume_id, volume['id'])
261        else:
262            volume_id = volume['id']
263        self.assertEqual(1, len(volume['attachments']), 1)
264        self.assertEqual(server['id'], volume['attachments'][0]['server_id'])
265        return volume_id
266
267    def test_create_boot_from_volume_image(self):
268        self.skipTest('Volume functional tests temporarily disabled')
269        if not self.user_cloud.has_service('volume'):
270            self.skipTest('volume service not supported by cloud')
271        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
272        server = self.user_cloud.create_server(
273            name=self.server_name,
274            image=self.image,
275            flavor=self.flavor,
276            boot_from_volume=True,
277            volume_size=1,
278            wait=True)
279        volume_id = self._assert_volume_attach(server)
280        volume = self.user_cloud.get_volume(volume_id)
281        self.assertIsNotNone(volume)
282        self.assertEqual(volume['name'], volume['display_name'])
283        self.assertTrue(volume['bootable'])
284        self.assertEqual(server['id'], volume['attachments'][0]['server_id'])
285        self.assertTrue(self.user_cloud.delete_server(server.id, wait=True))
286        self._wait_for_detach(volume.id)
287        self.assertTrue(self.user_cloud.delete_volume(volume.id, wait=True))
288        self.assertIsNone(self.user_cloud.get_server(server.id))
289        self.assertIsNone(self.user_cloud.get_volume(volume.id))
290
291    def _wait_for_detach(self, volume_id):
292        # Volumes do not show up as unattached for a bit immediately after
293        # deleting a server that had had a volume attached. Yay for eventual
294        # consistency!
295        for count in utils.iterate_timeout(
296                60,
297                'Timeout waiting for volume {volume_id} to detach'.format(
298                    volume_id=volume_id)):
299            volume = self.user_cloud.get_volume(volume_id)
300            if volume.status in (
301                    'available', 'error',
302                    'error_restoring', 'error_extending'):
303                return
304
305    def test_create_terminate_volume_image(self):
306        self.skipTest('Volume functional tests temporarily disabled')
307        if not self.user_cloud.has_service('volume'):
308            self.skipTest('volume service not supported by cloud')
309        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
310        server = self.user_cloud.create_server(
311            name=self.server_name,
312            image=self.image,
313            flavor=self.flavor,
314            boot_from_volume=True,
315            terminate_volume=True,
316            volume_size=1,
317            wait=True)
318        volume_id = self._assert_volume_attach(server)
319        self.assertTrue(
320            self.user_cloud.delete_server(self.server_name, wait=True))
321        volume = self.user_cloud.get_volume(volume_id)
322        # We can either get None (if the volume delete was quick), or a volume
323        # that is in the process of being deleted.
324        if volume:
325            self.assertEqual('deleting', volume.status)
326        self.assertIsNone(self.user_cloud.get_server(self.server_name))
327
328    def test_create_boot_from_volume_preexisting(self):
329        self.skipTest('Volume functional tests temporarily disabled')
330        if not self.user_cloud.has_service('volume'):
331            self.skipTest('volume service not supported by cloud')
332        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
333        volume = self.user_cloud.create_volume(
334            size=1, name=self.server_name, image=self.image, wait=True)
335        self.addCleanup(self.user_cloud.delete_volume, volume.id)
336        server = self.user_cloud.create_server(
337            name=self.server_name,
338            image=None,
339            flavor=self.flavor,
340            boot_volume=volume,
341            volume_size=1,
342            wait=True)
343        volume_id = self._assert_volume_attach(server, volume_id=volume['id'])
344        self.assertTrue(
345            self.user_cloud.delete_server(self.server_name, wait=True))
346        volume = self.user_cloud.get_volume(volume_id)
347        self.assertIsNotNone(volume)
348        self.assertEqual(volume['name'], volume['display_name'])
349        self.assertTrue(volume['bootable'])
350        self.assertEqual([], volume['attachments'])
351        self._wait_for_detach(volume.id)
352        self.assertTrue(self.user_cloud.delete_volume(volume_id))
353        self.assertIsNone(self.user_cloud.get_server(self.server_name))
354        self.assertIsNone(self.user_cloud.get_volume(volume_id))
355
356    def test_create_boot_attach_volume(self):
357        self.skipTest('Volume functional tests temporarily disabled')
358        if not self.user_cloud.has_service('volume'):
359            self.skipTest('volume service not supported by cloud')
360        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
361        volume = self.user_cloud.create_volume(
362            size=1, name=self.server_name, image=self.image, wait=True)
363        self.addCleanup(self.user_cloud.delete_volume, volume['id'])
364        server = self.user_cloud.create_server(
365            name=self.server_name,
366            flavor=self.flavor,
367            image=self.image,
368            boot_from_volume=False,
369            volumes=[volume],
370            wait=True)
371        volume_id = self._assert_volume_attach(
372            server, volume_id=volume['id'], image={'id': self.image['id']})
373        self.assertTrue(
374            self.user_cloud.delete_server(self.server_name, wait=True))
375        volume = self.user_cloud.get_volume(volume_id)
376        self.assertIsNotNone(volume)
377        self.assertEqual(volume['name'], volume['display_name'])
378        self.assertEqual([], volume['attachments'])
379        self._wait_for_detach(volume.id)
380        self.assertTrue(self.user_cloud.delete_volume(volume_id))
381        self.assertIsNone(self.user_cloud.get_server(self.server_name))
382        self.assertIsNone(self.user_cloud.get_volume(volume_id))
383
384    def test_create_boot_from_volume_preexisting_terminate(self):
385        self.skipTest('Volume functional tests temporarily disabled')
386        if not self.user_cloud.has_service('volume'):
387            self.skipTest('volume service not supported by cloud')
388        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
389        volume = self.user_cloud.create_volume(
390            size=1, name=self.server_name, image=self.image, wait=True)
391        server = self.user_cloud.create_server(
392            name=self.server_name,
393            image=None,
394            flavor=self.flavor,
395            boot_volume=volume,
396            terminate_volume=True,
397            volume_size=1,
398            wait=True)
399        volume_id = self._assert_volume_attach(server, volume_id=volume['id'])
400        self.assertTrue(
401            self.user_cloud.delete_server(self.server_name, wait=True))
402        volume = self.user_cloud.get_volume(volume_id)
403        # We can either get None (if the volume delete was quick), or a volume
404        # that is in the process of being deleted.
405        if volume:
406            self.assertEqual('deleting', volume.status)
407        self.assertIsNone(self.user_cloud.get_server(self.server_name))
408
409    def test_create_image_snapshot_wait_active(self):
410        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
411        server = self.user_cloud.create_server(
412            name=self.server_name,
413            image=self.image,
414            flavor=self.flavor,
415            admin_pass='sheiqu9loegahSh',
416            wait=True)
417        image = self.user_cloud.create_image_snapshot('test-snapshot', server,
418                                                      wait=True)
419        self.addCleanup(self.user_cloud.delete_image, image['id'])
420        self.assertEqual('active', image['status'])
421
422    def test_set_and_delete_metadata(self):
423        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
424        self.user_cloud.create_server(
425            name=self.server_name,
426            image=self.image,
427            flavor=self.flavor,
428            wait=True)
429        self.user_cloud.set_server_metadata(self.server_name,
430                                            {'key1': 'value1',
431                                             'key2': 'value2'})
432        updated_server = self.user_cloud.get_server(self.server_name)
433        self.assertEqual(set(updated_server.metadata.items()),
434                         set({'key1': 'value1', 'key2': 'value2'}.items()))
435
436        self.user_cloud.set_server_metadata(self.server_name,
437                                            {'key2': 'value3'})
438        updated_server = self.user_cloud.get_server(self.server_name)
439        self.assertEqual(set(updated_server.metadata.items()),
440                         set({'key1': 'value1', 'key2': 'value3'}.items()))
441
442        self.user_cloud.delete_server_metadata(self.server_name, ['key2'])
443        updated_server = self.user_cloud.get_server(self.server_name)
444        self.assertEqual(set(updated_server.metadata.items()),
445                         set({'key1': 'value1'}.items()))
446
447        self.user_cloud.delete_server_metadata(self.server_name, ['key1'])
448        updated_server = self.user_cloud.get_server(self.server_name)
449        self.assertEqual(set(updated_server.metadata.items()), set([]))
450
451        self.assertRaises(
452            exc.OpenStackCloudURINotFound,
453            self.user_cloud.delete_server_metadata,
454            self.server_name, ['key1'])
455
456    def test_update_server(self):
457        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
458        self.user_cloud.create_server(
459            name=self.server_name,
460            image=self.image,
461            flavor=self.flavor,
462            wait=True)
463        server_updated = self.user_cloud.update_server(
464            self.server_name,
465            name='new_name'
466        )
467        self.assertEqual('new_name', server_updated['name'])
468
469    def test_get_compute_usage(self):
470        '''Test usage functionality'''
471        # Add a server so that we can know we have usage
472        self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
473        self.user_cloud.create_server(
474            name=self.server_name,
475            image=self.image,
476            flavor=self.flavor,
477            wait=True)
478        start = datetime.datetime.now() - datetime.timedelta(seconds=5)
479        usage = self.operator_cloud.get_compute_usage('demo', start)
480        self.add_info_on_exception('usage', usage)
481        self.assertIsNotNone(usage)
482        self.assertIn('total_hours', usage)
483        self.assertIn('started_at', usage)
484        self.assertEqual(start.isoformat(), usage['started_at'])
485        self.assertIn('location', usage)
486