1# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import concurrent.futures
16import hashlib
17import logging
18import sys
19from unittest import mock
20
21import fixtures
22import os_service_types
23import testtools
24
25import openstack
26from openstack import exceptions
27from openstack.tests.unit import base
28from openstack import utils
29
30
class Test_enable_logging(base.TestCase):
    """Tests for ``openstack.enable_logging``.

    ``logging.getLogger`` is replaced by a mock that returns one prepared
    mock logger per call, so each test can inspect what was configured on
    the first logger handed out (``self.openstack_logger``).
    """

    def setUp(self):
        super().setUp()
        # Every fake logger starts out with an empty ``handlers`` list.
        self.openstack_logger = mock.Mock(handlers=[])
        self.ksa_logger_root = mock.Mock(handlers=[])
        self.ksa_logger_1 = mock.Mock(handlers=[])
        self.ksa_logger_2 = mock.Mock(handlers=[])
        self.ksa_logger_3 = mock.Mock(handlers=[])
        self.urllib3_logger = mock.Mock(handlers=[])
        self.stevedore_logger = mock.Mock(handlers=[])
        # An iterable side_effect yields one item per call, so successive
        # getLogger() calls receive these loggers in exactly this order.
        # NOTE(review): the order presumably mirrors the sequence of
        # getLogger() calls made by enable_logging -- confirm there.
        self.fake_get_logger = mock.Mock(side_effect=[
            self.openstack_logger,
            self.ksa_logger_root,
            self.urllib3_logger,
            self.stevedore_logger,
            self.ksa_logger_1,
            self.ksa_logger_2,
            self.ksa_logger_3,
        ])
        self.useFixture(
            fixtures.MonkeyPatch('logging.getLogger', self.fake_get_logger))

    def _console_tests(self, level, debug, stream):
        """Enable logging to ``stream`` and check the first logger.

        :param level: logging level expected to be set on the logger.
        :param debug: value passed as ``debug`` to ``enable_logging``.
        :param stream: stream object passed to ``enable_logging``.
        """
        openstack.enable_logging(debug=debug, stream=stream)

        self.assertEqual(1, self.openstack_logger.addHandler.call_count)
        self.openstack_logger.setLevel.assert_called_with(level)

    def _file_tests(self, level, debug):
        """Enable logging to a file path and check the first logger.

        ``logging.FileHandler`` is patched out so no file is created.

        :param level: logging level expected to be set on the logger.
        :param debug: value passed as ``debug`` to ``enable_logging``.
        """
        file_handler = mock.Mock()
        self.useFixture(
            fixtures.MonkeyPatch('logging.FileHandler', file_handler))
        fake_path = "fake/path.log"

        openstack.enable_logging(debug=debug, path=fake_path)

        file_handler.assert_called_with(fake_path)
        self.assertEqual(1, self.openstack_logger.addHandler.call_count)
        self.openstack_logger.setLevel.assert_called_with(level)

    def test_none(self):
        # Neither ``stream`` nor ``path`` is supplied; a StreamHandler is
        # still expected on the first logger.
        openstack.enable_logging(debug=True)
        # The previous ``assert_has_calls([])`` could never fail (an empty
        # expected-call list always matches); assert that getLogger was
        # actually used instead.
        self.fake_get_logger.assert_called()
        self.openstack_logger.setLevel.assert_called_with(logging.DEBUG)
        self.assertEqual(1, self.openstack_logger.addHandler.call_count)
        self.assertIsInstance(
            self.openstack_logger.addHandler.call_args_list[0][0][0],
            logging.StreamHandler)

    def test_debug_console_stderr(self):
        self._console_tests(logging.DEBUG, True, sys.stderr)

    # NOTE: despite "warning" in the names below, the level asserted for
    # ``debug=False`` is logging.INFO.
    def test_warning_console_stderr(self):
        self._console_tests(logging.INFO, False, sys.stderr)

    def test_debug_console_stdout(self):
        self._console_tests(logging.DEBUG, True, sys.stdout)

    def test_warning_console_stdout(self):
        self._console_tests(logging.INFO, False, sys.stdout)

    def test_debug_file(self):
        self._file_tests(logging.DEBUG, True)

    def test_warning_file(self):
        self._file_tests(logging.INFO, False)
107
108
class Test_urljoin(base.TestCase):
    """Tests for ``utils.urljoin`` with plain, ``None`` and non-ASCII parts."""

    def test_strings(self):
        # Two ordinary string segments appended to the root URL.
        joined = utils.urljoin("http://www.example.com", "foo", "bar")
        self.assertEqual(joined, "http://www.example.com/foo/bar")

    def test_with_none(self):
        # A ``None`` segment is expected to leave an empty trailing part.
        segments = ("foo", None)
        joined = utils.urljoin("http://www.example.com", *segments)
        self.assertEqual(joined, "http://www.example.com/foo/")

    def test_unicode_strings(self):
        # Segments containing a non-ASCII character must not make urljoin
        # raise, and must appear unchanged in the result.
        site = "http://www.example.com"
        segments = (u"ascii", u"extra_chars-™")
        expected = u"http://www.example.com/ascii/extra_chars-™"

        try:
            joined = utils.urljoin(site, *segments)
        except Exception:
            self.fail("urljoin failed on unicode strings")
        else:
            self.assertEqual(joined, expected)
135
136
class TestSupportsMicroversion(base.TestCase):
    """Tests for ``utils.supports_microversion`` / ``require_microversion``.

    The fake adapter's endpoint data advertises microversions from
    1.1 up to 1.99; each test sets ``default_microversion`` itself.
    """

    def setUp(self):
        super().setUp()
        endpoint_data = mock.Mock(
            spec=['min_microversion', 'max_microversion'],
            min_microversion='1.1',
            max_microversion='1.99',
        )
        adapter = mock.Mock(spec=['get_endpoint_data'])
        adapter.get_endpoint_data.return_value = endpoint_data
        self.adapter = adapter
        self.endpoint_data = endpoint_data

    def test_requested_supported_no_default(self):
        # 1.2 lies inside the advertised 1.1..1.99 range.
        self.adapter.default_microversion = None
        supported = utils.supports_microversion(self.adapter, '1.2')
        self.assertTrue(supported)

    def test_requested_not_supported_no_default(self):
        # 2.2 lies outside the advertised range.
        self.adapter.default_microversion = None
        supported = utils.supports_microversion(self.adapter, '2.2')
        self.assertFalse(supported)

    def test_requested_not_supported_no_default_exception(self):
        # Same as above, but the third positional argument is True and
        # an SDKException is expected instead of a False result.
        self.adapter.default_microversion = None
        self.assertRaises(
            exceptions.SDKException,
            utils.supports_microversion, self.adapter, '2.2', True)

    def test_requested_supported_higher_default(self):
        # Requested 1.6 with an adapter default of 1.8.
        self.adapter.default_microversion = '1.8'
        supported = utils.supports_microversion(self.adapter, '1.6')
        self.assertTrue(supported)

    def test_requested_supported_equal_default(self):
        # Requested version equals the adapter default.
        self.adapter.default_microversion = '1.8'
        supported = utils.supports_microversion(self.adapter, '1.8')
        self.assertTrue(supported)

    def test_requested_supported_lower_default(self):
        # Requested 1.8 with an adapter default of only 1.2.
        self.adapter.default_microversion = '1.2'
        supported = utils.supports_microversion(self.adapter, '1.8')
        self.assertFalse(supported)

    def test_requested_supported_lower_default_exception(self):
        # Same as above, but an SDKException is expected because the
        # third positional argument is True.
        self.adapter.default_microversion = '1.2'
        self.assertRaises(
            exceptions.SDKException,
            utils.supports_microversion, self.adapter, '1.8', True)

    @mock.patch('openstack.utils.supports_microversion')
    def test_require_microversion(self, sm_mock):
        # require_microversion must call supports_microversion with
        # raise_exception=True.
        utils.require_microversion(self.adapter, '1.2')
        sm_mock.assert_called_with(
            self.adapter, '1.2', raise_exception=True)
196
197
class TestMaximumSupportedMicroversion(base.TestCase):
    """Tests for ``utils.maximum_supported_microversion``.

    The fake endpoint data has no minimum microversion and a maximum of
    1.99 unless an individual test overrides it.
    """

    def setUp(self):
        super().setUp()
        endpoint_data = mock.Mock(
            spec=['min_microversion', 'max_microversion'],
            min_microversion=None,
            max_microversion='1.99',
        )
        adapter = mock.Mock(spec=['get_endpoint_data'])
        adapter.get_endpoint_data.return_value = endpoint_data
        self.adapter = adapter
        self.endpoint_data = endpoint_data

    def test_with_none(self):
        # No client maximum given -> None is expected back.
        result = utils.maximum_supported_microversion(self.adapter, None)
        self.assertIsNone(result)

    def test_with_value(self):
        # A client maximum below the endpoint maximum is returned as is.
        result = utils.maximum_supported_microversion(self.adapter, '1.42')
        self.assertEqual('1.42', result)

    def test_value_more_than_max(self):
        # A client maximum above the endpoint's 1.99 yields 1.99.
        result = utils.maximum_supported_microversion(self.adapter, '1.100')
        self.assertEqual('1.99', result)

    def test_value_less_than_min(self):
        # With the endpoint minimum raised to 1.42, a client maximum of
        # 1.2 is expected to give None.
        self.endpoint_data.min_microversion = '1.42'
        result = utils.maximum_supported_microversion(self.adapter, '1.2')
        self.assertIsNone(result)
226
227
class TestOsServiceTypesVersion(base.TestCase):
    """Checks the version stamp reported by os-service-types."""

    def test_ost_version(self):
        # The installed os-service-types data must report exactly this
        # version string; the message tells the developer what to bump.
        expected = '2019-05-01T19:53:21.498745'
        hint = (
            "This project must be pinned to the latest version of "
            "os-service-types. Please bump requirements.txt and "
            "lower-constraints.txt accordingly."
        )
        reported = os_service_types.ServiceTypes().version
        self.assertEqual(expected, reported, hint)
236
237
class TestTinyDAG(base.TestCase):
    """Tests for ``utils.TinyDAG``: loading, sorting and walking a graph."""

    # Adjacency mapping used by the tests. ``_verify_order`` requires each
    # key to appear before every node in its value list.
    test_graph = {
        'a': ['b', 'd', 'f'],
        'b': ['c', 'd'],
        'c': ['d'],
        'd': ['e'],
        'e': [],
        'f': ['e'],
        'g': ['e'],
    }

    def _verify_order(self, test_graph, test_list):
        """Assert every node precedes each node it points to.

        :param test_graph: mapping of node -> iterable of nodes that must
            come later in ``test_list``.
        :param test_list: the produced ordering.
        """
        for node, successors in test_graph.items():
            for successor in successors:
                # assertLess reports both indices when the ordering is
                # wrong, unlike assertTrue(a < b).
                self.assertLess(
                    test_list.index(node), test_list.index(successor))

    def test_from_dict(self):
        # Smoke test: only checks that from_dict accepts the mapping
        # without raising.
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)

    def test_topological_sort(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = sot.topological_sort()
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph), len(sorted_list))

    def test_walk(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = []
        for node in sot.walk():
            sorted_list.append(node)
            # Mark the node done right after recording it.
            sot.node_done(node)
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph), len(sorted_list))

    def test_walk_parallel(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = []
        # Leaving the ``with`` block waits for all submitted workers, so
        # ``sorted_list`` is complete before it is verified.
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for node in sot.walk(timeout=1):
                executor.submit(test_walker_fn, sot, node, sorted_list)
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph), len(sorted_list))

    def test_walk_raise(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        # 'f' is never marked done, so the walk is expected to give up
        # with an SDKException.
        bad_node = 'f'
        with testtools.ExpectedException(exceptions.SDKException):
            for node in sot.walk(timeout=1):
                if node != bad_node:
                    sot.node_done(node)

    def test_add_node_after_edge(self):
        # Adding a node again after it has an edge must keep that edge.
        sot = utils.TinyDAG()
        sot.add_node('a')
        sot.add_edge('a', 'b')
        sot.add_node('a')
        # A set literal is used because set('b') only happens to equal
        # {'b'} for a one-character string.
        self.assertEqual({'b'}, sot._graph['a'])
301
302
def test_walker_fn(graph, node, lst):
    """Record ``node`` in ``lst`` and then mark it done on ``graph``.

    Submitted to the thread pool by ``TestTinyDAG.test_walk_parallel``.

    :param graph: object exposing ``node_done(node)``.
    :param node: the node being processed.
    :param lst: list that ``node`` is appended to.
    """
    # NOTE(review): the append presumably has to happen before node_done
    # so the node is recorded before any dependent node is released --
    # confirm against TinyDAG.walk.
    # NOTE(review): the ``test_`` prefix may cause pytest-style collectors
    # to treat this helper as a test -- confirm the runner in use.
    lst.append(node)
    graph.node_done(node)
306
307
class Test_md5(base.TestCase):
    """Tests for ``utils.md5`` that adapt to hosts where md5 is blocked.

    ``setUp`` probes ``hashlib.md5``; when it raises ``ValueError`` the
    tests run in "FIPS" mode and expect the default and
    ``usedforsecurity=True`` calls to fail the same way.
    """

    def setUp(self):
        super().setUp()
        sample_text = "Openstack forever"
        self.md5_test_data = sample_text.encode('utf-8')
        try:
            digest = hashlib.md5(self.md5_test_data).hexdigest()  # nosec
        except ValueError:
            # md5 is refused by hashlib here; fall back to a hard-coded
            # digest (presumably the md5 of the sample data).
            self.md5_digest = '0d6dc3c588ae71a04ce9a6beebbbba06'
            self.fips_enabled = True
        else:
            self.md5_digest = digest
            self.fips_enabled = False

    def test_md5_with_data(self):
        payload = self.md5_test_data
        if self.fips_enabled:
            # on a FIPS enabled system, this throws a ValueError:
            # [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS
            self.assertRaises(ValueError, utils.md5, payload)
            self.assertRaises(
                ValueError, utils.md5, payload, usedforsecurity=True)
        else:
            self.assertEqual(
                utils.md5(payload).hexdigest(), self.md5_digest)
            self.assertEqual(
                utils.md5(payload, usedforsecurity=True).hexdigest(),
                self.md5_digest)
        # usedforsecurity=False must work in either mode.
        self.assertEqual(
            utils.md5(payload, usedforsecurity=False).hexdigest(),
            self.md5_digest)

    def test_md5_without_data(self):
        def finish(hasher):
            # Feed the sample data afterwards and return the hex digest.
            hasher.update(self.md5_test_data)
            return hasher.hexdigest()

        if self.fips_enabled:
            self.assertRaises(ValueError, utils.md5)
            self.assertRaises(ValueError, utils.md5, usedforsecurity=True)
        else:
            self.assertEqual(finish(utils.md5()), self.md5_digest)
            self.assertEqual(
                finish(utils.md5(usedforsecurity=True)), self.md5_digest)
        # usedforsecurity=False must work in either mode.
        self.assertEqual(
            finish(utils.md5(usedforsecurity=False)), self.md5_digest)

    def test_string_data_raises_type_error(self):
        # Text (non-bytes) input: TypeError normally, ValueError when md5
        # itself is blocked.
        blocked_or_type = ValueError if self.fips_enabled else TypeError
        self.assertRaises(blocked_or_type, hashlib.md5, u'foo')
        self.assertRaises(blocked_or_type, utils.md5, u'foo')
        self.assertRaises(
            blocked_or_type, utils.md5, u'foo', usedforsecurity=True)
        # With usedforsecurity=False a TypeError is expected in both modes.
        self.assertRaises(
            TypeError, utils.md5, u'foo', usedforsecurity=False)

    def test_none_data_raises_type_error(self):
        # ``None`` input: TypeError normally, ValueError when md5 itself
        # is blocked.
        blocked_or_type = ValueError if self.fips_enabled else TypeError
        self.assertRaises(blocked_or_type, hashlib.md5, None)
        self.assertRaises(blocked_or_type, utils.md5, None)
        self.assertRaises(
            blocked_or_type, utils.md5, None, usedforsecurity=True)
        # With usedforsecurity=False a TypeError is expected in both modes.
        self.assertRaises(
            TypeError, utils.md5, None, usedforsecurity=False)
388