1# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16test_port
17----------------------------------
18
19Functional tests for `shade` port resource.
20"""
21
22import random
23import string
24
25from openstack.cloud.exc import OpenStackCloudException
26from openstack.tests.functional import base
27
28
class TestPort(base.BaseFunctionalTest):
    """Functional tests for port create/get/update/delete on a real cloud.

    Every port created by a test is named with a per-test random prefix
    (``self.new_port_name``) so that concurrent runs do not collide and so
    that ``_cleanup_ports`` can find and remove leftovers afterwards.
    """

    def setUp(self):
        """Skip when no network service exists and register port cleanup."""
        super(TestPort, self).setUp()
        # Skip Neutron tests if neutron is not present
        if not self.operator_cloud.has_service('network'):
            self.skipTest('Network service not supported by cloud')

        # Generate a unique port name to allow concurrent tests
        self.new_port_name = 'test_' + ''.join(
            random.choice(string.ascii_lowercase) for _ in range(5))

        self.addCleanup(self._cleanup_ports)

    def _cleanup_ports(self):
        """Delete every port whose name starts with this test's prefix.

        Deletion is best-effort per port: a failure is recorded and the
        remaining ports are still attempted.

        :raises OpenStackCloudException: if at least one deletion failed;
            the message contains one line per failure.
        """
        exception_list = []

        for p in self.operator_cloud.list_ports():
            if p['name'].startswith(self.new_port_name):
                try:
                    self.operator_cloud.delete_port(name_or_id=p['id'])
                except Exception as e:
                    # We were unable to delete this port, let's try with next
                    exception_list.append(str(e))

        if exception_list:
            # Raise an error: we must make users aware that something went
            # wrong
            raise OpenStackCloudException('\n'.join(exception_list))

    def _get_network_id(self):
        """Return the id of the first listed network, failing if none exist."""
        networks = self.operator_cloud.list_networks()
        if not networks:
            self.fail('no sensible network available')
        return networks[0]['id']

    def _create_and_verify_port(self, port_name):
        """Create a port named ``port_name`` and sanity-check the result.

        :param port_name: name to give the new port.
        :returns: the port as returned by ``create_port``.
        """
        port = self.operator_cloud.create_port(
            network_id=self._get_network_id(), name=port_name)
        self.assertIsInstance(port, dict)
        self.assertIn('id', port)
        self.assertEqual(port.get('name'), port_name)
        return port

    def _assert_same_port(self, port, fetched_port):
        """Assert a fetched port equals the one returned at creation.

        ``fetched_port`` is modified in place when it carries an
        ``extra_dhcp_opts`` key that ``port`` lacks.
        """
        # extra_dhcp_opts is added later by Neutron...
        if 'extra_dhcp_opts' in fetched_port and 'extra_dhcp_opts' not in port:
            del fetched_port['extra_dhcp_opts']
        self.assertEqual(port, fetched_port)

    def test_create_port(self):
        """A created port is a dict with an id and the requested name."""
        self._create_and_verify_port(self.new_port_name + '_create')

    def test_get_port(self):
        """``get_port`` by id returns the same data as ``create_port``."""
        port = self._create_and_verify_port(self.new_port_name + '_get')

        updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
        self._assert_same_port(port, updated_port)

    def test_get_port_by_id(self):
        """``get_port_by_id`` returns the same data as ``create_port``."""
        port = self._create_and_verify_port(
            self.new_port_name + '_get_by_id')

        updated_port = self.operator_cloud.get_port_by_id(port['id'])
        self._assert_same_port(port, updated_port)

    def test_update_port(self):
        """Renaming a port by name is reflected in a subsequent fetch."""
        port_name = self.new_port_name + '_update'
        new_port_name = port_name + '_new'

        self.operator_cloud.create_port(
            network_id=self._get_network_id(), name=port_name)

        port = self.operator_cloud.update_port(
            name_or_id=port_name, name=new_port_name)
        self.assertIsInstance(port, dict)
        self.assertEqual(port.get('name'), new_port_name)

        updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
        # Check the freshly fetched port, not the update response again.
        self.assertEqual(updated_port.get('name'), new_port_name)

        # These fields are excluded from the comparison; presumably they can
        # differ between the update response and a later fetch.
        for key in ('revision_number', 'updated_at'):
            port.pop(key, None)
            updated_port.pop(key, None)

        self.assertEqual(port, updated_port)

    def test_delete_port(self):
        """A port deleted by name can no longer be fetched by id."""
        port_name = self.new_port_name + '_delete'
        port = self._create_and_verify_port(port_name)

        updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
        self.assertIsNotNone(updated_port)

        self.operator_cloud.delete_port(name_or_id=port_name)

        updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
        self.assertIsNone(updated_port)
160