1# Licensed to the Apache Software Foundation (ASF) under one or more 2# contributor license agreements. See the NOTICE file distributed with 3# this work for additional information regarding copyright ownership. 4# The ASF licenses this file to You under the Apache License, Version 2.0 5# (the "License"); you may not use this file except in compliance with 6# the License. You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import sys 17import datetime 18 19try: 20 import simplejson as json 21except ImportError: 22 import json 23 24from mock import Mock 25 26from libcloud.utils.py3 import httplib 27from libcloud.utils.py3 import assertRaisesRegex 28from libcloud.common.openstack import OpenStackBaseConnection 29from libcloud.common.openstack_identity import AUTH_TOKEN_EXPIRES_GRACE_SECONDS 30from libcloud.common.openstack_identity import get_class_for_auth_version 31from libcloud.common.openstack_identity import OpenStackServiceCatalog 32from libcloud.common.openstack_identity import OpenStackIdentity_2_0_Connection 33from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection 34from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection_AppCred 35from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection_OIDC_access_token 36from libcloud.common.openstack_identity import OpenStackIdentityUser 37from libcloud.compute.drivers.openstack import OpenStack_1_0_NodeDriver 38from libcloud.common.openstack_identity import OpenStackIdentity_2_0_Connection_VOMS 39 40from libcloud.test import unittest 41from libcloud.test import MockHttp 42from libcloud.test.secrets import OPENSTACK_PARAMS 43from libcloud.test.file_fixtures import ComputeFileFixtures 44from libcloud.test.compute.test_openstack import OpenStackMockAuthCache 45from libcloud.test.compute.test_openstack import OpenStackMockHttp 46from libcloud.test.compute.test_openstack import OpenStack_2_0_MockHttp 47 48TOMORROW = datetime.datetime.today() + datetime.timedelta(1) 49YESTERDAY = datetime.datetime.today() - datetime.timedelta(1) 50 51 52class OpenStackIdentityConnectionTestCase(unittest.TestCase): 53 def setUp(self): 54 OpenStackBaseConnection.auth_url = None 55 OpenStackBaseConnection.conn_class = OpenStackMockHttp 56 OpenStack_2_0_MockHttp.type = None 57 OpenStackIdentity_3_0_MockHttp.type = None 58 59 def test_auth_url_is_correctly_assembled(self): 60 tuples = [ 61 ('1.0', OpenStackMockHttp, {}), 62 ('1.1', OpenStackMockHttp, {}), 63 ('2.0', OpenStack_2_0_MockHttp, {}), 64 ('2.0_apikey', OpenStack_2_0_MockHttp, {}), 65 ('2.0_password', OpenStack_2_0_MockHttp, {}), 66 ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'tenant_name': 'tenant-name'}), 67 ('3.x_appcred', OpenStackIdentity_3_0_MockHttp, {}), 68 ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'tenant_name': 'tenant-name'}) 69 ] 70 71 APPEND = 0 72 NOTAPPEND = 1 73 74 auth_urls = [ 75 ('https://auth.api.example.com', APPEND, ''), 76 ('https://auth.api.example.com/', NOTAPPEND, '/'), 77 ('https://auth.api.example.com/foo/bar', NOTAPPEND, '/foo/bar'), 78 ('https://auth.api.example.com/foo/bar/', NOTAPPEND, '/foo/bar/') 79 ] 80 81 actions = { 82 '1.0': '/v1.0', 83 '1.1': '/v1.1/auth', 84 '2.0': '/v2.0/tokens', 85 '2.0_apikey': '/v2.0/tokens', 86 '2.0_password': '/v2.0/tokens', 87 '3.x_password': '/v3/auth/tokens', 88 '3.x_appcred': '/v3/auth/tokens', 89 '3.x_oidc_access_token': '/v3/OS-FEDERATION/identity_providers/user_name/protocols/tenant-name/auth', 90 } 91 92 user_id = OPENSTACK_PARAMS[0] 93 key = OPENSTACK_PARAMS[1] 94 95 for (auth_version, mock_http_class, kwargs) in tuples: 96 for (url, should_append_default_path, expected_path) in auth_urls: 97 connection = \ 98 self._get_mock_connection(mock_http_class=mock_http_class, 99 auth_url=url) 100 101 auth_url = connection.auth_url 102 cls = get_class_for_auth_version(auth_version=auth_version) 103 osa = cls(auth_url=auth_url, 104 user_id=user_id, 105 key=key, 106 parent_conn=connection, 107 **kwargs) 108 109 try: 110 osa = osa.authenticate() 111 except Exception: 112 pass 113 114 if (should_append_default_path == APPEND): 115 expected_path = actions[auth_version] 116 117 self.assertEqual(osa.action, expected_path) 118 119 def test_basic_authentication(self): 120 tuples = [ 121 ('1.0', OpenStackMockHttp, {}), 122 ('1.1', OpenStackMockHttp, {}), 123 ('2.0', OpenStack_2_0_MockHttp, {}), 124 ('2.0_apikey', OpenStack_2_0_MockHttp, {}), 125 ('2.0_password', OpenStack_2_0_MockHttp, {}), 126 ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key', 127 'token_scope': 'project', 'tenant_name': 'test_tenant', 128 'tenant_domain_id': 'test_tenant_domain_id', 129 'domain_name': 'test_domain'}), 130 ('3.x_appcred', OpenStackIdentity_3_0_MockHttp, {'user_id': 'appcred_id', 'key': 'appcred_secret'}), 131 ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key', 132 'token_scope': 'domain', 'tenant_name': 'test_tenant', 133 'tenant_domain_id': 'test_tenant_domain_id', 134 'domain_name': 'test_domain'}) 135 ] 136 137 user_id = OPENSTACK_PARAMS[0] 138 key = OPENSTACK_PARAMS[1] 139 140 for (auth_version, mock_http_class, kwargs) in tuples: 141 connection = \ 142 self._get_mock_connection(mock_http_class=mock_http_class) 143 auth_url = connection.auth_url 144 145 if not kwargs: 146 kwargs['user_id'] = user_id 147 kwargs['key'] = key 148 149 cls = get_class_for_auth_version(auth_version=auth_version) 150 osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs) 151 152 self.assertEqual(osa.urls, {}) 153 self.assertIsNone(osa.auth_token) 154 self.assertIsNone(osa.auth_user_info) 155 osa = osa.authenticate() 156 157 self.assertTrue(len(osa.urls) >= 1) 158 self.assertTrue(osa.auth_token is not None) 159 160 if auth_version in ['1.1', '2.0', '2.0_apikey', '2.0_password', 161 '3.x_password', '3.x_appcred', '3.x_oidc_access_token']: 162 self.assertTrue(osa.auth_token_expires is not None) 163 164 if auth_version in ['2.0', '2.0_apikey', '2.0_password', 165 '3.x_password', '3.x_appcred', '3.x_oidc_access_token']: 166 self.assertTrue(osa.auth_user_info is not None) 167 168 def test_token_expiration_and_force_reauthentication(self): 169 user_id = OPENSTACK_PARAMS[0] 170 key = OPENSTACK_PARAMS[1] 171 172 connection = self._get_mock_connection(OpenStack_2_0_MockHttp) 173 auth_url = connection.auth_url 174 175 osa = OpenStackIdentity_2_0_Connection(auth_url=auth_url, 176 user_id=user_id, 177 key=key, 178 parent_conn=connection) 179 180 mocked_auth_method = Mock(wraps=osa._authenticate_2_0_with_body) 181 osa._authenticate_2_0_with_body = mocked_auth_method 182 183 # Force re-auth, expired token 184 osa.auth_token = None 185 osa.auth_token_expires = YESTERDAY 186 count = 5 187 188 for i in range(0, count): 189 osa.authenticate(force=True) 190 191 self.assertEqual(mocked_auth_method.call_count, count) 192 193 # No force reauth, expired token 194 osa.auth_token = None 195 osa.auth_token_expires = YESTERDAY 196 197 mocked_auth_method.call_count = 0 198 self.assertEqual(mocked_auth_method.call_count, 0) 199 200 for i in range(0, count): 201 osa.authenticate(force=False) 202 203 self.assertEqual(mocked_auth_method.call_count, 1) 204 205 # No force reauth, valid / non-expired token 206 osa.auth_token = None 207 208 mocked_auth_method.call_count = 0 209 self.assertEqual(mocked_auth_method.call_count, 0) 210 211 for i in range(0, count): 212 osa.authenticate(force=False) 213 214 if i == 0: 215 osa.auth_token_expires = TOMORROW 216 217 self.assertEqual(mocked_auth_method.call_count, 1) 218 219 # No force reauth, valid / non-expired token which is about to expire in 220 # less than AUTH_TOKEN_EXPIRES_GRACE_SECONDS 221 soon = datetime.datetime.utcnow() + \ 222 datetime.timedelta(seconds=AUTH_TOKEN_EXPIRES_GRACE_SECONDS - 1) 223 osa.auth_token = None 224 225 mocked_auth_method.call_count = 0 226 self.assertEqual(mocked_auth_method.call_count, 0) 227 228 for i in range(0, count): 229 if i == 0: 230 osa.auth_token_expires = soon 231 232 osa.authenticate(force=False) 233 234 self.assertEqual(mocked_auth_method.call_count, 1) 235 236 def test_authentication_cache(self): 237 tuples = [ 238 # 1.0 does not provide token expiration, so it always 239 # re-authenticates and never uses the cache. 240 # ('1.0', OpenStackMockHttp, {}), 241 ('1.1', OpenStackMockHttp, {}), 242 ('2.0', OpenStack_2_0_MockHttp, {}), 243 ('2.0_apikey', OpenStack_2_0_MockHttp, {}), 244 ('2.0_password', OpenStack_2_0_MockHttp, {}), 245 ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key', 246 'token_scope': 'project', 'tenant_name': 'test_tenant', 247 'tenant_domain_id': 'test_tenant_domain_id', 248 'domain_name': 'test_domain'}), 249 ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key', 250 'token_scope': 'domain', 'tenant_name': 'test_tenant', 251 'tenant_domain_id': 'test_tenant_domain_id', 252 'domain_name': 'test_domain'}) 253 ] 254 255 user_id = OPENSTACK_PARAMS[0] 256 key = OPENSTACK_PARAMS[1] 257 258 for (auth_version, mock_http_class, kwargs) in tuples: 259 mock_http_class.type = None 260 connection = \ 261 self._get_mock_connection(mock_http_class=mock_http_class) 262 auth_url = connection.auth_url 263 264 if not kwargs: 265 kwargs['user_id'] = user_id 266 kwargs['key'] = key 267 268 auth_cache = OpenStackMockAuthCache() 269 self.assertEqual(len(auth_cache), 0) 270 kwargs['auth_cache'] = auth_cache 271 272 cls = get_class_for_auth_version(auth_version=auth_version) 273 osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs) 274 osa = osa.authenticate() 275 276 # Token is cached 277 self.assertEqual(len(auth_cache), 1) 278 279 # New client, token from cache is re-used 280 osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs) 281 osa.request = Mock(wraps=osa.request) 282 osa = osa.authenticate() 283 284 # No auth API call 285 if auth_version in ('1.1', '2.0', '2.0_apikey', '2.0_password'): 286 self.assertEqual(osa.request.call_count, 0) 287 elif auth_version in ('3.x_password', '3.x_oidc_access_token'): 288 # v3 only caches token and expiration; service catalog URLs 289 # and the rest of the auth context are fetched from Keystone 290 osa.request.assert_called_once_with( 291 action='/v3/auth/tokens', params=None, data=None, 292 headers={'X-Subject-Token': '00000000000000000000000000000000', 293 'X-Auth-Token': '00000000000000000000000000000000'}, 294 method='GET', raw=False) 295 296 # Cache size unchanged 297 self.assertEqual(len(auth_cache), 1) 298 299 # Authenticates if cached token expired 300 cache_key = list(auth_cache.store.keys())[0] 301 auth_context = auth_cache.get(cache_key) 302 auth_context.expiration = YESTERDAY 303 auth_cache.put(cache_key, auth_context) 304 305 osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs) 306 osa.request = Mock(wraps=osa.request) 307 osa._get_unscoped_token_from_oidc_token = Mock(return_value='000') 308 OpenStackIdentity_3_0_MockHttp.type = 'GET_UNAUTHORIZED_POST_OK' 309 osa = osa.authenticate() 310 311 if auth_version in ('1.1', '2.0', '2.0_apikey', '2.0_password'): 312 self.assertEqual(osa.request.call_count, 1) 313 self.assertTrue(osa.request.call_args[1]['method'], 'POST') 314 elif auth_version in ('3.x_password', '3.x_oidc_access_token'): 315 self.assertTrue(osa.request.call_args[0][0], '/v3/auth/tokens') 316 self.assertTrue(osa.request.call_args[1]['method'], 'POST') 317 318 # Token evicted from cache if 401 received on another call 319 if hasattr(osa, 'list_projects'): 320 mock_http_class.type = None 321 auth_cache.reset() 322 323 osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs) 324 osa.request = Mock(wraps=osa.request) 325 osa = osa.authenticate() 326 self.assertEqual(len(auth_cache), 1) 327 mock_http_class.type = 'UNAUTHORIZED' 328 try: 329 osa.list_projects() 330 except: # These methods don't handle 401s 331 pass 332 self.assertEqual(len(auth_cache), 0) 333 334 def _get_mock_connection(self, mock_http_class, auth_url=None): 335 OpenStackBaseConnection.conn_class = mock_http_class 336 337 if auth_url is None: 338 auth_url = "https://auth.api.example.com" 339 340 OpenStackBaseConnection.auth_url = auth_url 341 connection = OpenStackBaseConnection(*OPENSTACK_PARAMS) 342 343 connection._ex_force_base_url = "https://www.foo.com" 344 connection.driver = OpenStack_1_0_NodeDriver(*OPENSTACK_PARAMS) 345 346 return connection 347 348 349class OpenStackIdentity_2_0_ConnectionTests(unittest.TestCase): 350 def setUp(self): 351 mock_cls = OpenStackIdentity_2_0_MockHttp 352 mock_cls.type = None 353 OpenStackIdentity_2_0_Connection.conn_class = mock_cls 354 355 self.auth_instance = OpenStackIdentity_2_0_Connection(auth_url='http://none', 356 user_id='test', 357 key='test', 358 tenant_name='test', 359 proxy_url='http://proxy:8080', 360 timeout=10) 361 self.auth_instance.auth_token = 'mock' 362 self.assertEqual(self.auth_instance.proxy_url, 'http://proxy:8080') 363 364 def test_list_projects(self): 365 result = self.auth_instance.list_projects() 366 self.assertEqual(len(result), 2) 367 self.assertEqual(result[0].id, 'a') 368 self.assertEqual(result[0].name, 'test') 369 self.assertEqual(result[0].description, 'test project') 370 self.assertTrue(result[0].enabled) 371 372 373class OpenStackIdentity_3_0_ConnectionTests(unittest.TestCase): 374 def setUp(self): 375 mock_cls = OpenStackIdentity_3_0_MockHttp 376 mock_cls.type = None 377 OpenStackIdentity_3_0_Connection.conn_class = mock_cls 378 379 self.auth_instance = OpenStackIdentity_3_0_Connection(auth_url='http://none', 380 user_id='test', 381 key='test', 382 tenant_name='test', 383 proxy_url='http://proxy:8080', 384 timeout=10) 385 self.auth_instance.auth_token = 'mock' 386 self.assertEqual(self.auth_instance.proxy_url, 'http://proxy:8080') 387 388 def test_token_scope_argument(self): 389 # Invalid token_scope value 390 expected_msg = 'Invalid value for "token_scope" argument: foo' 391 assertRaisesRegex(self, ValueError, expected_msg, 392 OpenStackIdentity_3_0_Connection, 393 auth_url='http://none', 394 user_id='test', 395 key='test', 396 token_scope='foo') 397 398 # Missing tenant_name 399 expected_msg = 'Must provide tenant_name and domain_name argument' 400 assertRaisesRegex(self, ValueError, expected_msg, 401 OpenStackIdentity_3_0_Connection, 402 auth_url='http://none', 403 user_id='test', 404 key='test', 405 token_scope='project') 406 407 # Missing domain_name 408 expected_msg = 'Must provide domain_name argument' 409 assertRaisesRegex(self, ValueError, expected_msg, 410 OpenStackIdentity_3_0_Connection, 411 auth_url='http://none', 412 user_id='test', 413 key='test', 414 token_scope='domain', 415 domain_name=None) 416 417 # Scope to project all ok 418 OpenStackIdentity_3_0_Connection(auth_url='http://none', 419 user_id='test', 420 key='test', 421 token_scope='project', 422 tenant_name='test', 423 domain_name='Default') 424 # Scope to domain 425 OpenStackIdentity_3_0_Connection(auth_url='http://none', 426 user_id='test', 427 key='test', 428 token_scope='domain', 429 tenant_name=None, 430 domain_name='Default') 431 432 def test_authenticate(self): 433 auth = OpenStackIdentity_3_0_Connection(auth_url='http://none', 434 user_id='test_user_id', 435 key='test_key', 436 token_scope='project', 437 tenant_name="test_tenant", 438 tenant_domain_id="test_tenant_domain_id", 439 domain_name='test_domain', 440 proxy_url='http://proxy:8080', 441 timeout=10) 442 auth.authenticate() 443 self.assertEqual(auth.proxy_url, 'http://proxy:8080') 444 445 def test_list_supported_versions(self): 446 OpenStackIdentity_3_0_MockHttp.type = 'v3' 447 448 versions = self.auth_instance.list_supported_versions() 449 self.assertEqual(len(versions), 2) 450 self.assertEqual(versions[0].version, 'v2.0') 451 self.assertEqual(versions[0].url, 452 'http://192.168.18.100:5000/v2.0/') 453 self.assertEqual(versions[1].version, 'v3.0') 454 self.assertEqual(versions[1].url, 455 'http://192.168.18.100:5000/v3/') 456 457 def test_list_domains(self): 458 domains = self.auth_instance.list_domains() 459 self.assertEqual(len(domains), 1) 460 self.assertEqual(domains[0].id, 'default') 461 self.assertEqual(domains[0].name, 'Default') 462 self.assertTrue(domains[0].enabled) 463 464 def test_list_projects(self): 465 projects = self.auth_instance.list_projects() 466 self.assertEqual(len(projects), 4) 467 self.assertEqual(projects[0].id, 'a') 468 self.assertEqual(projects[0].domain_id, 'default') 469 self.assertTrue(projects[0].enabled) 470 self.assertEqual(projects[0].description, 'Test project') 471 472 def test_list_users(self): 473 users = self.auth_instance.list_users() 474 self.assertEqual(len(users), 12) 475 self.assertEqual(users[0].id, 'a') 476 self.assertEqual(users[0].domain_id, 'default') 477 self.assertEqual(users[0].enabled, True) 478 self.assertEqual(users[0].email, 'openstack-test@localhost') 479 480 def test_list_roles(self): 481 roles = self.auth_instance.list_roles() 482 self.assertEqual(len(roles), 2) 483 self.assertEqual(roles[1].id, 'b') 484 self.assertEqual(roles[1].name, 'admin') 485 486 def test_list_user_projects(self): 487 user = self.auth_instance.list_users()[0] 488 projects = self.auth_instance.list_user_projects(user=user) 489 self.assertEqual(len(projects), 0) 490 491 def test_list_user_domain_roles(self): 492 user = self.auth_instance.list_users()[0] 493 domain = self.auth_instance.list_domains()[0] 494 roles = self.auth_instance.list_user_domain_roles(domain=domain, 495 user=user) 496 self.assertEqual(len(roles), 1) 497 self.assertEqual(roles[0].name, 'admin') 498 499 def test_get_domain(self): 500 domain = self.auth_instance.get_domain(domain_id='default') 501 self.assertEqual(domain.name, 'Default') 502 503 def test_get_user(self): 504 user = self.auth_instance.get_user(user_id='a') 505 self.assertEqual(user.id, 'a') 506 self.assertEqual(user.domain_id, 'default') 507 self.assertEqual(user.enabled, True) 508 self.assertEqual(user.email, 'openstack-test@localhost') 509 510 def test_get_user_without_email(self): 511 user = self.auth_instance.get_user(user_id='b') 512 self.assertEqual(user.id, 'b') 513 self.assertEqual(user.name, 'userwithoutemail') 514 self.assertIsNone(user.email) 515 516 def test_get_user_without_enabled(self): 517 user = self.auth_instance.get_user(user_id='c') 518 self.assertEqual(user.id, 'c') 519 self.assertEqual(user.name, 'userwithoutenabled') 520 self.assertIsNone(user.enabled) 521 522 def test_create_user(self): 523 user = self.auth_instance.create_user(email='test2@localhost', password='test1', 524 name='test2', domain_id='default') 525 526 self.assertEqual(user.id, 'c') 527 self.assertEqual(user.name, 'test2') 528 529 def test_enable_user(self): 530 user = self.auth_instance.list_users()[0] 531 result = self.auth_instance.enable_user(user=user) 532 self.assertTrue(isinstance(result, OpenStackIdentityUser)) 533 534 def test_disable_user(self): 535 user = self.auth_instance.list_users()[0] 536 result = self.auth_instance.disable_user(user=user) 537 self.assertTrue(isinstance(result, OpenStackIdentityUser)) 538 539 def test_grant_domain_role_to_user(self): 540 domain = self.auth_instance.list_domains()[0] 541 role = self.auth_instance.list_roles()[0] 542 user = self.auth_instance.list_users()[0] 543 544 result = self.auth_instance.grant_domain_role_to_user(domain=domain, 545 role=role, 546 user=user) 547 self.assertTrue(result) 548 549 def test_revoke_domain_role_from_user(self): 550 domain = self.auth_instance.list_domains()[0] 551 role = self.auth_instance.list_roles()[0] 552 user = self.auth_instance.list_users()[0] 553 554 result = self.auth_instance.revoke_domain_role_from_user(domain=domain, 555 role=role, 556 user=user) 557 self.assertTrue(result) 558 559 def test_grant_project_role_to_user(self): 560 project = self.auth_instance.list_projects()[0] 561 role = self.auth_instance.list_roles()[0] 562 user = self.auth_instance.list_users()[0] 563 564 result = self.auth_instance.grant_project_role_to_user(project=project, 565 role=role, 566 user=user) 567 self.assertTrue(result) 568 569 def test_revoke_project_role_from_user(self): 570 project = self.auth_instance.list_projects()[0] 571 role = self.auth_instance.list_roles()[0] 572 user = self.auth_instance.list_users()[0] 573 574 result = self.auth_instance.revoke_project_role_from_user(project=project, 575 role=role, 576 user=user) 577 self.assertTrue(result) 578 579 580class OpenStackIdentity_3_0_Connection_AppCredTests( 581 unittest.TestCase): 582 def setUp(self): 583 mock_cls = OpenStackIdentity_3_0_AppCred_MockHttp 584 mock_cls.type = None 585 OpenStackIdentity_3_0_Connection_AppCred.conn_class = mock_cls 586 587 self.auth_instance = OpenStackIdentity_3_0_Connection_AppCred(auth_url='http://none', 588 user_id='appcred_id', 589 key='appcred_secret', 590 proxy_url='http://proxy:8080', 591 timeout=10) 592 self.auth_instance.auth_token = 'mock' 593 594 def test_authenticate(self): 595 auth = OpenStackIdentity_3_0_Connection_AppCred(auth_url='http://none', 596 user_id='appcred_id', 597 key='appcred_secret', 598 proxy_url='http://proxy:8080', 599 timeout=10) 600 auth.authenticate() 601 602 603class OpenStackIdentity_3_0_Connection_OIDC_access_token_federation_projectsTests( 604 unittest.TestCase): 605 def setUp(self): 606 mock_cls = OpenStackIdentity_3_0_federation_projects_MockHttp 607 mock_cls.type = None 608 OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = mock_cls 609 610 self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 611 user_id='idp', 612 key='token', 613 tenant_name='oidc', 614 proxy_url='http://proxy:8080', 615 timeout=10) 616 self.auth_instance.auth_token = 'mock' 617 618 def test_authenticate(self): 619 auth = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 620 user_id='idp', 621 key='token', 622 token_scope='project', 623 tenant_name="oidc", 624 proxy_url='http://proxy:8080', 625 timeout=10) 626 auth.authenticate() 627 628 629class OpenStackIdentity_3_0_Connection_OIDC_access_tokenTests( 630 unittest.TestCase): 631 def setUp(self): 632 mock_cls = OpenStackIdentity_3_0_MockHttp 633 mock_cls.type = None 634 OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = mock_cls 635 636 self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 637 user_id='idp', 638 key='token', 639 tenant_name='oidc', 640 domain_name='project_name2') 641 self.auth_instance.auth_token = 'mock' 642 643 def test_authenticate(self): 644 auth = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 645 user_id='idp', 646 key='token', 647 token_scope='project', 648 tenant_name="oidc", 649 domain_name='project_name2') 650 auth.authenticate() 651 652 653class OpenStackIdentity_3_0_Connection_OIDC_access_token_project_idTests( 654 unittest.TestCase): 655 def setUp(self): 656 mock_cls = OpenStackIdentity_3_0_MockHttp 657 mock_cls.type = None 658 OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = mock_cls 659 660 self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 661 user_id='idp', 662 key='token', 663 tenant_name='oidc', 664 domain_name='project_id2') 665 self.auth_instance.auth_token = 'mock' 666 667 def test_authenticate_valid_project_id(self): 668 auth = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 669 user_id='idp', 670 key='token', 671 token_scope='project', 672 tenant_name="oidc", 673 domain_name='project_id2') 674 auth.authenticate() 675 676 def test_authenticate_invalid_project_id(self): 677 auth = OpenStackIdentity_3_0_Connection_OIDC_access_token(auth_url='http://none', 678 user_id='idp', 679 key='token', 680 token_scope='project', 681 tenant_name="oidc", 682 domain_name='project_id100') 683 684 expected_msg = 'Project project_id100 not found' 685 self.assertRaisesRegex(ValueError, expected_msg, auth.authenticate) 686 687 688class OpenStackIdentity_2_0_Connection_VOMSTests(unittest.TestCase): 689 def setUp(self): 690 mock_cls = OpenStackIdentity_2_0_Connection_VOMSMockHttp 691 mock_cls.type = None 692 OpenStackIdentity_2_0_Connection_VOMS.conn_class = mock_cls 693 694 self.auth_instance = OpenStackIdentity_2_0_Connection_VOMS(auth_url='http://none', 695 user_id=None, 696 key='/tmp/proxy.pem', 697 tenant_name='VO') 698 self.auth_instance.auth_token = 'mock' 699 700 def test_authenticate(self): 701 auth = OpenStackIdentity_2_0_Connection_VOMS(auth_url='http://none', 702 user_id=None, 703 key='/tmp/proxy.pem', 704 token_scope='test', 705 tenant_name="VO") 706 auth.authenticate() 707 708 709class OpenStackServiceCatalogTestCase(unittest.TestCase): 710 fixtures = ComputeFileFixtures('openstack') 711 712 def test_parsing_auth_v1_1(self): 713 data = self.fixtures.load('_v1_1__auth.json') 714 data = json.loads(data) 715 service_catalog = data['auth']['serviceCatalog'] 716 717 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 718 auth_version='1.0') 719 entries = catalog.get_entries() 720 self.assertEqual(len(entries), 3) 721 722 entry = [e for e in entries if e.service_type == 'cloudFilesCDN'][0] 723 self.assertEqual(entry.service_type, 'cloudFilesCDN') 724 self.assertIsNone(entry.service_name) 725 self.assertEqual(len(entry.endpoints), 2) 726 self.assertEqual(entry.endpoints[0].region, 'ORD') 727 self.assertEqual(entry.endpoints[0].url, 728 'https://cdn2.clouddrive.com/v1/MossoCloudFS') 729 self.assertEqual(entry.endpoints[0].endpoint_type, 'external') 730 self.assertEqual(entry.endpoints[1].region, 'LON') 731 self.assertEqual(entry.endpoints[1].endpoint_type, 'external') 732 733 def test_parsing_auth_v2(self): 734 data = self.fixtures.load('_v2_0__auth.json') 735 data = json.loads(data) 736 service_catalog = data['access']['serviceCatalog'] 737 738 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 739 auth_version='2.0') 740 entries = catalog.get_entries() 741 self.assertEqual(len(entries), 10) 742 743 entry = [e for e in entries if e.service_name == 'cloudServers'][0] 744 self.assertEqual(entry.service_type, 'compute') 745 self.assertEqual(entry.service_name, 'cloudServers') 746 self.assertEqual(len(entry.endpoints), 1) 747 self.assertIsNone(entry.endpoints[0].region) 748 self.assertEqual(entry.endpoints[0].url, 749 'https://servers.api.rackspacecloud.com/v1.0/1337') 750 self.assertEqual(entry.endpoints[0].endpoint_type, 'external') 751 752 def test_parsing_auth_v3(self): 753 data = self.fixtures.load('_v3__auth.json') 754 data = json.loads(data) 755 service_catalog = data['token']['catalog'] 756 757 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 758 auth_version='3.x') 759 entries = catalog.get_entries() 760 self.assertEqual(len(entries), 6) 761 entry = [e for e in entries if e.service_type == 'volume'][0] 762 self.assertEqual(entry.service_type, 'volume') 763 self.assertIsNone(entry.service_name) 764 self.assertEqual(len(entry.endpoints), 3) 765 self.assertEqual(entry.endpoints[0].region, 'regionOne') 766 self.assertEqual(entry.endpoints[0].endpoint_type, 'external') 767 self.assertEqual(entry.endpoints[1].region, 'regionOne') 768 self.assertEqual(entry.endpoints[1].endpoint_type, 'admin') 769 self.assertEqual(entry.endpoints[2].region, 'regionOne') 770 self.assertEqual(entry.endpoints[2].endpoint_type, 'internal') 771 772 def test_get_public_urls(self): 773 data = self.fixtures.load('_v2_0__auth.json') 774 data = json.loads(data) 775 service_catalog = data['access']['serviceCatalog'] 776 777 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 778 auth_version='2.0') 779 780 public_urls = catalog.get_public_urls(service_type='object-store') 781 expected_urls = ['https://storage101.lon1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111', 782 'https://storage101.ord1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111'] 783 self.assertEqual(public_urls, expected_urls) 784 785 def test_get_regions(self): 786 data = self.fixtures.load('_v2_0__auth.json') 787 data = json.loads(data) 788 service_catalog = data['access']['serviceCatalog'] 789 790 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 791 auth_version='2.0') 792 793 regions = catalog.get_regions(service_type='object-store') 794 self.assertEqual(regions, ['LON', 'ORD']) 795 796 regions = catalog.get_regions(service_type='invalid') 797 self.assertEqual(regions, []) 798 799 def test_get_service_types(self): 800 data = self.fixtures.load('_v2_0__auth.json') 801 data = json.loads(data) 802 service_catalog = data['access']['serviceCatalog'] 803 804 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 805 auth_version='2.0') 806 service_types = catalog.get_service_types() 807 self.assertEqual(service_types, ['compute', 'image', 'network', 808 'object-store', 'rax:object-cdn', 809 'volumev2', 'volumev3']) 810 811 service_types = catalog.get_service_types(region='ORD') 812 self.assertEqual(service_types, ['rax:object-cdn']) 813 814 def test_get_service_names(self): 815 data = self.fixtures.load('_v2_0__auth.json') 816 data = json.loads(data) 817 service_catalog = data['access']['serviceCatalog'] 818 819 catalog = OpenStackServiceCatalog(service_catalog=service_catalog, 820 auth_version='2.0') 821 822 service_names = catalog.get_service_names() 823 self.assertEqual(service_names, ['cinderv2', 'cinderv3', 'cloudFiles', 824 'cloudFilesCDN', 'cloudServers', 825 'cloudServersOpenStack', 826 'cloudServersPreprod', 827 'glance', 828 'neutron', 829 'nova']) 830 831 service_names = catalog.get_service_names(service_type='compute') 832 self.assertEqual(service_names, ['cloudServers', 833 'cloudServersOpenStack', 834 'cloudServersPreprod', 835 'nova']) 836 837 838class OpenStackIdentity_2_0_MockHttp(MockHttp): 839 fixtures = ComputeFileFixtures('openstack_identity/v2') 840 json_content_headers = {'content-type': 'application/json; charset=UTF-8'} 841 842 def _v2_0_tenants(self, method, url, body, headers): 843 if method == 'GET': 844 body = self.fixtures.load('v2_0_tenants.json') 845 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 846 raise NotImplementedError() 847 848 849class OpenStackIdentity_3_0_MockHttp(MockHttp): 850 fixtures = ComputeFileFixtures('openstack_identity/v3') 851 json_content_headers = {'content-type': 'application/json; charset=UTF-8'} 852 853 def _v3(self, method, url, body, headers): 854 if method == 'GET': 855 body = self.fixtures.load('v3_versions.json') 856 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 857 raise NotImplementedError() 858 859 def _v3_domains(self, method, url, body, headers): 860 if method == 'GET': 861 body = self.fixtures.load('v3_domains.json') 862 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 863 raise NotImplementedError() 864 865 def _v3_projects(self, method, url, body, headers): 866 if method == 'GET': 867 body = self.fixtures.load('v3_projects.json') 868 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 869 raise NotImplementedError() 870 871 def _v3_projects_UNAUTHORIZED(self, method, url, body, headers): 872 if method == 'GET': 873 body = ComputeFileFixtures('openstack').load('_v3__auth.json') 874 return (httplib.UNAUTHORIZED, body, self.json_content_headers, 875 httplib.responses[httplib.UNAUTHORIZED]) 876 raise NotImplementedError() 877 878 def _v3_OS_FEDERATION_identity_providers_test_user_id_protocols_test_tenant_auth(self, method, url, body, headers): 879 if method == 'GET': 880 if 'Authorization' not in headers: 881 return (httplib.UNAUTHORIZED, '', headers, httplib.responses[httplib.OK]) 882 883 if headers['Authorization'] == 'Bearer test_key': 884 response_body = ComputeFileFixtures('openstack').load('_v3__auth.json') 885 response_headers = { 886 'Content-Type': 'application/json', 887 'x-subject-token': 'foo-bar' 888 } 889 return (httplib.OK, response_body, response_headers, httplib.responses[httplib.OK]) 890 891 return (httplib.UNAUTHORIZED, '{}', headers, httplib.responses[httplib.OK]) 892 raise NotImplementedError() 893 894 def _v3_auth_tokens(self, method, url, body, headers): 895 if method == 'GET': 896 body = json.loads( 897 ComputeFileFixtures('openstack').load('_v3__auth.json')) 898 body['token']['expires_at'] = TOMORROW.isoformat() 899 headers = self.json_content_headers.copy() 900 headers['x-subject-token'] = '00000000000000000000000000000000' 901 return (httplib.OK, json.dumps(body), headers, 902 httplib.responses[httplib.OK]) 903 if method == 'POST': 904 status = httplib.OK 905 data = json.loads(body) 906 if 'password' in data['auth']['identity']: 907 if data['auth']['identity']['password']['user']['domain']['name'] != 'test_domain' or \ 908 data['auth']['scope']['project']['domain']['id'] != 'test_tenant_domain_id': 909 status = httplib.UNAUTHORIZED 910 911 body = ComputeFileFixtures('openstack').load('_v3__auth.json') 912 headers = self.json_content_headers.copy() 913 headers['x-subject-token'] = '00000000000000000000000000000000' 914 return (status, body, headers, httplib.responses[httplib.OK]) 915 raise NotImplementedError() 916 917 def _v3_auth_tokens_GET_UNAUTHORIZED_POST_OK(self, method, url, body, headers): 918 if method == 'GET': 919 body = ComputeFileFixtures('openstack').load( 920 '_v3__auth_unauthorized.json') 921 return (httplib.UNAUTHORIZED, body, self.json_content_headers, 922 httplib.responses[httplib.UNAUTHORIZED]) 923 if method == 'POST': 924 return self._v3_auth_tokens(method, url, body, headers) 925 raise NotImplementedError() 926 927 def _v3_users(self, method, url, body, headers): 928 if method == 'GET': 929 # list users 930 body = self.fixtures.load('v3_users.json') 931 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 932 elif method == 'POST': 933 # create user 934 body = self.fixtures.load('v3_create_user.json') 935 return (httplib.CREATED, body, self.json_content_headers, 936 httplib.responses[httplib.CREATED]) 937 raise NotImplementedError() 938 939 def _v3_users_a(self, method, url, body, headers): 940 if method == 'GET': 941 # look up a user 942 body = self.fixtures.load('v3_users_a.json') 943 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 944 if method == 'PATCH': 945 # enable / disable user 946 body = self.fixtures.load('v3_users_a.json') 947 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 948 raise NotImplementedError() 949 950 def _v3_users_b(self, method, url, body, headers): 951 if method == 'GET': 952 # look up a user 953 body = self.fixtures.load('v3_users_b.json') 954 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 955 raise NotImplementedError() 956 957 def _v3_users_c(self, method, url, body, headers): 958 if method == 'GET': 959 # look up a user 960 body = self.fixtures.load('v3_users_c.json') 961 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 962 raise NotImplementedError() 963 964 def _v3_roles(self, method, url, body, headers): 965 if method == 'GET': 966 body = self.fixtures.load('v3_roles.json') 967 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 968 raise NotImplementedError() 969 970 def _v3_domains_default_users_a_roles_a(self, method, url, body, headers): 971 if method == 'PUT': 972 # grant domain role 973 body = '' 974 return (httplib.NO_CONTENT, body, self.json_content_headers, 975 httplib.responses[httplib.NO_CONTENT]) 976 elif method == 'DELETE': 977 # revoke domain role 978 body = '' 979 return (httplib.NO_CONTENT, body, self.json_content_headers, 980 httplib.responses[httplib.NO_CONTENT]) 981 raise NotImplementedError() 982 983 def _v3_projects_a_users_a_roles_a(self, method, url, body, headers): 984 if method == 'PUT': 985 # grant project role 986 body = '' 987 return (httplib.NO_CONTENT, body, self.json_content_headers, 988 httplib.responses[httplib.NO_CONTENT]) 989 elif method == 'DELETE': 990 # revoke project role 991 body = '' 992 return (httplib.NO_CONTENT, body, self.json_content_headers, 993 httplib.responses[httplib.NO_CONTENT]) 994 raise NotImplementedError() 995 996 def _v3_domains_default(self, method, url, body, headers): 997 if method == 'GET': 998 # get domain 999 body = self.fixtures.load('v3_domains_default.json') 1000 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1001 raise NotImplementedError() 1002 1003 def _v3_users_a_projects(self, method, url, body, headers): 1004 if method == 'GET': 1005 # get user projects 1006 body = self.fixtures.load('v3_users_a_projects.json') 1007 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1008 raise NotImplementedError() 1009 1010 def _v3_domains_default_users_a_roles(self, method, url, body, headers): 1011 if method == 'GET': 1012 # get user domain roles 1013 body = self.fixtures.load('v3_domains_default_users_a_roles.json') 1014 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1015 raise NotImplementedError() 1016 1017 def _v3_OS_FEDERATION_identity_providers_idp_protocols_oidc_auth(self, method, url, body, headers): 1018 if method == 'GET': 1019 headers = self.json_content_headers.copy() 1020 headers['x-subject-token'] = '00000000000000000000000000000000' 1021 return (httplib.OK, body, headers, httplib.responses[httplib.OK]) 1022 raise NotImplementedError() 1023 1024 def _v3_OS_FEDERATION_projects(self, method, url, body, headers): 1025 if method == 'GET': 1026 # get user projects 1027 body = json.dumps({"projects": [{"id": "project_id", "name": "project_name"}, 1028 {"id": "project_id2", "name": "project_name2"}]}) 1029 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1030 raise NotImplementedError() 1031 1032 def _v3_auth_projects(self, method, url, body, headers): 1033 if method == 'GET': 1034 # get user projects 1035 body = json.dumps({"projects": [{"id": "project_id", "name": "project_name"}, 1036 {"id": "project_id2", "name": "project_name2"}]}) 1037 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1038 raise NotImplementedError() 1039 1040 1041class OpenStackIdentity_3_0_AppCred_MockHttp(OpenStackIdentity_3_0_MockHttp): 1042 1043 def _v3_auth_tokens(self, method, url, body, headers): 1044 if method == 'POST': 1045 status = httplib.OK 1046 data = json.loads(body) 1047 if 'application_credential' not in data['auth']['identity']['methods']: 1048 status = httplib.UNAUTHORIZED 1049 else: 1050 appcred = data['auth']['identity']['application_credential'] 1051 if appcred['id'] != 'appcred_id' or appcred['secret'] != 'appcred_secret': 1052 status = httplib.UNAUTHORIZED 1053 1054 body = ComputeFileFixtures('openstack').load('_v3__auth.json') 1055 headers = self.json_content_headers.copy() 1056 headers['x-subject-token'] = '00000000000000000000000000000000' 1057 return (status, body, headers, httplib.responses[httplib.OK]) 1058 raise NotImplementedError() 1059 1060class OpenStackIdentity_3_0_federation_projects_MockHttp(OpenStackIdentity_3_0_MockHttp): 1061 fixtures = ComputeFileFixtures('openstack_identity/v3') 1062 json_content_headers = {'content-type': 'application/json; charset=UTF-8'} 1063 1064 def _v3_OS_FEDERATION_projects(self, method, url, body, headers): 1065 if method == 'GET': 1066 # get user projects 1067 body = json.dumps({"projects": [{"id": "project_id"}]}) 1068 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1069 raise NotImplementedError() 1070 1071 def _v3_auth_projects(self, method, url, body, headers): 1072 return (httplib.INTERNAL_SERVER_ERROR, body, self.json_content_headers, 1073 httplib.responses[httplib.INTERNAL_SERVER_ERROR]) 1074 1075 1076class OpenStackIdentity_2_0_Connection_VOMSMockHttp(MockHttp): 1077 fixtures = ComputeFileFixtures('openstack_identity/v2') 1078 json_content_headers = {'content-type': 'application/json; charset=UTF-8'} 1079 1080 def _v2_0_tokens(self, method, url, body, headers): 1081 if method == 'POST': 1082 status = httplib.UNAUTHORIZED 1083 data = json.loads(body) 1084 if 'voms' in data['auth'] and data['auth']['voms'] is True: 1085 status = httplib.OK 1086 1087 body = ComputeFileFixtures('openstack').load('_v2_0__auth.json') 1088 headers = self.json_content_headers.copy() 1089 headers['x-subject-token'] = '00000000000000000000000000000000' 1090 return (status, body, headers, httplib.responses[httplib.OK]) 1091 raise NotImplementedError() 1092 1093 def _v2_0_tenants(self, method, url, body, headers): 1094 if method == 'GET': 1095 # get user projects 1096 body = json.dumps({"tenant": [{"name": "tenant_name"}]}) 1097 return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) 1098 raise NotImplementedError() 1099 1100 1101if __name__ == '__main__': 1102 sys.exit(unittest.main()) 1103