1# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16test_floating_ip
17----------------------------------
18
19Functional tests for floating IP resource.
20"""
21
22import pprint
23import sys
24
25from testtools import content
26
27from openstack.cloud.exc import OpenStackCloudException
28from openstack.cloud import meta
29from openstack import proxy
30from openstack.tests.functional import base
31from openstack.tests.functional.cloud.util import pick_flavor
32from openstack import utils
33
34
35class TestFloatingIP(base.BaseFunctionalTest):
36    timeout = 60
37
38    def setUp(self):
39        super(TestFloatingIP, self).setUp()
40        self.flavor = pick_flavor(
41            self.user_cloud.list_flavors(get_extra=False))
42        if self.flavor is None:
43            self.assertFalse('no sensible flavor available')
44        self.image = self.pick_image()
45
46        # Generate a random name for these tests
47        self.new_item_name = self.getUniqueString()
48
49        self.addCleanup(self._cleanup_network)
50        self.addCleanup(self._cleanup_servers)
51
52    def _cleanup_network(self):
53        exception_list = list()
54        tb_list = list()
55
56        # Delete stale networks as well as networks created for this test
57        if self.user_cloud.has_service('network'):
58            # Delete routers
59            for r in self.user_cloud.list_routers():
60                try:
61                    if r['name'].startswith(self.new_item_name):
62                        self.user_cloud.update_router(
63                            r, ext_gateway_net_id=None)
64                        for s in self.user_cloud.list_subnets():
65                            if s['name'].startswith(self.new_item_name):
66                                try:
67                                    self.user_cloud.remove_router_interface(
68                                        r, subnet_id=s['id'])
69                                except Exception:
70                                    pass
71                        self.user_cloud.delete_router(r)
72                except Exception as e:
73                    exception_list.append(e)
74                    tb_list.append(sys.exc_info()[2])
75                    continue
76            # Delete subnets
77            for s in self.user_cloud.list_subnets():
78                if s['name'].startswith(self.new_item_name):
79                    try:
80                        self.user_cloud.delete_subnet(s)
81                    except Exception as e:
82                        exception_list.append(e)
83                        tb_list.append(sys.exc_info()[2])
84                        continue
85            # Delete networks
86            for n in self.user_cloud.list_networks():
87                if n['name'].startswith(self.new_item_name):
88                    try:
89                        self.user_cloud.delete_network(n)
90                    except Exception as e:
91                        exception_list.append(e)
92                        tb_list.append(sys.exc_info()[2])
93                        continue
94
95        if exception_list:
96            # Raise an error: we must make users aware that something went
97            # wrong
98            if len(exception_list) > 1:
99                self.addDetail(
100                    'exceptions',
101                    content.text_content(
102                        '\n'.join([str(ex) for ex in exception_list])))
103            exc = exception_list[0]
104            raise exc
105
106    def _cleanup_servers(self):
107        exception_list = list()
108
109        # Delete stale servers as well as server created for this test
110        for i in self.user_cloud.list_servers(bare=True):
111            if i.name.startswith(self.new_item_name):
112                try:
113                    self.user_cloud.delete_server(i, wait=True)
114                except Exception as e:
115                    exception_list.append(str(e))
116                    continue
117
118        if exception_list:
119            # Raise an error: we must make users aware that something went
120            # wrong
121            raise OpenStackCloudException('\n'.join(exception_list))
122
123    def _cleanup_ips(self, server):
124
125        exception_list = list()
126
127        fixed_ip = meta.get_server_private_ip(server)
128
129        for ip in self.user_cloud.list_floating_ips():
130            if (ip.get('fixed_ip', None) == fixed_ip
131                    or ip.get('fixed_ip_address', None) == fixed_ip):
132                try:
133                    self.user_cloud.delete_floating_ip(ip)
134                except Exception as e:
135                    exception_list.append(str(e))
136                    continue
137
138        if exception_list:
139            # Raise an error: we must make users aware that something went
140            # wrong
141            raise OpenStackCloudException('\n'.join(exception_list))
142
143    def _setup_networks(self):
144        if self.user_cloud.has_service('network'):
145            # Create a network
146            self.test_net = self.user_cloud.create_network(
147                name=self.new_item_name + '_net')
148            # Create a subnet on it
149            self.test_subnet = self.user_cloud.create_subnet(
150                subnet_name=self.new_item_name + '_subnet',
151                network_name_or_id=self.test_net['id'],
152                cidr='10.24.4.0/24',
153                enable_dhcp=True
154            )
155            # Create a router
156            self.test_router = self.user_cloud.create_router(
157                name=self.new_item_name + '_router')
158            # Attach the router to an external network
159            ext_nets = self.user_cloud.search_networks(
160                filters={'router:external': True})
161            self.user_cloud.update_router(
162                name_or_id=self.test_router['id'],
163                ext_gateway_net_id=ext_nets[0]['id'])
164            # Attach the router to the internal subnet
165            self.user_cloud.add_router_interface(
166                self.test_router, subnet_id=self.test_subnet['id'])
167
168            # Select the network for creating new servers
169            self.nic = {'net-id': self.test_net['id']}
170            self.addDetail(
171                'networks-neutron',
172                content.text_content(pprint.pformat(
173                    self.user_cloud.list_networks())))
174        else:
175            # Find network names for nova-net
176            data = proxy._json_response(
177                self.user_cloud._conn.compute.get('/os-tenant-networks'))
178            nets = meta.get_and_munchify('networks', data)
179            self.addDetail(
180                'networks-nova',
181                content.text_content(pprint.pformat(
182                    nets)))
183            self.nic = {'net-id': nets[0].id}
184
185    def test_private_ip(self):
186        self._setup_networks()
187
188        new_server = self.user_cloud.get_openstack_vars(
189            self.user_cloud.create_server(
190                wait=True, name=self.new_item_name + '_server',
191                image=self.image,
192                flavor=self.flavor, nics=[self.nic]))
193
194        self.addDetail(
195            'server', content.text_content(pprint.pformat(new_server)))
196        self.assertNotEqual(new_server['private_v4'], '')
197
198    def test_add_auto_ip(self):
199        self._setup_networks()
200
201        new_server = self.user_cloud.create_server(
202            wait=True, name=self.new_item_name + '_server',
203            image=self.image,
204            flavor=self.flavor, nics=[self.nic])
205
206        # ToDo: remove the following iteration when create_server waits for
207        # the IP to be attached
208        ip = None
209        for _ in utils.iterate_timeout(
210                self.timeout, "Timeout waiting for IP address to be attached"):
211            ip = meta.get_server_external_ipv4(self.user_cloud, new_server)
212            if ip is not None:
213                break
214            new_server = self.user_cloud.get_server(new_server.id)
215
216        self.addCleanup(self._cleanup_ips, new_server)
217
218    def test_detach_ip_from_server(self):
219        self._setup_networks()
220
221        new_server = self.user_cloud.create_server(
222            wait=True, name=self.new_item_name + '_server',
223            image=self.image,
224            flavor=self.flavor, nics=[self.nic])
225
226        # ToDo: remove the following iteration when create_server waits for
227        # the IP to be attached
228        ip = None
229        for _ in utils.iterate_timeout(
230                self.timeout, "Timeout waiting for IP address to be attached"):
231            ip = meta.get_server_external_ipv4(self.user_cloud, new_server)
232            if ip is not None:
233                break
234            new_server = self.user_cloud.get_server(new_server.id)
235
236        self.addCleanup(self._cleanup_ips, new_server)
237
238        f_ip = self.user_cloud.get_floating_ip(
239            id=None, filters={'floating_ip_address': ip})
240        self.user_cloud.detach_ip_from_server(
241            server_id=new_server.id, floating_ip_id=f_ip['id'])
242
243    def test_list_floating_ips(self):
244        fip_admin = self.operator_cloud.create_floating_ip()
245        self.addCleanup(self.operator_cloud.delete_floating_ip, fip_admin.id)
246        fip_user = self.user_cloud.create_floating_ip()
247        self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
248
249        # Get all the floating ips.
250        fip_id_list = [
251            fip.id for fip in self.operator_cloud.list_floating_ips()
252        ]
253        if self.user_cloud.has_service('network'):
254            # Neutron returns all FIP for all projects by default
255            self.assertIn(fip_admin.id, fip_id_list)
256            self.assertIn(fip_user.id, fip_id_list)
257
258            # Ask Neutron for only a subset of all the FIPs.
259            filtered_fip_id_list = [
260                fip.id for fip in self.operator_cloud.list_floating_ips(
261                    {'tenant_id': self.user_cloud.current_project_id}
262                )
263            ]
264            self.assertNotIn(fip_admin.id, filtered_fip_id_list)
265            self.assertIn(fip_user.id, filtered_fip_id_list)
266
267        else:
268            self.assertIn(fip_admin.id, fip_id_list)
269            # By default, Nova returns only the FIPs that belong to the
270            # project which made the listing request.
271            self.assertNotIn(fip_user.id, fip_id_list)
272            self.assertRaisesRegex(
273                ValueError, "Nova-network don't support server-side.*",
274                self.operator_cloud.list_floating_ips, filters={'foo': 'bar'}
275            )
276
277    def test_search_floating_ips(self):
278        fip_user = self.user_cloud.create_floating_ip()
279        self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
280
281        self.assertIn(
282            fip_user['id'],
283            [fip.id for fip in self.user_cloud.search_floating_ips(
284                filters={"attached": False})]
285        )
286        self.assertNotIn(
287            fip_user['id'],
288            [fip.id for fip in self.user_cloud.search_floating_ips(
289                filters={"attached": True})]
290        )
291
292    def test_get_floating_ip_by_id(self):
293        fip_user = self.user_cloud.create_floating_ip()
294        self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
295
296        ret_fip = self.user_cloud.get_floating_ip_by_id(fip_user.id)
297        self.assertEqual(fip_user, ret_fip)
298