1# Licensed under the Apache License, Version 2.0 (the "License"); you may 2# not use this file except in compliance with the License. You may obtain 3# a copy of the License at 4# 5# http://www.apache.org/licenses/LICENSE-2.0 6# 7# Unless required by applicable law or agreed to in writing, software 8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 10# License for the specific language governing permissions and limitations 11# under the License. 12 13import uuid 14 15import fixtures 16try: 17 # requests_kerberos won't be available on py3, it doesn't work with py3. 18 import requests_kerberos 19except ImportError: 20 requests_kerberos = None 21 22from keystoneauth1 import fixture as ks_fixture 23from keystoneauth1.tests.unit import utils as test_utils 24 25 26class KerberosMock(fixtures.Fixture): 27 28 def __init__(self, requests_mock): 29 super(KerberosMock, self).__init__() 30 31 self.challenge_header = 'Negotiate %s' % uuid.uuid4().hex 32 self.pass_header = 'Negotiate %s' % uuid.uuid4().hex 33 self.requests_mock = requests_mock 34 35 def setUp(self): 36 super(KerberosMock, self).setUp() 37 38 if requests_kerberos is None: 39 return 40 41 m = fixtures.MockPatchObject(requests_kerberos.HTTPKerberosAuth, 42 'generate_request_header', 43 self._generate_request_header) 44 45 self.header_fixture = self.useFixture(m) 46 47 m = fixtures.MockPatchObject(requests_kerberos.HTTPKerberosAuth, 48 'authenticate_server', 49 self._authenticate_server) 50 51 self.authenticate_fixture = self.useFixture(m) 52 53 def _generate_request_header(self, *args, **kwargs): 54 return self.challenge_header 55 56 def _authenticate_server(self, response): 57 self.called_auth_server = True 58 return response.headers.get('www-authenticate') == self.pass_header 59 60 def mock_auth_success( 61 self, 62 token_id=None, 63 token_body=None, 64 method='POST', 65 url=test_utils.TestCase.TEST_ROOT_URL + 'v3/auth/tokens'): 66 if not token_id: 67 token_id = uuid.uuid4().hex 68 if not token_body: 69 token_body = ks_fixture.V3Token() 70 71 self.called_auth_server = False 72 73 response_list = [{'text': 'Fail', 74 'status_code': 401, 75 'headers': {'WWW-Authenticate': 'Negotiate'}}, 76 {'headers': {'X-Subject-Token': token_id, 77 'Content-Type': 'application/json', 78 'WWW-Authenticate': self.pass_header}, 79 'status_code': 200, 80 'json': token_body}] 81 82 self.requests_mock.register_uri(method, 83 url, 84 response_list=response_list) 85 86 return token_id, token_body 87