1# -*- coding: utf-8 -*- 2 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15import concurrent.futures 16import hashlib 17import logging 18import sys 19from unittest import mock 20 21import fixtures 22import os_service_types 23import testtools 24 25import openstack 26from openstack import exceptions 27from openstack.tests.unit import base 28from openstack import utils 29 30 31class Test_enable_logging(base.TestCase): 32 33 def setUp(self): 34 super(Test_enable_logging, self).setUp() 35 self.openstack_logger = mock.Mock() 36 self.openstack_logger.handlers = [] 37 self.ksa_logger_root = mock.Mock() 38 self.ksa_logger_root.handlers = [] 39 self.ksa_logger_1 = mock.Mock() 40 self.ksa_logger_1.handlers = [] 41 self.ksa_logger_2 = mock.Mock() 42 self.ksa_logger_2.handlers = [] 43 self.ksa_logger_3 = mock.Mock() 44 self.ksa_logger_3.handlers = [] 45 self.urllib3_logger = mock.Mock() 46 self.urllib3_logger.handlers = [] 47 self.stevedore_logger = mock.Mock() 48 self.stevedore_logger.handlers = [] 49 self.fake_get_logger = mock.Mock() 50 self.fake_get_logger.side_effect = [ 51 self.openstack_logger, 52 self.ksa_logger_root, 53 self.urllib3_logger, 54 self.stevedore_logger, 55 self.ksa_logger_1, 56 self.ksa_logger_2, 57 self.ksa_logger_3 58 ] 59 self.useFixture( 60 fixtures.MonkeyPatch('logging.getLogger', self.fake_get_logger)) 61 62 def _console_tests(self, level, debug, stream): 63 64 openstack.enable_logging(debug=debug, stream=stream) 65 66 self.assertEqual(self.openstack_logger.addHandler.call_count, 1) 67 self.openstack_logger.setLevel.assert_called_with(level) 68 69 def _file_tests(self, level, debug): 70 file_handler = mock.Mock() 71 self.useFixture( 72 fixtures.MonkeyPatch('logging.FileHandler', file_handler)) 73 fake_path = "fake/path.log" 74 75 openstack.enable_logging(debug=debug, path=fake_path) 76 77 file_handler.assert_called_with(fake_path) 78 self.assertEqual(self.openstack_logger.addHandler.call_count, 1) 79 self.openstack_logger.setLevel.assert_called_with(level) 80 81 def test_none(self): 82 openstack.enable_logging(debug=True) 83 self.fake_get_logger.assert_has_calls([]) 84 self.openstack_logger.setLevel.assert_called_with(logging.DEBUG) 85 self.assertEqual(self.openstack_logger.addHandler.call_count, 1) 86 self.assertIsInstance( 87 self.openstack_logger.addHandler.call_args_list[0][0][0], 88 logging.StreamHandler) 89 90 def test_debug_console_stderr(self): 91 self._console_tests(logging.DEBUG, True, sys.stderr) 92 93 def test_warning_console_stderr(self): 94 self._console_tests(logging.INFO, False, sys.stderr) 95 96 def test_debug_console_stdout(self): 97 self._console_tests(logging.DEBUG, True, sys.stdout) 98 99 def test_warning_console_stdout(self): 100 self._console_tests(logging.INFO, False, sys.stdout) 101 102 def test_debug_file(self): 103 self._file_tests(logging.DEBUG, True) 104 105 def test_warning_file(self): 106 self._file_tests(logging.INFO, False) 107 108 109class Test_urljoin(base.TestCase): 110 111 def test_strings(self): 112 root = "http://www.example.com" 113 leaves = "foo", "bar" 114 115 result = utils.urljoin(root, *leaves) 116 self.assertEqual(result, "http://www.example.com/foo/bar") 117 118 def test_with_none(self): 119 root = "http://www.example.com" 120 leaves = "foo", None 121 122 result = utils.urljoin(root, *leaves) 123 self.assertEqual(result, "http://www.example.com/foo/") 124 125 def test_unicode_strings(self): 126 root = "http://www.example.com" 127 leaves = u"ascii", u"extra_chars-™" 128 129 try: 130 result = utils.urljoin(root, *leaves) 131 except Exception: 132 self.fail("urljoin failed on unicode strings") 133 134 self.assertEqual(result, u"http://www.example.com/ascii/extra_chars-™") 135 136 137class TestSupportsMicroversion(base.TestCase): 138 def setUp(self): 139 super(TestSupportsMicroversion, self).setUp() 140 self.adapter = mock.Mock(spec=['get_endpoint_data']) 141 self.endpoint_data = mock.Mock(spec=['min_microversion', 142 'max_microversion'], 143 min_microversion='1.1', 144 max_microversion='1.99') 145 self.adapter.get_endpoint_data.return_value = self.endpoint_data 146 147 def test_requested_supported_no_default(self): 148 self.adapter.default_microversion = None 149 self.assertTrue( 150 utils.supports_microversion(self.adapter, '1.2')) 151 152 def test_requested_not_supported_no_default(self): 153 self.adapter.default_microversion = None 154 self.assertFalse( 155 utils.supports_microversion(self.adapter, '2.2')) 156 157 def test_requested_not_supported_no_default_exception(self): 158 self.adapter.default_microversion = None 159 self.assertRaises( 160 exceptions.SDKException, 161 utils.supports_microversion, 162 self.adapter, 163 '2.2', 164 True) 165 166 def test_requested_supported_higher_default(self): 167 self.adapter.default_microversion = '1.8' 168 self.assertTrue( 169 utils.supports_microversion(self.adapter, '1.6')) 170 171 def test_requested_supported_equal_default(self): 172 self.adapter.default_microversion = '1.8' 173 self.assertTrue( 174 utils.supports_microversion(self.adapter, '1.8')) 175 176 def test_requested_supported_lower_default(self): 177 self.adapter.default_microversion = '1.2' 178 self.assertFalse( 179 utils.supports_microversion(self.adapter, '1.8')) 180 181 def test_requested_supported_lower_default_exception(self): 182 self.adapter.default_microversion = '1.2' 183 self.assertRaises( 184 exceptions.SDKException, 185 utils.supports_microversion, 186 self.adapter, 187 '1.8', 188 True) 189 190 @mock.patch('openstack.utils.supports_microversion') 191 def test_require_microversion(self, sm_mock): 192 utils.require_microversion(self.adapter, '1.2') 193 sm_mock.assert_called_with(self.adapter, 194 '1.2', 195 raise_exception=True) 196 197 198class TestMaximumSupportedMicroversion(base.TestCase): 199 def setUp(self): 200 super(TestMaximumSupportedMicroversion, self).setUp() 201 self.adapter = mock.Mock(spec=['get_endpoint_data']) 202 self.endpoint_data = mock.Mock(spec=['min_microversion', 203 'max_microversion'], 204 min_microversion=None, 205 max_microversion='1.99') 206 self.adapter.get_endpoint_data.return_value = self.endpoint_data 207 208 def test_with_none(self): 209 self.assertIsNone(utils.maximum_supported_microversion(self.adapter, 210 None)) 211 212 def test_with_value(self): 213 self.assertEqual('1.42', 214 utils.maximum_supported_microversion(self.adapter, 215 '1.42')) 216 217 def test_value_more_than_max(self): 218 self.assertEqual('1.99', 219 utils.maximum_supported_microversion(self.adapter, 220 '1.100')) 221 222 def test_value_less_than_min(self): 223 self.endpoint_data.min_microversion = '1.42' 224 self.assertIsNone(utils.maximum_supported_microversion(self.adapter, 225 '1.2')) 226 227 228class TestOsServiceTypesVersion(base.TestCase): 229 def test_ost_version(self): 230 ost_version = '2019-05-01T19:53:21.498745' 231 self.assertEqual( 232 ost_version, os_service_types.ServiceTypes().version, 233 "This project must be pinned to the latest version of " 234 "os-service-types. Please bump requirements.txt and " 235 "lower-constraints.txt accordingly.") 236 237 238class TestTinyDAG(base.TestCase): 239 test_graph = { 240 'a': ['b', 'd', 'f'], 241 'b': ['c', 'd'], 242 'c': ['d'], 243 'd': ['e'], 244 'e': [], 245 'f': ['e'], 246 'g': ['e'] 247 } 248 249 def _verify_order(self, test_graph, test_list): 250 for k, v in test_graph.items(): 251 for dep in v: 252 self.assertTrue(test_list.index(k) < test_list.index(dep)) 253 254 def test_from_dict(self): 255 sot = utils.TinyDAG() 256 sot.from_dict(self.test_graph) 257 258 def test_topological_sort(self): 259 sot = utils.TinyDAG() 260 sot.from_dict(self.test_graph) 261 sorted_list = sot.topological_sort() 262 self._verify_order(sot.graph, sorted_list) 263 self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) 264 265 def test_walk(self): 266 sot = utils.TinyDAG() 267 sot.from_dict(self.test_graph) 268 sorted_list = [] 269 for node in sot.walk(): 270 sorted_list.append(node) 271 sot.node_done(node) 272 self._verify_order(sot.graph, sorted_list) 273 self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) 274 275 def test_walk_parallel(self): 276 sot = utils.TinyDAG() 277 sot.from_dict(self.test_graph) 278 sorted_list = [] 279 with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: 280 for node in sot.walk(timeout=1): 281 executor.submit(test_walker_fn, sot, node, sorted_list) 282 self._verify_order(sot.graph, sorted_list) 283 print(sorted_list) 284 self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) 285 286 def test_walk_raise(self): 287 sot = utils.TinyDAG() 288 sot.from_dict(self.test_graph) 289 bad_node = 'f' 290 with testtools.ExpectedException(exceptions.SDKException): 291 for node in sot.walk(timeout=1): 292 if node != bad_node: 293 sot.node_done(node) 294 295 def test_add_node_after_edge(self): 296 sot = utils.TinyDAG() 297 sot.add_node('a') 298 sot.add_edge('a', 'b') 299 sot.add_node('a') 300 self.assertEqual(sot._graph['a'], set('b')) 301 302 303def test_walker_fn(graph, node, lst): 304 lst.append(node) 305 graph.node_done(node) 306 307 308class Test_md5(base.TestCase): 309 310 def setUp(self): 311 super(Test_md5, self).setUp() 312 self.md5_test_data = "Openstack forever".encode('utf-8') 313 try: 314 self.md5_digest = hashlib.md5( # nosec 315 self.md5_test_data).hexdigest() 316 self.fips_enabled = False 317 except ValueError: 318 self.md5_digest = '0d6dc3c588ae71a04ce9a6beebbbba06' 319 self.fips_enabled = True 320 321 def test_md5_with_data(self): 322 if not self.fips_enabled: 323 digest = utils.md5(self.md5_test_data).hexdigest() 324 self.assertEqual(digest, self.md5_digest) 325 else: 326 # on a FIPS enabled system, this throws a ValueError: 327 # [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS 328 self.assertRaises(ValueError, utils.md5, self.md5_test_data) 329 if not self.fips_enabled: 330 digest = utils.md5(self.md5_test_data, 331 usedforsecurity=True).hexdigest() 332 self.assertEqual(digest, self.md5_digest) 333 else: 334 self.assertRaises( 335 ValueError, utils.md5, self.md5_test_data, 336 usedforsecurity=True) 337 digest = utils.md5(self.md5_test_data, 338 usedforsecurity=False).hexdigest() 339 self.assertEqual(digest, self.md5_digest) 340 341 def test_md5_without_data(self): 342 if not self.fips_enabled: 343 test_md5 = utils.md5() 344 test_md5.update(self.md5_test_data) 345 digest = test_md5.hexdigest() 346 self.assertEqual(digest, self.md5_digest) 347 else: 348 self.assertRaises(ValueError, utils.md5) 349 if not self.fips_enabled: 350 test_md5 = utils.md5(usedforsecurity=True) 351 test_md5.update(self.md5_test_data) 352 digest = test_md5.hexdigest() 353 self.assertEqual(digest, self.md5_digest) 354 else: 355 self.assertRaises(ValueError, utils.md5, usedforsecurity=True) 356 test_md5 = utils.md5(usedforsecurity=False) 357 test_md5.update(self.md5_test_data) 358 digest = test_md5.hexdigest() 359 self.assertEqual(digest, self.md5_digest) 360 361 def test_string_data_raises_type_error(self): 362 if not self.fips_enabled: 363 self.assertRaises(TypeError, hashlib.md5, u'foo') 364 self.assertRaises(TypeError, utils.md5, u'foo') 365 self.assertRaises( 366 TypeError, utils.md5, u'foo', usedforsecurity=True) 367 else: 368 self.assertRaises(ValueError, hashlib.md5, u'foo') 369 self.assertRaises(ValueError, utils.md5, u'foo') 370 self.assertRaises( 371 ValueError, utils.md5, u'foo', usedforsecurity=True) 372 self.assertRaises( 373 TypeError, utils.md5, u'foo', usedforsecurity=False) 374 375 def test_none_data_raises_type_error(self): 376 if not self.fips_enabled: 377 self.assertRaises(TypeError, hashlib.md5, None) 378 self.assertRaises(TypeError, utils.md5, None) 379 self.assertRaises( 380 TypeError, utils.md5, None, usedforsecurity=True) 381 else: 382 self.assertRaises(ValueError, hashlib.md5, None) 383 self.assertRaises(ValueError, utils.md5, None) 384 self.assertRaises( 385 ValueError, utils.md5, None, usedforsecurity=True) 386 self.assertRaises( 387 TypeError, utils.md5, None, usedforsecurity=False) 388