1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from datetime import datetime
16from datetime import timedelta
17from datetime import timezone
18
19import logging
20import unittest
21
22import mock
23
24
class Test_entry_from_resource(unittest.TestCase):
    """Tests for ``entry_from_resource`` in ``google.cloud.logging_v2._helpers``."""

    @staticmethod
    def _call_fut(resource, client, loggers):
        """Import and invoke the function under test."""
        from google.cloud.logging_v2._helpers import entry_from_resource

        return entry_from_resource(resource, client, loggers)

    def _payload_helper(self, key, class_name):
        """Check that a resource containing ``key`` is handed to ``class_name``.

        Args:
            key: Payload key stored in the resource, or ``None`` to leave the
                resource empty.
            class_name: Name of the attribute in ``_helpers`` that is replaced
                with an ``EntryMock`` for the duration of the call.
        """
        # ``mock`` comes from the module-level import; a function-local
        # re-import is unnecessary.
        resource = {}
        if key is not None:
            resource[key] = "yup"
        client = object()
        loggers = {}
        mock_class = EntryMock()

        name = "google.cloud.logging_v2._helpers." + class_name
        with mock.patch(name, new=mock_class):
            result = self._call_fut(resource, client, loggers)

        # The patched class must have produced the result and must have
        # received exactly the arguments passed to the function under test.
        self.assertIs(result, mock_class.sentinel)
        self.assertEqual(mock_class.called, (resource, client, loggers))

    def test_wo_payload(self):
        """An empty resource is passed to ``LogEntry``."""
        self._payload_helper(None, "LogEntry")

    def test_text_payload(self):
        """A ``textPayload`` resource is passed to ``TextEntry``."""
        self._payload_helper("textPayload", "TextEntry")

    def test_json_payload(self):
        """A ``jsonPayload`` resource is passed to ``StructEntry``."""
        self._payload_helper("jsonPayload", "StructEntry")

    def test_proto_payload(self):
        """A ``protoPayload`` resource is passed to ``ProtobufEntry``."""
        self._payload_helper("protoPayload", "ProtobufEntry")
60
61
class Test_retrieve_metadata_server(unittest.TestCase):
    """Tests for ``retrieve_metadata_server`` in ``google.cloud.logging_v2._helpers``."""

    @staticmethod
    def _call_fut(metadata_key):
        """Import and invoke the function under test."""
        from google.cloud.logging_v2._helpers import retrieve_metadata_server

        return retrieve_metadata_server(metadata_key)

    def test_metadata_exists(self):
        """A response whose status equals ``codes.ok`` yields the response text."""
        ok_code = 200
        expected_text = "my-gke-cluster"

        fake_requests = mock.Mock()
        fake_requests.codes.ok = ok_code
        fake_requests.get.return_value = ResponseMock(
            status_code=ok_code, text=expected_text
        )

        with mock.patch("google.cloud.logging_v2._helpers.requests", fake_requests):
            result = self._call_fut("test_key")

        self.assertEqual(result, expected_text)

    def test_metadata_does_not_exist(self):
        """A response whose status differs from ``codes.ok`` yields ``None``."""
        fake_requests = mock.Mock()
        fake_requests.codes.ok = 200
        fake_requests.get.return_value = ResponseMock(status_code=404)

        with mock.patch("google.cloud.logging_v2._helpers.requests", fake_requests):
            result = self._call_fut("test_key")

        self.assertIsNone(result)

    def test_request_exception(self):
        """A ``RequestException`` raised by ``requests.get`` yields ``None``."""
        import requests

        failing_get = mock.Mock(
            spec=["__call__"], side_effect=requests.exceptions.RequestException
        )

        # Enter the patches in the same order as before: ``requests.get``
        # first, then the metadata URL constant.
        with mock.patch("requests.get", failing_get), mock.patch(
            "google.cloud.logging_v2._helpers.METADATA_URL",
            new="http://metadata.invalid/",
        ):
            result = self._call_fut("test_url_cannot_connect")

        self.assertIsNone(result)
126
127
class Test__normalize_severity(unittest.TestCase):
    """Tests for ``_normalize_severity`` in ``google.cloud.logging_v2._helpers``."""

    @staticmethod
    def _stackdriver_severity():
        """Return ``LogSeverity`` as exposed by the ``_helpers`` module."""
        from google.cloud.logging_v2._helpers import LogSeverity

        return LogSeverity

    def _normalize_severity_helper(self, stdlib_level, enum_level):
        """Assert that normalizing ``stdlib_level`` gives ``enum_level``."""
        from google.cloud.logging_v2._helpers import _normalize_severity

        actual = _normalize_severity(stdlib_level)
        self.assertEqual(actual, enum_level)

    def test__normalize_severity_critical(self):
        expected = self._stackdriver_severity().CRITICAL
        self._normalize_severity_helper(logging.CRITICAL, expected)

    def test__normalize_severity_error(self):
        expected = self._stackdriver_severity().ERROR
        self._normalize_severity_helper(logging.ERROR, expected)

    def test__normalize_severity_warning(self):
        expected = self._stackdriver_severity().WARNING
        self._normalize_severity_helper(logging.WARNING, expected)

    def test__normalize_severity_info(self):
        expected = self._stackdriver_severity().INFO
        self._normalize_severity_helper(logging.INFO, expected)

    def test__normalize_severity_debug(self):
        expected = self._stackdriver_severity().DEBUG
        self._normalize_severity_helper(logging.DEBUG, expected)

    def test__normalize_severity_notset(self):
        expected = self._stackdriver_severity().DEFAULT
        self._normalize_severity_helper(logging.NOTSET, expected)

    def test__normalize_severity_non_standard(self):
        # A level with no stdlib name is expected to come back unchanged.
        custom_level = 35
        self._normalize_severity_helper(custom_level, custom_level)
167
168
class Test__add_defaults_to_filter(unittest.TestCase):
    """Tests for ``_add_defaults_to_filter`` in ``google.cloud.logging_v2._helpers``."""

    @staticmethod
    def _time_format():
        """Return the ``strptime`` pattern used to parse the filter timestamp."""
        return "%Y-%m-%dT%H:%M:%S.%f%z"

    @staticmethod
    def _add_defaults_to_filter(filter_):
        """Import and invoke the function under test."""
        from google.cloud.logging_v2._helpers import _add_defaults_to_filter

        return _add_defaults_to_filter(filter_)

    def _assert_default_timestamp(self, out_filter, prefix=""):
        """Assert ``out_filter`` ends in a timestamp clause about 24 hours old.

        Args:
            out_filter: Filter string returned by the function under test.
            prefix: Literal text expected in front of the ``timestamp>=``
                clause.

        Raises:
            ValueError: From ``datetime.strptime`` if ``out_filter`` does not
                match the expected layout.
        """
        # The prefix becomes part of a strptime pattern, so a literal "%"
        # has to be doubled to be matched verbatim.
        pattern = (
            prefix.replace("%", "%%") + 'timestamp>="' + self._time_format() + '"'
        )
        timestamp = datetime.strptime(out_filter, pattern)
        yesterday = datetime.now(timezone.utc) - timedelta(days=1)
        # Compare the absolute gap: a one-sided ``yesterday - timestamp`` check
        # is also satisfied by any timestamp later than "24 hours ago".
        self.assertLess(abs(yesterday - timestamp), timedelta(minutes=1))

    def test_filter_defaults_empty_input(self):
        """Filter should default to return logs < 24 hours old"""
        out_filter = self._add_defaults_to_filter(None)
        self._assert_default_timestamp(out_filter)

    def test_filter_defaults_no_timestamp(self):
        """Filter should append 24 hour timestamp filter to input string"""
        test_inputs = [
            "",
            "  ",
            "logName=/projects/test/test",
            "test1 AND test2 AND test3",
            "time AND stamp  ",
        ]
        for in_filter in test_inputs:
            # subTest labels a failure with the input that triggered it.
            with self.subTest(in_filter=in_filter):
                out_filter = self._add_defaults_to_filter(in_filter)
                self.assertIn(in_filter, out_filter)
                self.assertIn("timestamp", out_filter)
                self._assert_default_timestamp(out_filter, prefix=in_filter + " AND ")

    def test_filter_defaults_only_timestamp(self):
        """If user inputs a timestamp filter, don't add default"""
        in_filter = "timestamp=test"
        out_filter = self._add_defaults_to_filter(in_filter)
        self.assertEqual(in_filter, out_filter)

    def test_filter_defaults_capitalized_timestamp(self):
        """Should work with capitalized timestamp strings"""
        in_filter = "TIMESTAMP=test"
        out_filter = self._add_defaults_to_filter(in_filter)
        self.assertEqual(in_filter, out_filter)
220
221
class EntryMock:
    """Stand-in for an entry class that records its ``from_api_repr`` call."""

    def __init__(self):
        # Stays ``None`` until ``from_api_repr`` has been invoked.
        self.called = None
        # Unique object returned by ``from_api_repr`` so tests can check identity.
        self.sentinel = object()

    def from_api_repr(self, resource, client, loggers):
        """Store the received arguments as a tuple and return the sentinel."""
        received = (resource, client, loggers)
        self.called = received
        return self.sentinel
230
231
class ResponseMock:
    """Minimal response object exposing ``status_code`` and ``text``."""

    def __init__(self, status_code, text="test_response_text"):
        # Both values are stored as plain, reassignable attributes.
        self.text = text
        self.status_code = status_code
236