1# -*- coding: utf-8 -*- 2# Copyright: (c) 2019, XLAB Steampunk <steampunk@xlab.si> 3# 4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5 6from __future__ import absolute_import, division, print_function 7__metaclass__ = type 8 9import sys 10 11import pytest 12 13from ansible_collections.sensu.sensu_go.plugins.module_utils import ( 14 client, errors, http 15) 16 17pytestmark = pytest.mark.skipif( 18 sys.version_info < (2, 7), reason="requires python2.7 or higher" 19) 20 21 22class TestAuthHeader: 23 def test_using_valid_token(self, mocker): 24 request = mocker.patch.object(http, "request") 25 request.return_value = http.Response(200, '{"access_token": "token"}') 26 27 c = client.Client( 28 "http://example.com/", "user", "pass", None, True, None, 29 ) 30 31 assert dict(Authorization="Bearer token") == c.auth_header 32 assert 1 == request.call_count 33 assert ("GET", "http://example.com/auth") == request.call_args[0] 34 assert "user" == request.call_args[1]["url_username"] 35 assert "pass" == request.call_args[1]["url_password"] 36 37 def test_cache_auth_headers_with_token(self, mocker): 38 request = mocker.patch.object(http, "request") 39 request.return_value = http.Response(200, '{"access_token": "token"}') 40 41 c = client.Client( 42 "http://example.com/", "user", "pass", None, True, None, 43 ) 44 for i in range(5): 45 c.auth_header 46 47 assert 1 == request.call_count 48 49 def test_login_failure_token_bad_status(self, mocker): 50 request = mocker.patch.object(http, "request") 51 request.return_value = http.Response(500, '{"access_token": "token"}') 52 53 with pytest.raises(errors.SensuError, match="500"): 54 client.Client( 55 "http://example.com/", "user", "pass", None, True, None, 56 ).auth_header 57 58 def test_login_failure_token_bad_json(self, mocker): 59 request = mocker.patch.object(http, "request") 60 request.return_value = http.Response(200, "{ not a json }") 61 62 with pytest.raises(errors.SensuError, match="JSON"): 63 client.Client( 64 "http://example.com/", "user", "pass", None, True, None, 65 ).auth_header 66 67 def test_login_failure_token_missing_token(self, mocker): 68 request = mocker.patch.object(http, "request") 69 request.return_value = http.Response(200, '{"access_bla": "token"}') 70 71 with pytest.raises(errors.SensuError, match="token"): 72 client.Client( 73 "http://example.com/", "user", "pass", None, True, None, 74 ).auth_header 75 76 77class TestVersion: 78 def test_valid_version(self, mocker): 79 c = client.Client("http://example.com/", "u", "p", None, True, None) 80 mocker.patch.object(c, "get").return_value = http.Response( 81 200, '{"sensu_backend":"5.21.0#sha-here"}', 82 ) 83 84 assert c.version == "5.21.0" 85 86 def test_valid_version_is_cached(self, mocker): 87 c = client.Client("http://example.com/", "u", "p", None, True, None) 88 get = mocker.patch.object(c, "get") 89 get.return_value = http.Response( 90 200, '{"sensu_backend":"5.21.0#sha-here"}', 91 ) 92 93 for i in range(4): 94 c.version 95 96 get.assert_called_once() 97 98 def test_non_200_response(self, mocker): 99 c = client.Client("http://example.com/", "u", "p", None, True, None) 100 mocker.patch.object(c, "get").return_value = http.Response( 101 400, '{"sensu_backend":"5.21.0#sha-here"}', 102 ) 103 104 with pytest.raises(errors.SensuError, match="400"): 105 c.version 106 107 def test_bad_json_response(self, mocker): 108 c = client.Client("http://example.com/", "u", "p", None, True, None) 109 mocker.patch.object(c, "get").return_value = http.Response( 110 200, '"sensu_backend', 111 ) 112 113 with pytest.raises(errors.SensuError, match="JSON"): 114 c.version 115 116 def test_missing_backend_version_in_response(self, mocker): 117 c = client.Client("http://example.com/", "u", "p", None, True, None) 118 mocker.patch.object(c, "get").return_value = http.Response(200, '{}') 119 120 with pytest.raises(errors.SensuError, match="backend"): 121 c.version 122 123 def test_invalid_version(self, mocker): 124 c = client.Client("http://example.com/", "u", "p", None, True, None) 125 mocker.patch.object(c, "get").return_value = http.Response( 126 200, '{"sensu_backend":"devel"}', 127 ) 128 129 assert c.version == c.BAD_VERSION 130 131 132class TestRequest: 133 def test_request_payload_token(self, mocker): 134 request = mocker.patch.object(http, "request") 135 request.side_effect = ( 136 http.Response(200, '{"access_token": "token"}'), 137 http.Response(200, "data"), 138 ) 139 140 client.Client( 141 "http://example.com/", "user", "pass", None, True, None, 142 ).request("PUT", "/path", dict(some="payload")) 143 144 request.assert_called_with( 145 "PUT", "http://example.com/path", 146 payload=dict(some="payload"), 147 headers=dict(Authorization="Bearer token"), 148 validate_certs=True, 149 ca_path=None, 150 ) 151 152 def test_request_payload_api_key(self, mocker): 153 request = mocker.patch.object(http, "request") 154 request.return_value = http.Response(200, "data") 155 156 client.Client( 157 "http://example.com/", None, None, "key", False, None, 158 ).request("PUT", "/path", dict(some="payload")) 159 160 request.assert_called_once_with( 161 "PUT", "http://example.com/path", 162 payload=dict(some="payload"), 163 headers=dict(Authorization="Key key"), 164 validate_certs=False, 165 ca_path=None, 166 ) 167 168 def test_request_no_payload_token(self, mocker): 169 request = mocker.patch.object(http, "request") 170 request.side_effect = ( 171 http.Response(200, '{"access_token": "token"}'), 172 http.Response(200, "data"), 173 ) 174 175 client.Client( 176 "http://example.com/", "user", "pass", None, True, "/ca", 177 ).request("PUT", "/path") 178 179 request.assert_called_with( 180 "PUT", "http://example.com/path", payload=None, 181 headers=dict(Authorization="Bearer token"), 182 validate_certs=True, 183 ca_path="/ca", 184 ) 185 186 def test_request_no_payload_api_key(self, mocker): 187 request = mocker.patch.object(http, "request") 188 request.return_value = http.Response(200, "data") 189 190 client.Client( 191 "http://example.com/", "u", "p", "key", False, "/ca", 192 ).request("PUT", "/path") 193 194 request.assert_called_once_with( 195 "PUT", "http://example.com/path", payload=None, 196 headers=dict(Authorization="Key key"), 197 validate_certs=False, 198 ca_path="/ca", 199 ) 200 201 @pytest.mark.parametrize("status", [401, 403]) 202 def test_request_bad_credentials(self, status, mocker): 203 request = mocker.patch.object(http, "request") 204 request.return_value = http.Response(status, "data") 205 206 with pytest.raises(errors.SensuError, match="credentials"): 207 client.Client( 208 "http://example.com/", None, None, "key", True, None, 209 ).request("PUT", "/path", dict(some="payload")) 210 211 request.assert_called_once_with( 212 "PUT", "http://example.com/path", 213 payload=dict(some="payload"), 214 headers=dict(Authorization="Key key"), 215 validate_certs=True, 216 ca_path=None, 217 ) 218 219 220class TestGet: 221 def test_get(self, mocker): 222 c = client.Client( 223 "http://example.com/", "user", "pass", None, True, None, 224 ) 225 c.request = mocker.Mock() 226 227 c.get("/path") 228 229 c.request.assert_called_with("GET", "/path") 230 231 232class TestPut: 233 def test_put(self, mocker): 234 c = client.Client( 235 "http://example.com/", "user", "pass", None, True, None, 236 ) 237 c.request = mocker.Mock() 238 239 c.put("/path", {}) 240 241 c.request.assert_called_with("PUT", "/path", {}) 242 243 244class TestDelete: 245 def test_delete(self, mocker): 246 c = client.Client( 247 "http://example.com/", "user", "pass", None, True, None, 248 ) 249 c.request = mocker.Mock() 250 251 c.delete("/path") 252 253 c.request.assert_called_with("DELETE", "/path") 254 255 256class TestValidateAuthData: 257 def test_valid_creds(self, mocker): 258 request = mocker.patch.object(http, "request") 259 request.return_value = http.Response(200, None) 260 c = client.Client( 261 "http://example.com/", "user", "pass", None, True, None, 262 ) 263 264 result = c.validate_auth_data("check_user", "check_pass") 265 266 assert result 267 assert 1 == request.call_count 268 assert ("GET", "http://example.com/auth/test") == request.call_args[0] 269 assert "check_user" == request.call_args[1]["url_username"] 270 assert "check_pass" == request.call_args[1]["url_password"] 271 272 def test_invalid_creds(self, mocker): 273 request = mocker.patch.object(http, "request") 274 request.return_value = http.Response(401, None) 275 c = client.Client( 276 "http://example.com/", "user", "pass", None, True, None, 277 ) 278 279 result = c.validate_auth_data("check_user", "check_pass") 280 281 assert not result 282 assert 1 == request.call_count 283 assert ("GET", "http://example.com/auth/test") == request.call_args[0] 284 assert "check_user" == request.call_args[1]["url_username"] 285 assert "check_pass" == request.call_args[1]["url_password"] 286 287 def test_broken_backend(self, mocker): 288 request = mocker.patch.object(http, "request") 289 request.return_value = http.Response(500, None) 290 c = client.Client( 291 "http://example.com/", "user", "pass", None, True, None, 292 ) 293 294 with pytest.raises(errors.SensuError, match="500"): 295 c.validate_auth_data("check_user", "check_pass") 296