1import pytest 2import salt.states.grafana_datasource as grafana_datasource 3from tests.support.mock import MagicMock, Mock, patch 4 5profile = { 6 "grafana_url": "http://grafana", 7 "grafana_token": "token", 8} 9 10 11def mock_json_response(data): 12 response = MagicMock() 13 response.json = MagicMock(return_value=data) 14 return Mock(return_value=response) 15 16 17@pytest.fixture 18def configure_loader_modules(): 19 return {grafana_datasource: {}} 20 21 22def test_present(): 23 with patch("requests.get", mock_json_response([])): 24 with patch("requests.post") as rpost: 25 ret = grafana_datasource.present("test", "type", "url", profile=profile) 26 rpost.assert_called_once_with( 27 "http://grafana/api/datasources", 28 grafana_datasource._get_json_data("test", "type", "url"), 29 headers={ 30 "Authorization": "Bearer token", 31 "Accept": "application/json", 32 }, 33 timeout=3, 34 ) 35 assert ret["result"] 36 assert ret["comment"] == "New data source test added" 37 38 data = grafana_datasource._get_json_data("test", "type", "url") 39 data.update({"id": 1, "orgId": 1}) 40 with patch("requests.get", mock_json_response([data])): 41 with patch("requests.put") as rput: 42 ret = grafana_datasource.present("test", "type", "url", profile=profile) 43 rput.assert_called_once_with( 44 "http://grafana/api/datasources/1", 45 grafana_datasource._get_json_data("test", "type", "url"), 46 headers={ 47 "Authorization": "Bearer token", 48 "Accept": "application/json", 49 }, 50 timeout=3, 51 ) 52 assert ret["result"] 53 assert ret["comment"] == "Data source test already up-to-date" 54 assert ret["changes"] == {} 55 56 with patch("requests.put") as rput: 57 ret = grafana_datasource.present("test", "type", "newurl", profile=profile) 58 rput.assert_called_once_with( 59 "http://grafana/api/datasources/1", 60 grafana_datasource._get_json_data("test", "type", "newurl"), 61 headers={ 62 "Authorization": "Bearer token", 63 "Accept": "application/json", 64 }, 65 timeout=3, 66 ) 67 assert ret["result"] 68 assert ret["comment"] == "Data source test updated" 69 assert ret["changes"] == {"old": {"url": "url"}, "new": {"url": "newurl"}} 70 71 72def test_absent(): 73 with patch("requests.get", mock_json_response([])): 74 with patch("requests.delete") as rdelete: 75 ret = grafana_datasource.absent("test", profile=profile) 76 assert rdelete.call_count == 0 77 assert ret["result"] 78 assert ret["comment"] == "Data source test already absent" 79 80 with patch("requests.get", mock_json_response([{"name": "test", "id": 1}])): 81 with patch("requests.delete") as rdelete: 82 ret = grafana_datasource.absent("test", profile=profile) 83 rdelete.assert_called_once_with( 84 "http://grafana/api/datasources/1", 85 headers={ 86 "Authorization": "Bearer token", 87 "Accept": "application/json", 88 }, 89 timeout=3, 90 ) 91 assert ret["result"] 92 assert ret["comment"] == "Data source test was deleted" 93