1import re
2import sys
3if sys.version_info >= (2, 7):
4    import unittest
5else:
6    try:
7        import unittest2 as unittest
8    except ImportError:
9        raise RuntimeError("unittest2 is required for Python < 2.7")
10import sys
11
12from twiggy import logger, outputs, levels, filters
13from twiggy.compat import StringIO
14import twiggy as _twiggy
15
16class LoggerTestBase(object):
17    """common tests for loggers"""
18    def test_fields_dict(self):
19        d={42:42}
20        log = self.log.fields_dict(d)
21        assert log is not self.log
22        assert log._fields == d
23        assert log._fields is not d
24
25        # we could do the same tests on options/min_level as done
26        # in test_clone, but that's starting to get redundant
27
28    def test_fields(self):
29        log = self.log.fields(a=42)
30        assert log is not self.log
31        assert log._fields == {'a':42}
32
33    def test_name(self):
34        log = self.log.name('bob')
35        assert log is not self.log
36        assert log._fields == {'name':'bob'}
37
38    def test_options(self):
39        log = self.log.options(suppress_newlines=True)
40        assert log is not self.log
41        assert log._options['suppress_newlines'] == True
42
43    def test_bad_options(self):
44        with self.assertRaises(ValueError):
45            log = self.log.options(boom=True)
46
47    def test_trace(self):
48        log = self.log.trace('error')
49        assert log is not self.log
50        assert log._options['trace'] == 'error'
51
52    def test_debug(self):
53        self.log.debug('hi')
54        assert len(self.messages) == 1
55        m = self.messages.pop()
56        assert m.text == 'hi'
57        assert m.fields['level'] == levels.DEBUG
58
59    def test_info(self):
60        self.log.info('hi')
61        assert len(self.messages) == 1
62        m = self.messages.pop()
63        assert m.text == 'hi'
64        assert m.fields['level'] == levels.INFO
65
66    def test_notice(self):
67        self.log.notice('hi')
68        assert len(self.messages) == 1
69        m = self.messages.pop()
70        assert m.text == 'hi'
71        assert m.fields['level'] == levels.NOTICE
72
73    def test_warning(self):
74        self.log.warning('hi')
75        assert len(self.messages) == 1, self.messages
76        m = self.messages.pop()
77        assert m.text == 'hi'
78        assert m.fields['level'] == levels.WARNING
79
80    def test_error(self):
81        self.log.error('hi')
82        assert len(self.messages) == 1
83        m = self.messages.pop()
84        assert m.text == 'hi'
85        assert m.fields['level'] == levels.ERROR
86
87    def test_critical(self):
88        self.log.critical('hi')
89        assert len(self.messages) == 1
90        m = self.messages.pop()
91        assert m.text == 'hi'
92        assert m.fields['level'] == levels.CRITICAL
93
94    def test_logger_min_level(self):
95        log = self.log.name('test_min_level')
96        log.min_level = levels.WARNING
97
98        log.warning('hi')
99        assert len(self.messages) == 1
100        m = self.messages.pop()
101        assert m.text == 'hi'
102
103        log.error('hi')
104        assert len(self.messages) == 1
105        m = self.messages.pop()
106        assert m.text == 'hi'
107
108        log.info('hi')
109        assert len(self.messages) == 0
110
111class InternalLoggerTest(LoggerTestBase, unittest.TestCase):
112
113    def setUp(self):
114        self.output = outputs.ListOutput(close_atexit = False)
115        self.messages = self.output.messages
116        self.log = logger.InternalLogger(output=self.output)
117
118    def tearDown(self):
119        self.output.close()
120
121    def test_clone(self):
122        log = self.log._clone()
123        assert log is not self.log
124        assert type(log) is type(self.log)
125        assert log.output is self.output
126
127        assert log._fields == self.log._fields
128        assert log._fields is not self.log._fields
129
130        assert log._options == self.log._options
131        assert log._options is not self.log._options
132
133        assert log.min_level == self.log.min_level
134
135    def test_trap_msg(self):
136        sio = StringIO()
137
138        def cleanup(stderr):
139            sys.stderr = stderr
140            sio.close()
141
142        self.addCleanup(cleanup, sys.stderr)
143        sys.stderr = sio
144
145        def go_boom():
146            raise RuntimeError("BOOM")
147
148        self.log.fields(func=go_boom).info('hi')
149
150        assert "BOOM" in sio.getvalue()
151        assert "Offending message: None" in sio.getvalue()
152        assert "Error in twiggy internal log! Something is serioulsy broken." in sio.getvalue()
153        assert "Traceback" in sio.getvalue()
154
155    def test_trap_output(self):
156        class BorkedOutput(outputs.ListOutput):
157
158            def _write(self, x):
159                raise RuntimeError("BORK")
160
161        out = BorkedOutput(close_atexit = False)
162
163        sio = StringIO()
164
165        def cleanup(stderr, output):
166            sys.stderr = stderr
167            sio.close()
168            self.log.output = output
169            out.close()
170
171        self.addCleanup(cleanup, sys.stderr, self.log.output)
172        sys.stderr = sio
173        self.log.output = out
174
175
176        self.log.fields().info('hi')
177
178        assert "BORK" in sio.getvalue()
179        assert "Offending message: <twiggy.message.Message object" in sio.getvalue()
180        assert "Error in twiggy internal log! Something is serioulsy broken." in sio.getvalue()
181        assert "Traceback" in sio.getvalue()
182
183class LoggerTestCase(LoggerTestBase, unittest.TestCase):
184
185    def setUp(self):
186        self.log = logger.Logger()
187        self.emitters = self.log._emitters
188        self.output = outputs.ListOutput(close_atexit = False)
189        self.emitters['*'] = filters.Emitter(levels.DEBUG, None, self.output)
190        self.messages = self.output.messages
191
192    def tearDown(self):
193        self.output.close()
194        self.emitters.clear()
195
196    def test_struct_dict(self):
197        d={42:42}
198        log = self.log.struct_dict(d)
199        assert len(self.messages) == 1
200        m = self.messages.pop()
201        self.assertDictContainsSubset(d, m.fields)
202        assert m.fields is not d
203        assert m.text == ""
204        assert m.level == levels.INFO
205
206        # we could do the same tests on options/min_level as done
207        # in test_clone, but that's starting to get redundant
208
209    def test_fields(self):
210        log = self.log.struct(x=42)
211        assert len(self.messages) == 1
212        m = self.messages.pop()
213        self.assertDictContainsSubset({'x':42}, m.fields)
214        assert m.text == ""
215        assert m.level == levels.INFO
216
217    def test_no_emitters(self):
218        self.emitters.clear()
219        self.log.debug('hi')
220        assert len(self.messages) == 0
221
222    def test_min_level_emitters(self):
223        self.emitters['*'].min_level = levels.INFO
224        self.log.debug('hi')
225        assert len(self.messages) == 0
226
227    def test_filter_emitters(self):
228        self.emitters['*'].filter = 'pants'
229        self.log.debug('hi')
230        assert len(self.messages) == 0
231
232    def test_logger_filter(self):
233        self.log.filter = lambda fmt_spec: 'pants' in fmt_spec
234        self.log.debug('hi')
235        assert len(self.messages) == 0
236
237        self.log.filter = lambda fmt_spec: 'hi' in fmt_spec
238        self.log.debug('hi')
239        assert len(self.messages) == 1
240        m = self.messages.pop()
241        assert m.text == 'hi'
242
243class LoggerTrapTestCase(unittest.TestCase):
244
245
246    def setUp(self):
247        self.internal_output = outputs.ListOutput(close_atexit = False)
248        self.internal_messages = self.internal_output.messages
249        _twiggy._populate_globals()
250        _twiggy.internal_log.output = self.internal_output
251
252        self.log = logger.Logger()
253        self.emitters = self.log._emitters
254        self.output = outputs.ListOutput(close_atexit = False)
255        self.emitters['everything'] = filters.Emitter(levels.DEBUG, None, self.output)
256        self.messages = self.output.messages
257
258
259    def tearDown(self):
260        self.internal_output.close()
261        _twiggy._del_globals()
262
263        self.output.close()
264        self.emitters.clear()
265
266    def test_bad_logger_filter(self):
267        def bad_filter(fmt_spec):
268            raise RuntimeError("THUNK")
269
270        self.log.filter = bad_filter
271
272        # a bad filter doesn't stop emitting
273        self.log.debug('hi')
274        assert len(self.messages) == 1
275        m = self.messages.pop()
276        assert m.text == 'hi'
277
278        assert len(self.internal_messages) == 1
279        m = self.internal_messages.pop()
280
281        print(m.text)
282        print(m.traceback)
283        print(_twiggy.internal_log._options)
284
285        assert m.level == levels.INFO
286        assert m.name == 'twiggy.internal'
287        assert "Traceback" in m.traceback
288        assert "THUNK" in m.traceback
289        assert "Error in Logger filtering" in m.text
290        assert re.search("<function .*bad_filter", m.text)
291
292    def test_trap_bad_msg(self):
293        def go_boom():
294            raise RuntimeError("BOOM")
295
296        self.log.fields(func=go_boom).info('hi')
297        assert len(self.messages) == 0
298
299        assert len(self.internal_messages) == 1
300        m = self.internal_messages.pop()
301
302        print(m.text)
303        print(m.traceback)
304        print(_twiggy.internal_log._options)
305
306        assert m.level == levels.INFO
307        assert m.name == 'twiggy.internal'
308        assert "Traceback" in m.traceback
309        assert "BOOM" in m.traceback
310        assert "Error formatting message" in m.text
311        assert re.search("<function .*go_boom", m.text)
312
313    def test_trap_output(self):
314        class BorkedOutput(outputs.ListOutput):
315
316            def _write(self, x):
317                raise RuntimeError("BORK")
318
319        out = BorkedOutput(close_atexit = False)
320
321        def cleanup(output):
322            try:
323                del self.emitters['before']
324            except KeyError:
325                pass
326
327            out.close()
328
329        self.addCleanup(cleanup, out)
330
331        self.emitters['before'] = filters.Emitter(levels.DEBUG, None, out)
332
333        self.log.fields().info('hi')
334
335        assert len(self.messages) == 1
336        m = self.messages.pop()
337        assert m.text == "hi"
338
339        assert len(self.internal_messages) == 1
340        m = self.internal_messages.pop()
341
342        print(m.text)
343        print(m.traceback)
344
345        assert m.level == levels.WARNING
346        assert re.search(
347            "Error outputting with <tests.test_logger.*BorkedOutput",
348            m.text)
349        assert "Traceback" in m.traceback
350        assert "BORK" in m.traceback
351
352
353    def test_trap_filter(self):
354
355        out = outputs.ListOutput(close_atexit = False)
356
357        def cleanup(output):
358            try:
359                del self.emitters['before']
360            except KeyError:
361                pass
362
363            out.close()
364
365        self.addCleanup(cleanup, out)
366
367        def go_boom(msg):
368            raise RuntimeError("BOOM")
369
370        self.emitters['before'] = filters.Emitter(levels.DEBUG, go_boom, out)
371
372        self.log.fields().info('hi')
373
374        # errors in filtering cause messages to be output anyway
375        assert len(out.messages) == 1
376        m1 = out.messages.pop()
377        assert m1.text == "hi"
378
379        assert len(self.messages) == 1
380        m2 = self.messages.pop()
381        assert m2.text == "hi"
382
383        assert m1 is m2
384
385        assert len(self.internal_messages) == 1
386        m = self.internal_messages.pop()
387
388        print(m.text)
389        print(m.traceback)
390
391        assert m.level == levels.INFO
392        assert "Error filtering with emitter before" in m.text
393        assert re.search("<function .*go_boom", m.text)
394        assert "Traceback" in m.traceback
395        assert "BOOM" in m.traceback
396
397
398
399