# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
test_cluster_templates
----------------------------------

Functional tests for `openstack.cloud` cluster_template methods.
"""

import subprocess

import fixtures
from testtools import content

from openstack.tests.functional import base


class TestClusterTemplate(base.BaseFunctionalTest):
    """Exercise create/list/get/update/delete of cluster templates."""

    def setUp(self):
        """Skip when the container service is absent; prepare scratch state.

        ``self.ct`` holds the cluster template created by the test (or
        ``None`` until one exists) so that ``cleanup`` knows whether there
        is anything to delete. ``self.ssh_directory`` is a temporary
        directory, managed by a fixture, that receives the generated key.
        """
        super(TestClusterTemplate, self).setUp()
        if not self.user_cloud.has_service(
            'container-infrastructure-management'
        ):
            self.skipTest('Container service not supported by cloud')
        self.ct = None
        self.ssh_directory = self.useFixture(fixtures.TempDir()).path

    def test_cluster_templates(self):
        """Test cluster_templates functionality"""
        name = 'fake-cluster_template'
        image_id = 'fedora-atomic-f23-dib'
        coe = 'kubernetes'
        keypair_id = 'testkey'
        # Not passed to create_cluster_template below; these are the values
        # the assertions expect to find on the returned template.
        server_type = 'vm'
        public = False
        tls_disabled = False
        registry_enabled = False

        self.addDetail('cluster_template', content.text_content(name))
        self.addCleanup(self.cleanup, name)

        # generate a keypair to add to nova
        key_path = '%s/id_rsa_sdk' % self.ssh_directory
        # check_call (rather than call) so that a failing ssh-keygen stops
        # the test here instead of surfacing later as a missing .pub file.
        subprocess.check_call(
            ['ssh-keygen', '-t', 'rsa', '-N', '', '-f', key_path])

        # add keypair to nova
        with open('%s.pub' % key_path) as f:
            key_content = f.read()
            self.user_cloud.create_keypair(keypair_id, key_content)

        # Test we can create a cluster_template and we get it returned
        self.ct = self.user_cloud.create_cluster_template(
            name=name, image_id=image_id,
            keypair_id=keypair_id, coe=coe)
        self.assertEqual(self.ct['name'], name)
        self.assertEqual(self.ct['image_id'], image_id)
        self.assertEqual(self.ct['keypair_id'], keypair_id)
        self.assertEqual(self.ct['coe'], coe)
        self.assertEqual(self.ct['registry_enabled'], registry_enabled)
        self.assertEqual(self.ct['tls_disabled'], tls_disabled)
        self.assertEqual(self.ct['public'], public)
        self.assertEqual(self.ct['server_type'], server_type)

        # Test that we can list cluster_templates
        cluster_templates = self.user_cloud.list_cluster_templates()
        self.assertIsNotNone(cluster_templates)

        # Test we get the same cluster_template with the
        # get_cluster_template method
        cluster_template_get = self.user_cloud.get_cluster_template(
            self.ct['uuid'])
        self.assertEqual(cluster_template_get['uuid'], self.ct['uuid'])

        # Test the get method also works by name
        cluster_template_get = self.user_cloud.get_cluster_template(name)
        self.assertEqual(cluster_template_get['name'], self.ct['name'])

        # Test we can update a field on the cluster_template and only that
        # field is updated
        cluster_template_update = self.user_cloud.update_cluster_template(
            self.ct['uuid'], 'replace', tls_disabled=True)
        self.assertEqual(
            cluster_template_update['uuid'], self.ct['uuid'])
        self.assertTrue(cluster_template_update['tls_disabled'])

        # Test we can delete and get True returned
        cluster_template_delete = self.user_cloud.delete_cluster_template(
            self.ct['uuid'])
        self.assertTrue(cluster_template_delete)

    def cleanup(self, name):
        """Remove the cluster template and keypair left behind by the test.

        :param name: Name the cleanup was registered with. It is currently
            unused; the template recorded on ``self.ct`` is deleted instead.
        """
        if self.ct:
            try:
                self.user_cloud.delete_cluster_template(self.ct['name'])
            except Exception:
                # Best effort: a failure here must not prevent the keypair
                # from being removed below.
                pass

        # delete keypair
        self.user_cloud.delete_keypair('testkey')