1# -*- coding: utf-8 -*-
2# Copyright: (c) 2019, XLAB Steampunk <steampunk@xlab.si>
3#
4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import absolute_import, division, print_function
7__metaclass__ = type
8
9import sys
10
11import pytest
12
13from ansible_collections.sensu.sensu_go.plugins.module_utils import (
14    client, errors, http
15)
16
17pytestmark = pytest.mark.skipif(
18    sys.version_info < (2, 7), reason="requires python2.7 or higher"
19)
20
21
22class TestAuthHeader:
23    def test_using_valid_token(self, mocker):
24        request = mocker.patch.object(http, "request")
25        request.return_value = http.Response(200, '{"access_token": "token"}')
26
27        c = client.Client(
28            "http://example.com/", "user", "pass", None, True, None,
29        )
30
31        assert dict(Authorization="Bearer token") == c.auth_header
32        assert 1 == request.call_count
33        assert ("GET", "http://example.com/auth") == request.call_args[0]
34        assert "user" == request.call_args[1]["url_username"]
35        assert "pass" == request.call_args[1]["url_password"]
36
37    def test_cache_auth_headers_with_token(self, mocker):
38        request = mocker.patch.object(http, "request")
39        request.return_value = http.Response(200, '{"access_token": "token"}')
40
41        c = client.Client(
42            "http://example.com/", "user", "pass", None, True, None,
43        )
44        for i in range(5):
45            c.auth_header
46
47        assert 1 == request.call_count
48
49    def test_login_failure_token_bad_status(self, mocker):
50        request = mocker.patch.object(http, "request")
51        request.return_value = http.Response(500, '{"access_token": "token"}')
52
53        with pytest.raises(errors.SensuError, match="500"):
54            client.Client(
55                "http://example.com/", "user", "pass", None, True, None,
56            ).auth_header
57
58    def test_login_failure_token_bad_json(self, mocker):
59        request = mocker.patch.object(http, "request")
60        request.return_value = http.Response(200, "{ not a json }")
61
62        with pytest.raises(errors.SensuError, match="JSON"):
63            client.Client(
64                "http://example.com/", "user", "pass", None, True, None,
65            ).auth_header
66
67    def test_login_failure_token_missing_token(self, mocker):
68        request = mocker.patch.object(http, "request")
69        request.return_value = http.Response(200, '{"access_bla": "token"}')
70
71        with pytest.raises(errors.SensuError, match="token"):
72            client.Client(
73                "http://example.com/", "user", "pass", None, True, None,
74            ).auth_header
75
76
77class TestVersion:
78    def test_valid_version(self, mocker):
79        c = client.Client("http://example.com/", "u", "p", None, True, None)
80        mocker.patch.object(c, "get").return_value = http.Response(
81            200, '{"sensu_backend":"5.21.0#sha-here"}',
82        )
83
84        assert c.version == "5.21.0"
85
86    def test_valid_version_is_cached(self, mocker):
87        c = client.Client("http://example.com/", "u", "p", None, True, None)
88        get = mocker.patch.object(c, "get")
89        get.return_value = http.Response(
90            200, '{"sensu_backend":"5.21.0#sha-here"}',
91        )
92
93        for i in range(4):
94            c.version
95
96        get.assert_called_once()
97
98    def test_non_200_response(self, mocker):
99        c = client.Client("http://example.com/", "u", "p", None, True, None)
100        mocker.patch.object(c, "get").return_value = http.Response(
101            400, '{"sensu_backend":"5.21.0#sha-here"}',
102        )
103
104        with pytest.raises(errors.SensuError, match="400"):
105            c.version
106
107    def test_bad_json_response(self, mocker):
108        c = client.Client("http://example.com/", "u", "p", None, True, None)
109        mocker.patch.object(c, "get").return_value = http.Response(
110            200, '"sensu_backend',
111        )
112
113        with pytest.raises(errors.SensuError, match="JSON"):
114            c.version
115
116    def test_missing_backend_version_in_response(self, mocker):
117        c = client.Client("http://example.com/", "u", "p", None, True, None)
118        mocker.patch.object(c, "get").return_value = http.Response(200, '{}')
119
120        with pytest.raises(errors.SensuError, match="backend"):
121            c.version
122
123    def test_invalid_version(self, mocker):
124        c = client.Client("http://example.com/", "u", "p", None, True, None)
125        mocker.patch.object(c, "get").return_value = http.Response(
126            200, '{"sensu_backend":"devel"}',
127        )
128
129        assert c.version == c.BAD_VERSION
130
131
132class TestRequest:
133    def test_request_payload_token(self, mocker):
134        request = mocker.patch.object(http, "request")
135        request.side_effect = (
136            http.Response(200, '{"access_token": "token"}'),
137            http.Response(200, "data"),
138        )
139
140        client.Client(
141            "http://example.com/", "user", "pass", None, True, None,
142        ).request("PUT", "/path", dict(some="payload"))
143
144        request.assert_called_with(
145            "PUT", "http://example.com/path",
146            payload=dict(some="payload"),
147            headers=dict(Authorization="Bearer token"),
148            validate_certs=True,
149            ca_path=None,
150        )
151
152    def test_request_payload_api_key(self, mocker):
153        request = mocker.patch.object(http, "request")
154        request.return_value = http.Response(200, "data")
155
156        client.Client(
157            "http://example.com/", None, None, "key", False, None,
158        ).request("PUT", "/path", dict(some="payload"))
159
160        request.assert_called_once_with(
161            "PUT", "http://example.com/path",
162            payload=dict(some="payload"),
163            headers=dict(Authorization="Key key"),
164            validate_certs=False,
165            ca_path=None,
166        )
167
168    def test_request_no_payload_token(self, mocker):
169        request = mocker.patch.object(http, "request")
170        request.side_effect = (
171            http.Response(200, '{"access_token": "token"}'),
172            http.Response(200, "data"),
173        )
174
175        client.Client(
176            "http://example.com/", "user", "pass", None, True, "/ca",
177        ).request("PUT", "/path")
178
179        request.assert_called_with(
180            "PUT", "http://example.com/path", payload=None,
181            headers=dict(Authorization="Bearer token"),
182            validate_certs=True,
183            ca_path="/ca",
184        )
185
186    def test_request_no_payload_api_key(self, mocker):
187        request = mocker.patch.object(http, "request")
188        request.return_value = http.Response(200, "data")
189
190        client.Client(
191            "http://example.com/", "u", "p", "key", False, "/ca",
192        ).request("PUT", "/path")
193
194        request.assert_called_once_with(
195            "PUT", "http://example.com/path", payload=None,
196            headers=dict(Authorization="Key key"),
197            validate_certs=False,
198            ca_path="/ca",
199        )
200
201    @pytest.mark.parametrize("status", [401, 403])
202    def test_request_bad_credentials(self, status, mocker):
203        request = mocker.patch.object(http, "request")
204        request.return_value = http.Response(status, "data")
205
206        with pytest.raises(errors.SensuError, match="credentials"):
207            client.Client(
208                "http://example.com/", None, None, "key", True, None,
209            ).request("PUT", "/path", dict(some="payload"))
210
211        request.assert_called_once_with(
212            "PUT", "http://example.com/path",
213            payload=dict(some="payload"),
214            headers=dict(Authorization="Key key"),
215            validate_certs=True,
216            ca_path=None,
217        )
218
219
220class TestGet:
221    def test_get(self, mocker):
222        c = client.Client(
223            "http://example.com/", "user", "pass", None, True, None,
224        )
225        c.request = mocker.Mock()
226
227        c.get("/path")
228
229        c.request.assert_called_with("GET", "/path")
230
231
232class TestPut:
233    def test_put(self, mocker):
234        c = client.Client(
235            "http://example.com/", "user", "pass", None, True, None,
236        )
237        c.request = mocker.Mock()
238
239        c.put("/path", {})
240
241        c.request.assert_called_with("PUT", "/path", {})
242
243
244class TestDelete:
245    def test_delete(self, mocker):
246        c = client.Client(
247            "http://example.com/", "user", "pass", None, True, None,
248        )
249        c.request = mocker.Mock()
250
251        c.delete("/path")
252
253        c.request.assert_called_with("DELETE", "/path")
254
255
256class TestValidateAuthData:
257    def test_valid_creds(self, mocker):
258        request = mocker.patch.object(http, "request")
259        request.return_value = http.Response(200, None)
260        c = client.Client(
261            "http://example.com/", "user", "pass", None, True, None,
262        )
263
264        result = c.validate_auth_data("check_user", "check_pass")
265
266        assert result
267        assert 1 == request.call_count
268        assert ("GET", "http://example.com/auth/test") == request.call_args[0]
269        assert "check_user" == request.call_args[1]["url_username"]
270        assert "check_pass" == request.call_args[1]["url_password"]
271
272    def test_invalid_creds(self, mocker):
273        request = mocker.patch.object(http, "request")
274        request.return_value = http.Response(401, None)
275        c = client.Client(
276            "http://example.com/", "user", "pass", None, True, None,
277        )
278
279        result = c.validate_auth_data("check_user", "check_pass")
280
281        assert not result
282        assert 1 == request.call_count
283        assert ("GET", "http://example.com/auth/test") == request.call_args[0]
284        assert "check_user" == request.call_args[1]["url_username"]
285        assert "check_pass" == request.call_args[1]["url_password"]
286
287    def test_broken_backend(self, mocker):
288        request = mocker.patch.object(http, "request")
289        request.return_value = http.Response(500, None)
290        c = client.Client(
291            "http://example.com/", "user", "pass", None, True, None,
292        )
293
294        with pytest.raises(errors.SensuError, match="500"):
295            c.validate_auth_data("check_user", "check_pass")
296