1# Licensed to the Apache Software Foundation (ASF) under one or more
2# contributor license agreements.  See the NOTICE file distributed with
3# this work for additional information regarding copyright ownership.
4# The ASF licenses this file to You under the Apache License, Version 2.0
5# (the "License"); you may not use this file except in compliance with
6# the License.  You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import sys
17import datetime
18
19try:
20    import simplejson as json
21except ImportError:
22    import json
23
24from mock import Mock
25
26from libcloud.utils.py3 import httplib
27from libcloud.utils.py3 import assertRaisesRegex
28from libcloud.common.openstack import OpenStackBaseConnection
29from libcloud.common.openstack_identity import AUTH_TOKEN_EXPIRES_GRACE_SECONDS
30from libcloud.common.openstack_identity import get_class_for_auth_version
31from libcloud.common.openstack_identity import OpenStackServiceCatalog
32from libcloud.common.openstack_identity import OpenStackIdentity_2_0_Connection
33from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection
34from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection_AppCred
35from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection_OIDC_access_token
36from libcloud.common.openstack_identity import OpenStackIdentityUser
37from libcloud.compute.drivers.openstack import OpenStack_1_0_NodeDriver
38from libcloud.common.openstack_identity import OpenStackIdentity_2_0_Connection_VOMS
39
40from libcloud.test import unittest
41from libcloud.test import MockHttp
42from libcloud.test.secrets import OPENSTACK_PARAMS
43from libcloud.test.file_fixtures import ComputeFileFixtures
44from libcloud.test.compute.test_openstack import OpenStackMockAuthCache
45from libcloud.test.compute.test_openstack import OpenStackMockHttp
46from libcloud.test.compute.test_openstack import OpenStack_2_0_MockHttp
47
# Naive local timestamps one day after / before module import time. The tests
# below assign them to ``auth_token_expires`` (and to cached auth contexts) to
# stand for a still-valid and an already-expired token respectively.
TOMORROW = datetime.datetime.today() + datetime.timedelta(days=1)
YESTERDAY = datetime.datetime.today() - datetime.timedelta(days=1)
50
51
class OpenStackIdentityConnectionTestCase(unittest.TestCase):
    """
    Cross-version tests for the OpenStack identity connection classes.

    Every test resolves the class under test through
    ``get_class_for_auth_version`` and drives it with a parent connection
    produced by ``_get_mock_connection``.
    """

    def setUp(self):
        # Reset the class-level attributes which the tests in this case
        # overwrite, so each test starts from the same configuration.
        OpenStackBaseConnection.auth_url = None
        OpenStackBaseConnection.conn_class = OpenStackMockHttp
        OpenStack_2_0_MockHttp.type = None
        OpenStackIdentity_3_0_MockHttp.type = None

    def test_auth_url_is_correctly_assembled(self):
        """Check ``action`` for auth URLs with and without an explicit path."""
        tuples = [
            ('1.0', OpenStackMockHttp, {}),
            ('1.1', OpenStackMockHttp, {}),
            ('2.0', OpenStack_2_0_MockHttp, {}),
            ('2.0_apikey', OpenStack_2_0_MockHttp, {}),
            ('2.0_password', OpenStack_2_0_MockHttp, {}),
            ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'tenant_name': 'tenant-name'}),
            ('3.x_appcred', OpenStackIdentity_3_0_MockHttp, {}),
            ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'tenant_name': 'tenant-name'})
        ]

        APPEND = 0
        NOTAPPEND = 1

        # (auth url, whether the version's default path is expected,
        #  expected action when the default path is NOT expected)
        auth_urls = [
            ('https://auth.api.example.com', APPEND, ''),
            ('https://auth.api.example.com/', NOTAPPEND, '/'),
            ('https://auth.api.example.com/foo/bar', NOTAPPEND, '/foo/bar'),
            ('https://auth.api.example.com/foo/bar/', NOTAPPEND, '/foo/bar/')
        ]

        # Default action expected for each auth version when the URL has no
        # path component.
        actions = {
            '1.0': '/v1.0',
            '1.1': '/v1.1/auth',
            '2.0': '/v2.0/tokens',
            '2.0_apikey': '/v2.0/tokens',
            '2.0_password': '/v2.0/tokens',
            '3.x_password': '/v3/auth/tokens',
            '3.x_appcred': '/v3/auth/tokens',
            '3.x_oidc_access_token': '/v3/OS-FEDERATION/identity_providers/user_name/protocols/tenant-name/auth',
        }

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for (auth_version, mock_http_class, kwargs) in tuples:
            for (url, should_append_default_path, expected_path) in auth_urls:
                connection = \
                    self._get_mock_connection(mock_http_class=mock_http_class,
                                              auth_url=url)

                auth_url = connection.auth_url
                cls = get_class_for_auth_version(auth_version=auth_version)
                osa = cls(auth_url=auth_url,
                          user_id=user_id,
                          key=key,
                          parent_conn=connection,
                          **kwargs)

                # Failures are deliberately ignored here: the assertion below
                # only inspects ``osa.action``, not the outcome of the call.
                try:
                    osa = osa.authenticate()
                except Exception:
                    pass

                if should_append_default_path == APPEND:
                    expected_path = actions[auth_version]

                self.assertEqual(osa.action, expected_path)

    def test_basic_authentication(self):
        """Authenticate once per auth version and check the populated state."""
        tuples = [
            ('1.0', OpenStackMockHttp, {}),
            ('1.1', OpenStackMockHttp, {}),
            ('2.0', OpenStack_2_0_MockHttp, {}),
            ('2.0_apikey', OpenStack_2_0_MockHttp, {}),
            ('2.0_password', OpenStack_2_0_MockHttp, {}),
            ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key',
                                                              'token_scope': 'project', 'tenant_name': 'test_tenant',
                                                              'tenant_domain_id': 'test_tenant_domain_id',
                                                              'domain_name': 'test_domain'}),
            ('3.x_appcred', OpenStackIdentity_3_0_MockHttp, {'user_id': 'appcred_id', 'key': 'appcred_secret'}),
            ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key',
                                                                       'token_scope': 'domain', 'tenant_name': 'test_tenant',
                                                                       'tenant_domain_id': 'test_tenant_domain_id',
                                                                       'domain_name': 'test_domain'})
        ]

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for (auth_version, mock_http_class, kwargs) in tuples:
            connection = \
                self._get_mock_connection(mock_http_class=mock_http_class)
            auth_url = connection.auth_url

            # Entries without explicit credentials use the shared test ones.
            if not kwargs:
                kwargs['user_id'] = user_id
                kwargs['key'] = key

            cls = get_class_for_auth_version(auth_version=auth_version)
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)

            # Nothing is populated before authenticate() is called.
            self.assertEqual(osa.urls, {})
            self.assertIsNone(osa.auth_token)
            self.assertIsNone(osa.auth_user_info)
            osa = osa.authenticate()

            self.assertGreaterEqual(len(osa.urls), 1)
            self.assertIsNotNone(osa.auth_token)

            if auth_version in ['1.1', '2.0', '2.0_apikey', '2.0_password',
                                '3.x_password', '3.x_appcred', '3.x_oidc_access_token']:
                self.assertIsNotNone(osa.auth_token_expires)

            if auth_version in ['2.0', '2.0_apikey', '2.0_password',
                                '3.x_password', '3.x_appcred', '3.x_oidc_access_token']:
                self.assertIsNotNone(osa.auth_user_info)

    def test_token_expiration_and_force_reauthentication(self):
        """Count auth calls for forced, expired, valid and nearly-expired tokens."""
        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        connection = self._get_mock_connection(OpenStack_2_0_MockHttp)
        auth_url = connection.auth_url

        osa = OpenStackIdentity_2_0_Connection(auth_url=auth_url,
                                               user_id=user_id,
                                               key=key,
                                               parent_conn=connection)

        # Wrap the real method so calls still go through but can be counted.
        mocked_auth_method = Mock(wraps=osa._authenticate_2_0_with_body)
        osa._authenticate_2_0_with_body = mocked_auth_method

        # Force re-auth, expired token
        osa.auth_token = None
        osa.auth_token_expires = YESTERDAY
        count = 5

        for _ in range(count):
            osa.authenticate(force=True)

        self.assertEqual(mocked_auth_method.call_count, count)

        # No force reauth, expired token
        osa.auth_token = None
        osa.auth_token_expires = YESTERDAY

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for _ in range(count):
            osa.authenticate(force=False)

        self.assertEqual(mocked_auth_method.call_count, 1)

        # No force reauth, valid / non-expired token
        osa.auth_token = None

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for i in range(count):
            osa.authenticate(force=False)

            # After the first call, move the expiry into the future.
            if i == 0:
                osa.auth_token_expires = TOMORROW

        self.assertEqual(mocked_auth_method.call_count, 1)

        # No force reauth, valid / non-expired token which is about to expire in
        # less than AUTH_TOKEN_EXPIRES_GRACE_SECONDS
        soon = datetime.datetime.utcnow() + \
            datetime.timedelta(seconds=AUTH_TOKEN_EXPIRES_GRACE_SECONDS - 1)
        osa.auth_token = None

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for i in range(count):
            # Before the first call, set an expiry inside the grace window.
            if i == 0:
                osa.auth_token_expires = soon

            osa.authenticate(force=False)

        self.assertEqual(mocked_auth_method.call_count, 1)

    def test_authentication_cache(self):
        """Exercise storing, re-using, expiring and evicting cached tokens."""
        tuples = [
            # 1.0 does not provide token expiration, so it always
            # re-authenticates and never uses the cache.
            # ('1.0', OpenStackMockHttp, {}),
            ('1.1', OpenStackMockHttp, {}),
            ('2.0', OpenStack_2_0_MockHttp, {}),
            ('2.0_apikey', OpenStack_2_0_MockHttp, {}),
            ('2.0_password', OpenStack_2_0_MockHttp, {}),
            ('3.x_password', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key',
                                                              'token_scope': 'project', 'tenant_name': 'test_tenant',
                                                              'tenant_domain_id': 'test_tenant_domain_id',
                                                              'domain_name': 'test_domain'}),
            ('3.x_oidc_access_token', OpenStackIdentity_3_0_MockHttp, {'user_id': 'test_user_id', 'key': 'test_key',
                                                                       'token_scope': 'domain', 'tenant_name': 'test_tenant',
                                                                       'tenant_domain_id': 'test_tenant_domain_id',
                                                                       'domain_name': 'test_domain'})
        ]

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for (auth_version, mock_http_class, kwargs) in tuples:
            mock_http_class.type = None
            connection = \
                self._get_mock_connection(mock_http_class=mock_http_class)
            auth_url = connection.auth_url

            # Entries without explicit credentials use the shared test ones.
            if not kwargs:
                kwargs['user_id'] = user_id
                kwargs['key'] = key

            auth_cache = OpenStackMockAuthCache()
            self.assertEqual(len(auth_cache), 0)
            kwargs['auth_cache'] = auth_cache

            cls = get_class_for_auth_version(auth_version=auth_version)
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa = osa.authenticate()

            # Token is cached
            self.assertEqual(len(auth_cache), 1)

            # New client, token from cache is re-used
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa.request = Mock(wraps=osa.request)
            osa = osa.authenticate()

            # No auth API call
            if auth_version in ('1.1', '2.0', '2.0_apikey', '2.0_password'):
                self.assertEqual(osa.request.call_count, 0)
            elif auth_version in ('3.x_password', '3.x_oidc_access_token'):
                # v3 only caches token and expiration; service catalog URLs
                # and the rest of the auth context are fetched from Keystone
                osa.request.assert_called_once_with(
                    action='/v3/auth/tokens', params=None, data=None,
                    headers={'X-Subject-Token': '00000000000000000000000000000000',
                             'X-Auth-Token': '00000000000000000000000000000000'},
                    method='GET', raw=False)

            # Cache size unchanged
            self.assertEqual(len(auth_cache), 1)

            # Authenticates if cached token expired
            cache_key = list(auth_cache.store.keys())[0]
            auth_context = auth_cache.get(cache_key)
            auth_context.expiration = YESTERDAY
            auth_cache.put(cache_key, auth_context)

            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa.request = Mock(wraps=osa.request)
            osa._get_unscoped_token_from_oidc_token = Mock(return_value='000')
            OpenStackIdentity_3_0_MockHttp.type = 'GET_UNAUTHORIZED_POST_OK'
            osa = osa.authenticate()

            # ``call_args`` describes the most recent call. The values are
            # compared with assertEqual; assertTrue(value, expected) would
            # treat ``expected`` as the failure message and verify nothing.
            if auth_version in ('1.1', '2.0', '2.0_apikey', '2.0_password'):
                self.assertEqual(osa.request.call_count, 1)
                self.assertEqual(osa.request.call_args[1]['method'], 'POST')
            elif auth_version in ('3.x_password', '3.x_oidc_access_token'):
                self.assertEqual(osa.request.call_args[0][0], '/v3/auth/tokens')
                self.assertEqual(osa.request.call_args[1]['method'], 'POST')

            # Token evicted from cache if 401 received on another call
            if hasattr(osa, 'list_projects'):
                mock_http_class.type = None
                auth_cache.reset()

                osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
                osa.request = Mock(wraps=osa.request)
                osa = osa.authenticate()
                self.assertEqual(len(auth_cache), 1)
                mock_http_class.type = 'UNAUTHORIZED'
                try:
                    osa.list_projects()
                except Exception:  # These methods don't handle 401s
                    pass
                self.assertEqual(len(auth_cache), 0)

    def _get_mock_connection(self, mock_http_class, auth_url=None):
        """
        Build an ``OpenStackBaseConnection`` wired to ``mock_http_class``.

        Note that this overwrites ``conn_class`` and ``auth_url`` on the
        ``OpenStackBaseConnection`` class itself.

        :param mock_http_class: Class to install as the connection class.
        :param auth_url: Auth URL to use; defaults to
                         ``https://auth.api.example.com`` when ``None``.
        :return: The configured connection instance.
        """
        OpenStackBaseConnection.conn_class = mock_http_class

        if auth_url is None:
            auth_url = "https://auth.api.example.com"

        OpenStackBaseConnection.auth_url = auth_url
        connection = OpenStackBaseConnection(*OPENSTACK_PARAMS)

        connection._ex_force_base_url = "https://www.foo.com"
        connection.driver = OpenStack_1_0_NodeDriver(*OPENSTACK_PARAMS)

        return connection
347
348
class OpenStackIdentity_2_0_ConnectionTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_2_0_Connection`` using the 2.0 mock HTTP class."""

    def setUp(self):
        OpenStackIdentity_2_0_MockHttp.type = None
        OpenStackIdentity_2_0_Connection.conn_class = OpenStackIdentity_2_0_MockHttp

        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'test',
            'key': 'test',
            'tenant_name': 'test',
            'proxy_url': 'http://proxy:8080',
            'timeout': 10,
        }
        self.auth_instance = OpenStackIdentity_2_0_Connection(**conn_kwargs)
        # Pre-set a token on the instance used by the tests below.
        self.auth_instance.auth_token = 'mock'
        self.assertEqual(self.auth_instance.proxy_url, 'http://proxy:8080')

    def test_list_projects(self):
        """``list_projects`` returns two projects; check the first one's fields."""
        projects = self.auth_instance.list_projects()
        self.assertEqual(len(projects), 2)

        first = projects[0]
        self.assertEqual(first.id, 'a')
        self.assertEqual(first.name, 'test')
        self.assertEqual(first.description, 'test project')
        self.assertTrue(first.enabled)
371
372
class OpenStackIdentity_3_0_ConnectionTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_3_0_Connection`` using the 3.0 mock HTTP class."""

    def setUp(self):
        mock_cls = OpenStackIdentity_3_0_MockHttp
        mock_cls.type = None
        OpenStackIdentity_3_0_Connection.conn_class = mock_cls

        self.auth_instance = OpenStackIdentity_3_0_Connection(auth_url='http://none',
                                                              user_id='test',
                                                              key='test',
                                                              tenant_name='test',
                                                              proxy_url='http://proxy:8080',
                                                              timeout=10)
        # Pre-set a token on the instance used by the tests below.
        self.auth_instance.auth_token = 'mock'
        self.assertEqual(self.auth_instance.proxy_url, 'http://proxy:8080')

    def test_token_scope_argument(self):
        """Constructor validation of ``token_scope`` and its companion arguments."""
        # Invalid token_scope value
        expected_msg = 'Invalid value for "token_scope" argument: foo'
        assertRaisesRegex(self, ValueError, expected_msg,
                          OpenStackIdentity_3_0_Connection,
                          auth_url='http://none',
                          user_id='test',
                          key='test',
                          token_scope='foo')

        # Missing tenant_name
        expected_msg = 'Must provide tenant_name and domain_name argument'
        assertRaisesRegex(self, ValueError, expected_msg,
                          OpenStackIdentity_3_0_Connection,
                          auth_url='http://none',
                          user_id='test',
                          key='test',
                          token_scope='project')

        # Missing domain_name
        expected_msg = 'Must provide domain_name argument'
        assertRaisesRegex(self, ValueError, expected_msg,
                          OpenStackIdentity_3_0_Connection,
                          auth_url='http://none',
                          user_id='test',
                          key='test',
                          token_scope='domain',
                          domain_name=None)

        # Scope to project all ok
        OpenStackIdentity_3_0_Connection(auth_url='http://none',
                                         user_id='test',
                                         key='test',
                                         token_scope='project',
                                         tenant_name='test',
                                         domain_name='Default')
        # Scope to domain
        OpenStackIdentity_3_0_Connection(auth_url='http://none',
                                         user_id='test',
                                         key='test',
                                         token_scope='domain',
                                         tenant_name=None,
                                         domain_name='Default')

    def test_authenticate(self):
        """A project-scoped ``authenticate()`` runs and ``proxy_url`` is retained."""
        auth = OpenStackIdentity_3_0_Connection(auth_url='http://none',
                                                user_id='test_user_id',
                                                key='test_key',
                                                token_scope='project',
                                                tenant_name="test_tenant",
                                                tenant_domain_id="test_tenant_domain_id",
                                                domain_name='test_domain',
                                                proxy_url='http://proxy:8080',
                                                timeout=10)
        auth.authenticate()
        self.assertEqual(auth.proxy_url, 'http://proxy:8080')

    def test_list_supported_versions(self):
        """``list_supported_versions`` returns the v2.0 and v3.0 entries."""
        OpenStackIdentity_3_0_MockHttp.type = 'v3'

        versions = self.auth_instance.list_supported_versions()
        self.assertEqual(len(versions), 2)
        self.assertEqual(versions[0].version, 'v2.0')
        self.assertEqual(versions[0].url,
                         'http://192.168.18.100:5000/v2.0/')
        self.assertEqual(versions[1].version, 'v3.0')
        self.assertEqual(versions[1].url,
                         'http://192.168.18.100:5000/v3/')

    def test_list_domains(self):
        """``list_domains`` returns the single ``Default`` domain."""
        domains = self.auth_instance.list_domains()
        self.assertEqual(len(domains), 1)
        self.assertEqual(domains[0].id, 'default')
        self.assertEqual(domains[0].name, 'Default')
        self.assertTrue(domains[0].enabled)

    def test_list_projects(self):
        """``list_projects`` returns four projects; check the first one's fields."""
        projects = self.auth_instance.list_projects()
        self.assertEqual(len(projects), 4)
        self.assertEqual(projects[0].id, 'a')
        self.assertEqual(projects[0].domain_id, 'default')
        self.assertTrue(projects[0].enabled)
        self.assertEqual(projects[0].description, 'Test project')

    def test_list_users(self):
        """``list_users`` returns twelve users; check the first one's fields."""
        users = self.auth_instance.list_users()
        self.assertEqual(len(users), 12)
        self.assertEqual(users[0].id, 'a')
        self.assertEqual(users[0].domain_id, 'default')
        self.assertEqual(users[0].enabled, True)
        self.assertEqual(users[0].email, 'openstack-test@localhost')

    def test_list_roles(self):
        """``list_roles`` returns two roles; the second one is ``admin``."""
        roles = self.auth_instance.list_roles()
        self.assertEqual(len(roles), 2)
        self.assertEqual(roles[1].id, 'b')
        self.assertEqual(roles[1].name, 'admin')

    def test_list_user_projects(self):
        """``list_user_projects`` returns no projects for the first listed user."""
        user = self.auth_instance.list_users()[0]
        projects = self.auth_instance.list_user_projects(user=user)
        self.assertEqual(len(projects), 0)

    def test_list_user_domain_roles(self):
        """The first user has exactly the ``admin`` role on the first domain."""
        user = self.auth_instance.list_users()[0]
        domain = self.auth_instance.list_domains()[0]
        roles = self.auth_instance.list_user_domain_roles(domain=domain,
                                                          user=user)
        self.assertEqual(len(roles), 1)
        self.assertEqual(roles[0].name, 'admin')

    def test_get_domain(self):
        """``get_domain`` for id ``default`` returns the ``Default`` domain."""
        domain = self.auth_instance.get_domain(domain_id='default')
        self.assertEqual(domain.name, 'Default')

    def test_get_user(self):
        """``get_user`` for id ``a`` returns a fully populated user."""
        user = self.auth_instance.get_user(user_id='a')
        self.assertEqual(user.id, 'a')
        self.assertEqual(user.domain_id, 'default')
        self.assertEqual(user.enabled, True)
        self.assertEqual(user.email, 'openstack-test@localhost')

    def test_get_user_without_email(self):
        """A user whose ``email`` is absent is returned with ``email`` of None."""
        user = self.auth_instance.get_user(user_id='b')
        self.assertEqual(user.id, 'b')
        self.assertEqual(user.name, 'userwithoutemail')
        self.assertIsNone(user.email)

    def test_get_user_without_enabled(self):
        """A user whose ``enabled`` is absent is returned with ``enabled`` of None."""
        user = self.auth_instance.get_user(user_id='c')
        self.assertEqual(user.id, 'c')
        self.assertEqual(user.name, 'userwithoutenabled')
        self.assertIsNone(user.enabled)

    def test_create_user(self):
        """``create_user`` returns the created user's id and name."""
        user = self.auth_instance.create_user(email='test2@localhost', password='test1',
                                              name='test2', domain_id='default')

        self.assertEqual(user.id, 'c')
        self.assertEqual(user.name, 'test2')

    def test_enable_user(self):
        """``enable_user`` returns an ``OpenStackIdentityUser``."""
        user = self.auth_instance.list_users()[0]
        result = self.auth_instance.enable_user(user=user)
        # assertIsInstance reports the actual type on failure.
        self.assertIsInstance(result, OpenStackIdentityUser)

    def test_disable_user(self):
        """``disable_user`` returns an ``OpenStackIdentityUser``."""
        user = self.auth_instance.list_users()[0]
        result = self.auth_instance.disable_user(user=user)
        # assertIsInstance reports the actual type on failure.
        self.assertIsInstance(result, OpenStackIdentityUser)

    def test_grant_domain_role_to_user(self):
        """Granting a domain role to a user returns a truthy result."""
        domain = self.auth_instance.list_domains()[0]
        role = self.auth_instance.list_roles()[0]
        user = self.auth_instance.list_users()[0]

        result = self.auth_instance.grant_domain_role_to_user(domain=domain,
                                                              role=role,
                                                              user=user)
        self.assertTrue(result)

    def test_revoke_domain_role_from_user(self):
        """Revoking a domain role from a user returns a truthy result."""
        domain = self.auth_instance.list_domains()[0]
        role = self.auth_instance.list_roles()[0]
        user = self.auth_instance.list_users()[0]

        result = self.auth_instance.revoke_domain_role_from_user(domain=domain,
                                                                 role=role,
                                                                 user=user)
        self.assertTrue(result)

    def test_grant_project_role_to_user(self):
        """Granting a project role to a user returns a truthy result."""
        project = self.auth_instance.list_projects()[0]
        role = self.auth_instance.list_roles()[0]
        user = self.auth_instance.list_users()[0]

        result = self.auth_instance.grant_project_role_to_user(project=project,
                                                               role=role,
                                                               user=user)
        self.assertTrue(result)

    def test_revoke_project_role_from_user(self):
        """Revoking a project role from a user returns a truthy result."""
        project = self.auth_instance.list_projects()[0]
        role = self.auth_instance.list_roles()[0]
        user = self.auth_instance.list_users()[0]

        result = self.auth_instance.revoke_project_role_from_user(project=project,
                                                                  role=role,
                                                                  user=user)
        self.assertTrue(result)
578
579
class OpenStackIdentity_3_0_Connection_AppCredTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_3_0_Connection_AppCred``."""

    def setUp(self):
        OpenStackIdentity_3_0_AppCred_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_AppCred.conn_class = \
            OpenStackIdentity_3_0_AppCred_MockHttp

        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'appcred_id',
            'key': 'appcred_secret',
            'proxy_url': 'http://proxy:8080',
            'timeout': 10,
        }
        self.auth_instance = OpenStackIdentity_3_0_Connection_AppCred(**conn_kwargs)
        # Pre-set a token on the shared instance.
        self.auth_instance.auth_token = 'mock'

    def test_authenticate(self):
        """``authenticate()`` on a fresh connection completes without raising."""
        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'appcred_id',
            'key': 'appcred_secret',
            'proxy_url': 'http://proxy:8080',
            'timeout': 10,
        }
        fresh_conn = OpenStackIdentity_3_0_Connection_AppCred(**conn_kwargs)
        fresh_conn.authenticate()
601
602
class OpenStackIdentity_3_0_Connection_OIDC_access_token_federation_projectsTests(
        unittest.TestCase):
    """OIDC access token tests run against the federation-projects mock HTTP class."""

    def setUp(self):
        OpenStackIdentity_3_0_federation_projects_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = \
            OpenStackIdentity_3_0_federation_projects_MockHttp

        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'idp',
            'key': 'token',
            'tenant_name': 'oidc',
            'proxy_url': 'http://proxy:8080',
            'timeout': 10,
        }
        self.auth_instance = \
            OpenStackIdentity_3_0_Connection_OIDC_access_token(**conn_kwargs)
        # Pre-set a token on the shared instance.
        self.auth_instance.auth_token = 'mock'

    def test_authenticate(self):
        """A project-scoped ``authenticate()`` completes without raising."""
        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'idp',
            'key': 'token',
            'token_scope': 'project',
            'tenant_name': "oidc",
            'proxy_url': 'http://proxy:8080',
            'timeout': 10,
        }
        fresh_conn = \
            OpenStackIdentity_3_0_Connection_OIDC_access_token(**conn_kwargs)
        fresh_conn.authenticate()
627
628
class OpenStackIdentity_3_0_Connection_OIDC_access_tokenTests(
        unittest.TestCase):
    """OIDC access token tests run against ``OpenStackIdentity_3_0_MockHttp``."""

    def setUp(self):
        OpenStackIdentity_3_0_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = \
            OpenStackIdentity_3_0_MockHttp

        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'idp',
            'key': 'token',
            'tenant_name': 'oidc',
            'domain_name': 'project_name2',
        }
        self.auth_instance = \
            OpenStackIdentity_3_0_Connection_OIDC_access_token(**conn_kwargs)
        # Pre-set a token on the shared instance.
        self.auth_instance.auth_token = 'mock'

    def test_authenticate(self):
        """A project-scoped ``authenticate()`` completes without raising."""
        conn_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'idp',
            'key': 'token',
            'token_scope': 'project',
            'tenant_name': "oidc",
            'domain_name': 'project_name2',
        }
        fresh_conn = \
            OpenStackIdentity_3_0_Connection_OIDC_access_token(**conn_kwargs)
        fresh_conn.authenticate()
651
652
class OpenStackIdentity_3_0_Connection_OIDC_access_token_project_idTests(
        unittest.TestCase):
    """
    Exercise ``OpenStackIdentity_3_0_Connection_OIDC_access_token`` when the
    ``domain_name`` argument carries a project id rather than a project name.
    """

    def setUp(self):
        # Reset the mock to its default (untyped) handlers and install it as
        # the transport used by the connection class under test.
        OpenStackIdentity_3_0_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = \
            OpenStackIdentity_3_0_MockHttp

        connection_kwargs = {
            'auth_url': 'http://none',
            'user_id': 'idp',
            'key': 'token',
            'tenant_name': 'oidc',
            'domain_name': 'project_id2',
        }
        self.auth_instance = \
            OpenStackIdentity_3_0_Connection_OIDC_access_token(
                **connection_kwargs)
        self.auth_instance.auth_token = 'mock'

    def _project_scoped_connection(self, domain_name):
        """Build a project-scoped connection for the given ``domain_name``."""
        return OpenStackIdentity_3_0_Connection_OIDC_access_token(
            auth_url='http://none',
            user_id='idp',
            key='token',
            token_scope='project',
            tenant_name="oidc",
            domain_name=domain_name)

    def test_authenticate_valid_project_id(self):
        # 'project_id2' is one of the ids served by the mocked project list;
        # the test passes as long as authenticate() does not raise.
        connection = self._project_scoped_connection('project_id2')
        connection.authenticate()

    def test_authenticate_invalid_project_id(self):
        connection = self._project_scoped_connection('project_id100')

        expected_msg = 'Project project_id100 not found'
        with self.assertRaisesRegex(ValueError, expected_msg):
            connection.authenticate()
686
687
class OpenStackIdentity_2_0_Connection_VOMSTests(unittest.TestCase):
    """
    Exercise ``OpenStackIdentity_2_0_Connection_VOMS`` against the mocked
    v2.0 VOMS endpoints.
    """

    def setUp(self):
        # Reset the mock to its default (untyped) handlers and install it as
        # the transport used by the connection class under test.
        OpenStackIdentity_2_0_Connection_VOMSMockHttp.type = None
        OpenStackIdentity_2_0_Connection_VOMS.conn_class = \
            OpenStackIdentity_2_0_Connection_VOMSMockHttp

        connection_kwargs = {
            'auth_url': 'http://none',
            'user_id': None,
            'key': '/tmp/proxy.pem',
            'tenant_name': 'VO',
        }
        self.auth_instance = OpenStackIdentity_2_0_Connection_VOMS(
            **connection_kwargs)
        self.auth_instance.auth_token = 'mock'

    def test_authenticate(self):
        # The test passes as long as authenticate() does not raise.
        connection = OpenStackIdentity_2_0_Connection_VOMS(
            auth_url='http://none',
            user_id=None,
            key='/tmp/proxy.pem',
            token_scope='test',
            tenant_name="VO")
        connection.authenticate()
707
708
class OpenStackServiceCatalogTestCase(unittest.TestCase):
    """
    Tests for ``OpenStackServiceCatalog`` parsing and lookup methods, driven
    by the auth response fixtures loaded through ``fixtures``.
    """
    fixtures = ComputeFileFixtures('openstack')

    def _load_catalog(self, fixture_name, key_path, auth_version):
        """
        Build a catalog from the JSON fixture ``fixture_name``.

        ``key_path`` is the sequence of dictionary keys leading from the top
        of the decoded fixture down to the raw service catalog.
        """
        node = json.loads(self.fixtures.load(fixture_name))
        for key in key_path:
            node = node[key]
        return OpenStackServiceCatalog(service_catalog=node,
                                       auth_version=auth_version)

    def _load_v2_catalog(self):
        """Build the catalog from the v2.0 auth fixture."""
        return self._load_catalog('_v2_0__auth.json',
                                  ('access', 'serviceCatalog'),
                                  '2.0')

    def test_parsing_auth_v1_1(self):
        catalog = self._load_catalog('_v1_1__auth.json',
                                     ('auth', 'serviceCatalog'),
                                     '1.0')
        entries = catalog.get_entries()
        self.assertEqual(len(entries), 3)

        cdn_entries = [e for e in entries
                       if e.service_type == 'cloudFilesCDN']
        entry = cdn_entries[0]
        self.assertEqual(entry.service_type, 'cloudFilesCDN')
        self.assertIsNone(entry.service_name)
        self.assertEqual(len(entry.endpoints), 2)

        first, second = entry.endpoints[0], entry.endpoints[1]
        self.assertEqual(first.region, 'ORD')
        self.assertEqual(first.url,
                         'https://cdn2.clouddrive.com/v1/MossoCloudFS')
        self.assertEqual(first.endpoint_type, 'external')
        self.assertEqual(second.region, 'LON')
        self.assertEqual(second.endpoint_type, 'external')

    def test_parsing_auth_v2(self):
        catalog = self._load_v2_catalog()
        entries = catalog.get_entries()
        self.assertEqual(len(entries), 10)

        server_entries = [e for e in entries
                          if e.service_name == 'cloudServers']
        entry = server_entries[0]
        self.assertEqual(entry.service_type, 'compute')
        self.assertEqual(entry.service_name, 'cloudServers')
        self.assertEqual(len(entry.endpoints), 1)

        endpoint = entry.endpoints[0]
        self.assertIsNone(endpoint.region)
        self.assertEqual(endpoint.url,
                         'https://servers.api.rackspacecloud.com/v1.0/1337')
        self.assertEqual(endpoint.endpoint_type, 'external')

    def test_parsing_auth_v3(self):
        catalog = self._load_catalog('_v3__auth.json',
                                     ('token', 'catalog'),
                                     '3.x')
        entries = catalog.get_entries()
        self.assertEqual(len(entries), 6)

        volume_entries = [e for e in entries if e.service_type == 'volume']
        entry = volume_entries[0]
        self.assertEqual(entry.service_type, 'volume')
        self.assertIsNone(entry.service_name)
        self.assertEqual(len(entry.endpoints), 3)

        # All three endpoints share the region; only the type differs.
        expected_types = ['external', 'admin', 'internal']
        for index, expected_type in enumerate(expected_types):
            self.assertEqual(entry.endpoints[index].region, 'regionOne')
            self.assertEqual(entry.endpoints[index].endpoint_type,
                             expected_type)

    def test_get_public_urls(self):
        catalog = self._load_v2_catalog()

        public_urls = catalog.get_public_urls(service_type='object-store')
        expected_urls = [
            'https://storage101.lon1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111',
            'https://storage101.ord1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111',
        ]
        self.assertEqual(public_urls, expected_urls)

    def test_get_regions(self):
        catalog = self._load_v2_catalog()

        known_regions = catalog.get_regions(service_type='object-store')
        self.assertEqual(known_regions, ['LON', 'ORD'])

        # An unknown service type yields an empty list.
        unknown_regions = catalog.get_regions(service_type='invalid')
        self.assertEqual(unknown_regions, [])

    def test_get_service_types(self):
        catalog = self._load_v2_catalog()

        all_types = catalog.get_service_types()
        self.assertEqual(all_types, ['compute', 'image', 'network',
                                     'object-store', 'rax:object-cdn',
                                     'volumev2', 'volumev3'])

        ord_types = catalog.get_service_types(region='ORD')
        self.assertEqual(ord_types, ['rax:object-cdn'])

    def test_get_service_names(self):
        catalog = self._load_v2_catalog()

        all_names = catalog.get_service_names()
        self.assertEqual(all_names, ['cinderv2', 'cinderv3', 'cloudFiles',
                                     'cloudFilesCDN', 'cloudServers',
                                     'cloudServersOpenStack',
                                     'cloudServersPreprod',
                                     'glance',
                                     'neutron',
                                     'nova'])

        compute_names = catalog.get_service_names(service_type='compute')
        self.assertEqual(compute_names, ['cloudServers',
                                         'cloudServersOpenStack',
                                         'cloudServersPreprod',
                                         'nova'])
836
837
class OpenStackIdentity_2_0_MockHttp(MockHttp):
    """Mock transport serving v2.0 identity responses from fixture files."""
    fixtures = ComputeFileFixtures('openstack_identity/v2')
    json_content_headers = {'content-type': 'application/json; charset=UTF-8'}

    def _v2_0_tenants(self, method, url, body, headers):
        # Only GET is mocked for this handler.
        if method != 'GET':
            raise NotImplementedError()

        payload = self.fixtures.load('v2_0_tenants.json')
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, payload, self.json_content_headers, reason)
847
848
class OpenStackIdentity_3_0_MockHttp(MockHttp):
    """
    Mock transport serving v3 identity responses.

    Every handler returns a ``(status, body, headers, reason)`` tuple and
    raises ``NotImplementedError`` for HTTP methods it does not mock. The
    reason phrase always corresponds to the status code being returned.
    """
    fixtures = ComputeFileFixtures('openstack_identity/v3')
    json_content_headers = {'content-type': 'application/json; charset=UTF-8'}

    def _fixture_response(self, fixture_name, status=httplib.OK):
        """
        Answer with the content of ``fixture_name`` (looked up through
        ``self.fixtures``), the shared JSON headers and the given status.
        """
        payload = self.fixtures.load(fixture_name)
        return (status, payload, self.json_content_headers,
                httplib.responses[status])

    def _subject_token_headers(self):
        """
        Return a fresh copy of the JSON headers with a dummy
        ``x-subject-token`` value added.
        """
        response_headers = self.json_content_headers.copy()
        response_headers['x-subject-token'] = '00000000000000000000000000000000'
        return response_headers

    def _v3(self, method, url, body, headers):
        if method == 'GET':
            return self._fixture_response('v3_versions.json')
        raise NotImplementedError()

    def _v3_domains(self, method, url, body, headers):
        if method == 'GET':
            return self._fixture_response('v3_domains.json')
        raise NotImplementedError()

    def _v3_projects(self, method, url, body, headers):
        if method == 'GET':
            return self._fixture_response('v3_projects.json')
        raise NotImplementedError()

    def _v3_projects_UNAUTHORIZED(self, method, url, body, headers):
        if method == 'GET':
            body = ComputeFileFixtures('openstack').load('_v3__auth.json')
            return (httplib.UNAUTHORIZED, body, self.json_content_headers,
                    httplib.responses[httplib.UNAUTHORIZED])
        raise NotImplementedError()

    def _v3_OS_FEDERATION_identity_providers_test_user_id_protocols_test_tenant_auth(self, method, url, body, headers):
        if method != 'GET':
            raise NotImplementedError()

        # Both rejection paths echo the request headers back, as before;
        # only the reason phrase is aligned with the 401 status.
        unauthorized_reason = httplib.responses[httplib.UNAUTHORIZED]

        if 'Authorization' not in headers:
            return (httplib.UNAUTHORIZED, '', headers, unauthorized_reason)

        if headers['Authorization'] != 'Bearer test_key':
            return (httplib.UNAUTHORIZED, '{}', headers, unauthorized_reason)

        response_body = ComputeFileFixtures('openstack').load('_v3__auth.json')
        response_headers = {
            'Content-Type': 'application/json',
            'x-subject-token': 'foo-bar'
        }
        return (httplib.OK, response_body, response_headers,
                httplib.responses[httplib.OK])

    def _v3_auth_tokens(self, method, url, body, headers):
        if method == 'GET':
            # Serve the auth fixture with its expiry replaced by TOMORROW
            # (a module-level value defined outside this class).
            token = json.loads(
                ComputeFileFixtures('openstack').load('_v3__auth.json'))
            token['token']['expires_at'] = TOMORROW.isoformat()
            return (httplib.OK, json.dumps(token),
                    self._subject_token_headers(),
                    httplib.responses[httplib.OK])

        if method == 'POST':
            status = httplib.OK
            data = json.loads(body)
            # Password requests are accepted only for the expected user
            # domain name and project domain id; other identity methods are
            # always accepted.
            if 'password' in data['auth']['identity']:
                if data['auth']['identity']['password']['user']['domain']['name'] != 'test_domain' or \
                        data['auth']['scope']['project']['domain']['id'] != 'test_tenant_domain_id':
                    status = httplib.UNAUTHORIZED

            body = ComputeFileFixtures('openstack').load('_v3__auth.json')
            # The reason phrase follows the status so that a rejected request
            # is not reported as "OK".
            return (status, body, self._subject_token_headers(),
                    httplib.responses[status])

        raise NotImplementedError()

    def _v3_auth_tokens_GET_UNAUTHORIZED_POST_OK(self, method, url, body, headers):
        if method == 'GET':
            body = ComputeFileFixtures('openstack').load(
                '_v3__auth_unauthorized.json')
            return (httplib.UNAUTHORIZED, body, self.json_content_headers,
                    httplib.responses[httplib.UNAUTHORIZED])
        if method == 'POST':
            # POST behaves exactly like the regular token handler.
            return self._v3_auth_tokens(method, url, body, headers)
        raise NotImplementedError()

    def _v3_users(self, method, url, body, headers):
        if method == 'GET':
            # list users
            return self._fixture_response('v3_users.json')
        if method == 'POST':
            # create user
            return self._fixture_response('v3_create_user.json',
                                          status=httplib.CREATED)
        raise NotImplementedError()

    def _v3_users_a(self, method, url, body, headers):
        # GET looks up a user, PATCH enables / disables it; both answer with
        # the same fixture.
        if method in ('GET', 'PATCH'):
            return self._fixture_response('v3_users_a.json')
        raise NotImplementedError()

    def _v3_users_b(self, method, url, body, headers):
        if method == 'GET':
            # look up a user
            return self._fixture_response('v3_users_b.json')
        raise NotImplementedError()

    def _v3_users_c(self, method, url, body, headers):
        if method == 'GET':
            # look up a user
            return self._fixture_response('v3_users_c.json')
        raise NotImplementedError()

    def _v3_roles(self, method, url, body, headers):
        if method == 'GET':
            return self._fixture_response('v3_roles.json')
        raise NotImplementedError()

    def _v3_domains_default_users_a_roles_a(self, method, url, body, headers):
        # PUT grants the domain role, DELETE revokes it; both answer with an
        # empty 204 response.
        if method in ('PUT', 'DELETE'):
            return (httplib.NO_CONTENT, '', self.json_content_headers,
                    httplib.responses[httplib.NO_CONTENT])
        raise NotImplementedError()

    def _v3_projects_a_users_a_roles_a(self, method, url, body, headers):
        # PUT grants the project role, DELETE revokes it; both answer with an
        # empty 204 response.
        if method in ('PUT', 'DELETE'):
            return (httplib.NO_CONTENT, '', self.json_content_headers,
                    httplib.responses[httplib.NO_CONTENT])
        raise NotImplementedError()

    def _v3_domains_default(self, method, url, body, headers):
        if method == 'GET':
            # get domain
            return self._fixture_response('v3_domains_default.json')
        raise NotImplementedError()

    def _v3_users_a_projects(self, method, url, body, headers):
        if method == 'GET':
            # get user projects
            return self._fixture_response('v3_users_a_projects.json')
        raise NotImplementedError()

    def _v3_domains_default_users_a_roles(self, method, url, body, headers):
        if method == 'GET':
            # get user domain roles
            return self._fixture_response(
                'v3_domains_default_users_a_roles.json')
        raise NotImplementedError()

    def _v3_OS_FEDERATION_identity_providers_idp_protocols_oidc_auth(self, method, url, body, headers):
        if method == 'GET':
            # The incoming request body is echoed back unchanged; only the
            # token header is added.
            return (httplib.OK, body, self._subject_token_headers(),
                    httplib.responses[httplib.OK])
        raise NotImplementedError()

    def _v3_OS_FEDERATION_projects(self, method, url, body, headers):
        if method == 'GET':
            # get user projects
            body = json.dumps({"projects": [{"id": "project_id", "name": "project_name"},
                                            {"id": "project_id2", "name": "project_name2"}]})
            return (httplib.OK, body, self.json_content_headers,
                    httplib.responses[httplib.OK])
        raise NotImplementedError()

    def _v3_auth_projects(self, method, url, body, headers):
        if method == 'GET':
            # get user projects
            body = json.dumps({"projects": [{"id": "project_id", "name": "project_name"},
                                            {"id": "project_id2", "name": "project_name2"}]})
            return (httplib.OK, body, self.json_content_headers,
                    httplib.responses[httplib.OK])
        raise NotImplementedError()
1039
1040
class OpenStackIdentity_3_0_AppCred_MockHttp(OpenStackIdentity_3_0_MockHttp):
    """
    Variant of the v3 mock whose token handler only accepts application
    credential requests carrying the expected id and secret.
    """

    def _v3_auth_tokens(self, method, url, body, headers):
        if method != 'POST':
            raise NotImplementedError()

        identity = json.loads(body)['auth']['identity']

        # Reject unless the request uses the application_credential method
        # with exactly the expected id / secret pair.
        status = httplib.UNAUTHORIZED
        if 'application_credential' in identity['methods']:
            appcred = identity['application_credential']
            if appcred['id'] == 'appcred_id' and appcred['secret'] == 'appcred_secret':
                status = httplib.OK

        body = ComputeFileFixtures('openstack').load('_v3__auth.json')
        headers = self.json_content_headers.copy()
        headers['x-subject-token'] = '00000000000000000000000000000000'
        # The reason phrase follows the status so that a rejected request is
        # not reported as "OK".
        return (status, body, headers, httplib.responses[status])
1059
class OpenStackIdentity_3_0_federation_projects_MockHttp(OpenStackIdentity_3_0_MockHttp):
    """
    Variant of the v3 mock whose federation project listing contains a single
    project and whose ``_v3_auth_projects`` handler always answers with a
    500 status.
    """
    fixtures = ComputeFileFixtures('openstack_identity/v3')
    json_content_headers = {'content-type': 'application/json; charset=UTF-8'}

    def _v3_OS_FEDERATION_projects(self, method, url, body, headers):
        # Only GET is mocked for this handler.
        if method != 'GET':
            raise NotImplementedError()

        # get user projects
        payload = json.dumps({"projects": [{"id": "project_id"}]})
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, payload, self.json_content_headers, reason)

    def _v3_auth_projects(self, method, url, body, headers):
        # Fails for every HTTP method and echoes the request body back.
        failure = httplib.INTERNAL_SERVER_ERROR
        return (failure, body, self.json_content_headers,
                httplib.responses[failure])
1074
1075
class OpenStackIdentity_2_0_Connection_VOMSMockHttp(MockHttp):
    """
    Mock transport for the v2.0 VOMS connection tests.

    Every handler returns a ``(status, body, headers, reason)`` tuple and
    raises ``NotImplementedError`` for HTTP methods it does not mock.
    """
    fixtures = ComputeFileFixtures('openstack_identity/v2')
    json_content_headers = {'content-type': 'application/json; charset=UTF-8'}

    def _v2_0_tokens(self, method, url, body, headers):
        if method != 'POST':
            raise NotImplementedError()

        auth_request = json.loads(body)['auth']
        # Only a request that explicitly sets "voms" to true is authorized.
        if 'voms' in auth_request and auth_request['voms'] is True:
            status = httplib.OK
        else:
            status = httplib.UNAUTHORIZED

        body = ComputeFileFixtures('openstack').load('_v2_0__auth.json')
        headers = self.json_content_headers.copy()
        headers['x-subject-token'] = '00000000000000000000000000000000'
        # The reason phrase follows the status so that a rejected request is
        # not reported as "OK".
        return (status, body, headers, httplib.responses[status])

    def _v2_0_tenants(self, method, url, body, headers):
        if method != 'GET':
            raise NotImplementedError()

        # get user projects
        body = json.dumps({"tenant": [{"name": "tenant_name"}]})
        return (httplib.OK, body, self.json_content_headers,
                httplib.responses[httplib.OK])
1099
1100
if __name__ == '__main__':
    # Allow running this test module directly as a script; the runner's
    # result is handed to sys.exit() as the process exit status.
    exit_status = unittest.main()
    sys.exit(exit_status)
1103