1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
13import uuid
14
15import fixtures
16try:
17    # requests_kerberos won't be available on py3, it doesn't work with py3.
18    import requests_kerberos
19except ImportError:
20    requests_kerberos = None
21
22from keystoneauth1 import fixture as ks_fixture
23from keystoneauth1.tests.unit import utils as test_utils
24
25
class KerberosMock(fixtures.Fixture):
    """Fixture that fakes a Kerberos (Negotiate) exchange for unit tests.

    When ``requests_kerberos`` is importable, setting this fixture up
    replaces ``HTTPKerberosAuth.generate_request_header`` and
    ``HTTPKerberosAuth.authenticate_server`` with stand-ins defined on this
    fixture. :meth:`mock_auth_success` registers a 401 response followed by
    a 200 response on the supplied ``requests_mock``.
    """

    def __init__(self, requests_mock):
        """Create the fixture.

        :param requests_mock: object whose ``register_uri`` method is called
            by :meth:`mock_auth_success` to register the canned responses
            (presumably a requests_mock mocker -- confirm against callers).
        """
        super(KerberosMock, self).__init__()

        # Random per-instance values so a test cannot pass by accident with
        # a hard-coded header.
        self.challenge_header = 'Negotiate %s' % uuid.uuid4().hex
        self.pass_header = 'Negotiate %s' % uuid.uuid4().hex
        self.requests_mock = requests_mock

        # Define these up front so they can always be read, even before
        # mock_auth_success() has been called or when requests_kerberos is
        # not importable and setUp() returns without patching anything.
        self.called_auth_server = False
        self.header_fixture = None
        self.authenticate_fixture = None

    def setUp(self):
        """Patch ``HTTPKerberosAuth`` if ``requests_kerberos`` is available.

        Does nothing beyond the base class set-up when the optional
        ``requests_kerberos`` import failed at module load.
        """
        super(KerberosMock, self).setUp()

        if requests_kerberos is None:
            return

        header_patch = fixtures.MockPatchObject(
            requests_kerberos.HTTPKerberosAuth,
            'generate_request_header',
            self._generate_request_header)
        self.header_fixture = self.useFixture(header_patch)

        authenticate_patch = fixtures.MockPatchObject(
            requests_kerberos.HTTPKerberosAuth,
            'authenticate_server',
            self._authenticate_server)
        self.authenticate_fixture = self.useFixture(authenticate_patch)

    def _generate_request_header(self, *args, **kwargs):
        """Stand-in for ``generate_request_header``.

        Ignores all arguments and returns :attr:`challenge_header`.
        """
        return self.challenge_header

    def _authenticate_server(self, response):
        """Stand-in for ``authenticate_server``.

        Records that it was called and reports whether the response's
        ``www-authenticate`` header equals :attr:`pass_header`.

        :param response: object exposing a ``headers`` mapping with ``get``.
        :returns: ``True`` if the header matches, ``False`` otherwise.
        """
        self.called_auth_server = True
        return response.headers.get('www-authenticate') == self.pass_header

    def mock_auth_success(
            self,
            token_id=None,
            token_body=None,
            method='POST',
            url=test_utils.TestCase.TEST_ROOT_URL + 'v3/auth/tokens'):
        """Register a 401-then-200 response sequence for ``url``.

        The first response is a 401 carrying a bare ``Negotiate`` challenge;
        the second is a 200 carrying the token and :attr:`pass_header`.
        Calling this also resets :attr:`called_auth_server` to ``False``.

        :param token_id: value for the ``X-Subject-Token`` header; a random
            hex string is generated when falsy.
        :param token_body: JSON body of the 200 response; a new
            ``ks_fixture.V3Token()`` is used when falsy.
        :param method: HTTP method to register the responses for.
        :param url: URL to register the responses for.
        :returns: tuple ``(token_id, token_body)`` actually registered.
        """
        if not token_id:
            token_id = uuid.uuid4().hex
        if not token_body:
            token_body = ks_fixture.V3Token()

        self.called_auth_server = False

        challenge_response = {
            'text': 'Fail',
            'status_code': 401,
            'headers': {'WWW-Authenticate': 'Negotiate'},
        }
        success_response = {
            'headers': {'X-Subject-Token': token_id,
                        'Content-Type': 'application/json',
                        'WWW-Authenticate': self.pass_header},
            'status_code': 200,
            'json': token_body,
        }

        self.requests_mock.register_uri(
            method,
            url,
            response_list=[challenge_response, success_response])

        return token_id, token_body
87