1
2import os, sys, json, time, bz2, base64, re
3import six
4import mock
5from io import StringIO
6from zope.interface import implementer
7from twisted.trial import unittest
8from twisted.application import service
9from twisted.internet import defer, reactor
10from twisted.internet.defer import inlineCallbacks, returnValue
11try:
12    from twisted import logger as twisted_logger
13except ImportError:
14    twisted_logger = None
15from twisted.web import client
16from twisted.python import log as twisted_log
17from twisted.python import failure, runtime, usage
18import foolscap
19from foolscap.logging import gatherer, log, tail, incident, cli, web, \
20     publish, dumper, flogfile
21from foolscap.logging.interfaces import RILogObserver
22from foolscap.util import format_time, allocate_tcp_port, ensure_dict_str
23from foolscap.eventual import fireEventually, flushEventualQueue
24from foolscap.tokens import NoLocationError
25from foolscap.test.common import PollMixin, StallMixin
26from foolscap.api import RemoteException, Referenceable, Tub
27
28
class Basic(unittest.TestCase):
    """Smoke tests: these only check that the logging calls do not raise."""

    def testLog(self):
        # Exercise msg() with positional, keyword, %-style and format= forms,
        # with a facility, and then once per severity level.
        flog = log.FoolscapLogger()
        flog.explain_facility("ui", "this terse string fully describes the gui")
        flog.msg("one")
        flog.msg("two")
        flog.msg(message="three")
        flog.msg("one=%d, two=%d", 1, 2)
        flog.msg("survive 100% of weird inputs")
        flog.msg(format="foo=%(foo)s, bar=%(bar)s", foo="foo", bar="bar")
        flog.msg() # useless, but make sure it doesn't crash
        flog.msg("ui message", facility="ui")
        leveled_messages = [
            ("so boring it won't even be generated", log.NOISY-1),
            ("blah blah", log.NOISY),
            ("opening file", log.OPERATIONAL),
            ("funny, that doesn't usually happen", log.UNUSUAL),
            ("configuration change noticed", log.INFREQUENT),
            ("error, but recoverable", log.CURIOUS),
            ("ok, that shouldn't have happened", log.WEIRD),
            ("hash doesn't match.. what the hell?", log.SCARY),
            ("I looked into the trap, ray", log.BAD),
        ]
        for text, severity in leveled_messages:
            flog.msg(text, level=severity)

    def testStacktrace(self):
        # msg() accepts stacktrace=True
        flog = log.FoolscapLogger()
        flog.msg("how did we get here?", stacktrace=True)

    def testFailure(self):
        # msg() accepts Failure instances through the failure= keyword
        flog = log.FoolscapLogger()
        local_failure = failure.Failure(ValueError("bad value"))
        flog.msg("failure1", failure=local_failure)
        # real RemoteExceptions always wrap CopiedFailure, so this is not
        # really accurate. However, it's a nuisance to create a real
        # CopiedFailure: look in
        # test_call.ExamineFailuresMixin._examine_raise for test code that
        # exercises this properly.
        remote_failure = failure.Failure(RemoteException(local_failure))
        flog.msg("failure2", failure=remote_failure)

    def testParent(self):
        # the value returned by msg() can be passed as parent= of later
        # messages, and num= can be supplied explicitly
        flog = log.FoolscapLogger()
        request = flog.msg("operation requested", level=log.OPERATIONAL)
        flog.msg("first step", level=log.NOISY, parent=request)
        flog.msg("second step", level=log.NOISY, parent=request)
        flog.msg("second step EXPLODED", level=log.WEIRD, parent=request)
        third_step = flog.msg("third step", parent=request)
        flog.msg("fourth step", parent=request)
        flog.msg("third step deferred activity finally completed",
                 parent=third_step)
        flog.msg("operation complete", level=log.OPERATIONAL, parent=request)
        flog.msg("override number, for some unknown reason", num=45)

    def testTheLogger(self):
        # the module-level log.msg() entry point
        log.msg("This goes to the One True Logger")

    def testTubLogger(self):
        # a Tub offers its own log() method
        Tub().log("this goes into the tub")
85
class Advanced(unittest.TestCase):
    """Tests for observers and for the per-facility, per-level event buffers."""

    def testObserver(self):
        # An observer added with addObserver() is called with each generated
        # event dict; the NOISY message is below the generation threshold
        # set here, so only two events are expected.
        l = log.FoolscapLogger()
        out = []
        l.addObserver(out.append)
        l.set_generation_threshold(log.OPERATIONAL)
        l.msg("one")
        l.msg("two")
        l.msg("ignored", level=log.NOISY)
        # check on a later turn, after the events have been delivered
        d = fireEventually()
        def _check(res):
            self.assertEqual(len(out), 2)
            self.assertEqual(out[0]["message"], "one")
            self.assertEqual(out[1]["message"], "two")
        d.addCallback(_check)
        return d

    def testFileObserver(self):
        # A LogFileObserver writes flogfile.MAGIC, then one JSON object per
        # line: a header followed by one wrapper per event.
        basedir = "logging/Advanced/FileObserver"
        os.makedirs(basedir)
        l = log.FoolscapLogger()
        fn = os.path.join(basedir, "observer-log.out")
        ob = log.LogFileObserver(fn)
        l.addObserver(ob.msg)
        l.msg("one")
        l.msg("two")
        d = fireEventually()
        def _check(res):
            l.removeObserver(ob.msg)
            ob._logFile.close()
            # use a context manager so the handle is closed even when one of
            # the assertions below fails
            with open(fn, "rb") as f:
                magic = f.read(len(flogfile.MAGIC))
                self.assertEqual(magic, flogfile.MAGIC)
                events = [json.loads(line.decode("utf-8")) for line in f]
            # header + two messages
            self.assertEqual(len(events), 3)
            self.assertEqual(events[0]["header"]["type"],
                                 "log-file-observer")
            self.assertEqual(events[0]["header"]["threshold"],
                                 log.OPERATIONAL)
            self.assertEqual(events[1]["from"], "local")
            self.assertEqual(events[2]["d"]["message"], "two")
        d.addCallback(_check)
        return d

    def testDisplace(self):
        # With a buffer size of 3, a fourth message pushes out the oldest one.
        l = log.FoolscapLogger()
        l.set_buffer_size(log.OPERATIONAL, 3)
        l.msg("one")
        l.msg("two")
        l.msg("three")
        # buffers are indexed by facility (None here), then by level
        items = l.buffers[None][log.OPERATIONAL]
        self.assertEqual(len(items), 3)
        l.msg("four") # should displace "one"
        self.assertEqual(len(items), 3)
        m0 = items[0]
        self.assertEqual(type(m0), dict)
        self.assertEqual(m0['message'], "two")
        self.assertEqual(items[-1]['message'], "four")

    def testFacilities(self):
        # Messages logged with facility="ui" land in that facility's buffer;
        # the message without a facility does not.
        l = log.FoolscapLogger()
        l.explain_facility("ui", "This is the UI.")
        l.msg("one", facility="ui")
        l.msg("two")

        items = l.buffers["ui"][log.OPERATIONAL]
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]["message"], "one")

    def testOnePriority(self):
        # Each level has its own buffer, which keeps messages in the order
        # they were logged.
        l = log.FoolscapLogger()
        l.msg("one", level=log.NOISY)
        l.msg("two", level=log.WEIRD)
        l.msg("three", level=log.NOISY)

        items = l.buffers[None][log.NOISY]
        self.assertEqual(len(items), 2)
        self.assertEqual(items[0]['message'], "one")
        self.assertEqual(items[1]['message'], "three")

        items = l.buffers[None][log.WEIRD]
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]['message'], "two")

    def testPriorities(self):
        # Overflowing the NOISY buffer (size 3, five messages) must not
        # displace anything from the WEIRD buffer.
        l = log.FoolscapLogger()
        l.set_buffer_size(log.NOISY, 3)
        l.set_buffer_size(log.WEIRD, 3)
        # a per-facility size can be set too; nothing is logged to this
        # facility in this test
        l.set_buffer_size(log.WEIRD, 4, "new.facility")

        l.msg("one", level=log.WEIRD)
        l.msg("two", level=log.NOISY)
        l.msg("three", level=log.NOISY)
        l.msg("four", level=log.WEIRD)
        l.msg("five", level=log.NOISY)
        l.msg("six", level=log.NOISY)
        l.msg("seven", level=log.NOISY)

        items = l.buffers[None][log.NOISY]
        self.assertEqual(len(items), 3)
        self.assertEqual(items[0]['message'], "five")
        self.assertEqual(items[-1]['message'], "seven")

        items = l.buffers[None][log.WEIRD]
        self.assertEqual(len(items), 2)
        self.assertEqual(items[0]['message'], "one")
        self.assertEqual(items[-1]['message'], "four")

    def testHierarchy(self):
        # Smoke test: chain messages through parent= two levels deep.
        l = log.FoolscapLogger()

        n = l.msg("one")
        n2 = l.msg("two", parent=n)
        l.msg("three", parent=n2)
203
class ErrorfulQualifier(incident.IncidentQualifier):
    """Qualifier whose first check_event() call raises; later calls decline."""

    def __init__(self):
        self._first = True

    def check_event(self, ev):
        # Read and clear the one-shot flag in a single step.
        is_first_call, self._first = self._first, False
        if not is_first_call:
            return False
        raise ValueError("oops")
213
class NoStdio(unittest.TestCase):
    # bug #244 is caused, in part, by Foolscap-side logging failures which
    # write an error message ("unable to serialize X") to stderr, which then
    # gets captured by twisted's logging (when run in a program under
    # twistd), then fed back into foolscap logging. Check that unserializable
    # objects don't cause anything to be written to a mock stdout/stderr
    # object.
    #
    # FoolscapLogger used stdio in two places:
    # * msg() when format_message() throws
    # * add_event() when IncidentQualifier.event() throws

    def setUp(self):
        # swap both stdio streams for in-memory buffers, remembering the
        # real ones so tearDown can put them back
        self.fl = log.FoolscapLogger()
        self.mock_stdout, self.mock_stderr = StringIO(), StringIO()
        self.orig_stdout, self.orig_stderr = sys.stdout, sys.stderr
        sys.stdout, sys.stderr = self.mock_stdout, self.mock_stderr

    def tearDown(self):
        sys.stdout, sys.stderr = self.orig_stdout, self.orig_stderr

    def check_stdio(self):
        # neither captured stream may have received any output
        for captured in (self.mock_stdout, self.mock_stderr):
            self.assertEqual(captured.getvalue(), "")

    def test_unformattable(self):
        self.fl.msg(format="one=%(unformattable)s") # missing format key
        self.check_stdio()

    def test_unserializable_incident(self):
        # one #244 pathway involved an unserializable event that caused an
        # exception during IncidentReporter.incident_declared(), as it tried
        # to record all recent events. We can test the lack of stdio by using
        # a qualifier that throws an error directly.
        flog = self.fl
        flog.setIncidentQualifier(ErrorfulQualifier())
        flog.activate_incident_qualifier()
        # make sure we set it up correctly
        self.assertTrue(flog.active_incident_qualifier)
        flog.msg("oops", arg=lambda : "lambdas are unserializable",
                 level=log.BAD)
        self.check_stdio()
        # The internal error will cause a new "metaevent" to be recorded. The
        # original event may or may not get recorded first, depending upon
        # the error (i.e. does it happen before or after buffer.append is
        # called). Also, get_buffered_events() is unordered. So search for
        # the right one.
        internal_errors = []
        for event in flog.get_buffered_events():
            if event.get("facility") == "foolscap/internal-error":
                internal_errors.append(event)
        self.assertEqual(len(internal_errors), 1)
        text = internal_errors[0]["message"]
        self.assertTrue(
            text.startswith("internal error in log._msg, args=('oops',)"),
            text)
        self.assertIn("ValueError('oops'", text)
271
def ser(what):
    """Return `what` encoded as a JSON string via flogfile.ExtendedEncoder."""
    encoded = json.dumps(what, cls=flogfile.ExtendedEncoder)
    return encoded
274
class Serialization(unittest.TestCase):
    """Tests for how log events are retained in memory and encoded as JSON."""

    def test_lazy_serialization(self):
        # Both foolscap and twisted allow (somewhat) arbitrary kwargs in the
        # log.msg() call. Twisted will either discard the event (if nobody is
        # listening), or stringify it right away.
        #
        # Foolscap does neither. It records the event (kwargs and all) in a
        # circular buffer, so a later observer can learn about them (either
        # 'flogtool tail' or a stored Incident file). And it stores the
        # arguments verbatim, leaving stringification to the future observer
        # (if they want it), so tools can filter events without using regexps
        # or parsing prematurely-flattened strings.
        #
        # Test this by logging a mutable object, modifying it, then checking
        # the buffer. We expect to see the modification.
        fl = log.FoolscapLogger()
        mutable = {"key": "old"}
        fl.msg("one", arg=mutable)
        mutable["key"] = "new"
        events = list(fl.get_buffered_events())
        # assertEqual, not assertTrue: assertTrue(value, "new") would treat
        # "new" as the failure message and pass for any truthy value,
        # including the stale "old".
        self.assertEqual(events[0]["arg"]["key"], "new")

    def test_failure(self):
        # A Failure is encoded as a dict tagged "@": "Failure" that carries
        # its repr and a traceback.
        try:
            raise ValueError("oops5")
        except ValueError:
            f = failure.Failure()
        out = json.loads(ser({"f": f}))["f"]
        self.assertEqual(out["@"], "Failure")
        self.assertIn("ValueError: oops5", out["repr"])
        self.assertIn("traceback", out)

    def test_unserializable(self):
        # The code that serializes log events to disk (with JSON) tries very
        # hard to get *something* recorded, even when you give log.msg()
        # something strange.
        self.assertEqual(json.loads(ser({"a": 1})), {"a": 1})
        unjsonable = [set([1,2])]
        self.assertEqual(json.loads(ser(unjsonable)),
                         [{'@': 'UnJSONable',
                           'repr': repr(set([1, 2])),
                           'message': "log.msg() was given an object that could not be encoded into JSON. I've replaced it with this UnJSONable object. The object's repr is in .repr"}])

        # if the repr() fails, we get a different message
        class Unreprable:
            def __repr__(self):
                raise ValueError("oops7")
        unrep = [Unreprable()]
        self.assertEqual(json.loads(ser(unrep)),
                         [{"@": "Unreprable",
                           "exception_repr": repr(ValueError('oops7')),
                           "message": "log.msg() was given an object that could not be encoded into JSON, and when I tried to repr() it I got an error too. I've put the repr of the exception in .exception_repr",
                           }])

        # and if repr()ing the failed repr() exception fails, we give up
        real_repr = repr
        def really_bad_repr(o):
            # fail only for the ValueError raised by Unreprable.__repr__;
            # everything else still gets its normal repr
            if isinstance(o, ValueError):
                raise TypeError("oops9")
            return real_repr(o)
        # the builtins module has a different name on py2 and py3
        if six.PY2:
            import __builtin__
            assert __builtin__.repr is repr
            with mock.patch("__builtin__.repr", really_bad_repr):
                s = ser(unrep)
        else:
            import builtins
            assert builtins.repr is repr
            with mock.patch("builtins.repr", really_bad_repr):
                s = ser(unrep)
        self.assertEqual(json.loads(s),
                         [{"@": "ReallyUnreprable",
                           "message": "log.msg() was given an object that could not be encoded into JSON, and when I tried to repr() it I got an error too. That exception wasn't repr()able either. I give up. Good luck.",
                           }])

    def test_not_pickle(self):
        # Older versions of Foolscap used pickle to store events into the
        # Incident log, and dealt with errors by dropping the event. Newer ones
        # use JSON, and use a placeholder when errors occur. Test that
        # pickleable (but not JSON-able) objects are *not* written to the file
        # directly, but are replaced by an "unjsonable" placeholder.
        basedir = "logging/Serialization/not_pickle"
        os.makedirs(basedir)
        fl = log.FoolscapLogger()
        ir = incident.IncidentReporter(basedir, fl, "tubid")
        ir.TRAILING_DELAY = None
        fl.msg("first")
        unjsonable = [object()] # still picklable
        unserializable = [lambda: "neither pickle nor JSON can capture me"]
        # having unserializable data in the logfile should not break the rest
        fl.msg("unjsonable", arg=unjsonable)
        fl.msg("unserializable", arg=unserializable)
        fl.msg("last")
        events = list(fl.get_buffered_events())
        # if unserializable data breaks incident reporting, this
        # incident_declared() call will cause an exception
        ir.incident_declared(events[0])
        # that won't record any trailing events, but does
        # eventually(finished_Recording), so wait for that to conclude
        d = flushEventualQueue()
        def _check(_):
            files = os.listdir(basedir)
            self.assertEqual(len(files), 1)
            fn = os.path.join(basedir, files[0])
            events = list(flogfile.get_events(fn))
            self.assertEqual(events[0]["header"]["type"], "incident")
            self.assertEqual(events[1]["d"]["message"], "first")
            # the header plus all four messages: both unrecordable events
            # should be replaced with error messages that *are* recordable
            self.assertEqual(len(events), 5)
            self.assertEqual(events[2]["d"]["message"], "unjsonable")
            self.assertEqual(events[2]["d"]["arg"][0]["@"], "UnJSONable")
            self.assertEqual(events[3]["d"]["message"], "unserializable")
            self.assertEqual(events[3]["d"]["arg"][0]["@"], "UnJSONable")
            self.assertEqual(events[4]["d"]["message"], "last")
        d.addCallback(_check)
        return d
392
class SuperstitiousQualifier(incident.IncidentQualifier):
    """Qualifier that accepts only events whose message mentions "thirteen"."""

    def check_event(self, ev):
        # events without a "message" key are treated as having an empty one
        message = ev.get("message", "")
        return "thirteen" in message
398
class ImpatientReporter(incident.IncidentReporter):
    """IncidentReporter subclass with overridden trailing-event settings.

    Used by test_customize and test_overlapping in this file.
    """
    # presumably the number of seconds the reporter keeps recording after
    # the trigger -- confirm against incident.IncidentReporter
    TRAILING_DELAY = 1.0
    # test_customize expects the third message logged after the trigger to
    # be the last one recorded
    TRAILING_EVENT_LIMIT = 3
402
class NoFollowUpReporter(incident.IncidentReporter):
    """IncidentReporter subclass with TRAILING_DELAY overridden to None."""
    # presumably None turns off recording of trailing events: test_not_pickle
    # sets the same value on an instance and notes that no trailing events
    # are recorded -- confirm against incident.IncidentReporter
    TRAILING_DELAY = None
405
class LogfileReaderMixin:
    """Mixin giving test cases a helper to load every event from a logfile."""

    def _read_logfile(self, fn):
        # materialize the iterable so callers can use len() and indexing
        return [event for event in flogfile.get_events(fn)]
409
class Incidents(unittest.TestCase, PollMixin, LogfileReaderMixin):
    """Tests for declaring, recording and classifying incidents."""

    def test_basic(self):
        # A BAD-level message declares an incident once a logdir is set; the
        # recorded file holds a header, the buffered events up to the
        # trigger, and the trailing events.
        l = log.FoolscapLogger()
        self.assertEqual(l.incidents_declared, 0)
        # no qualifiers are run until a logdir is provided
        l.msg("one", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)
        l.setLogDir("logging/Incidents/basic")
        l.setLogDir("logging/Incidents/basic") # this should be idempotent
        got_logdir = l.logdir
        self.assertEqual(got_logdir,
                             os.path.abspath("logging/Incidents/basic"))
        # qualifiers should be run now
        l.msg("two")
        l.msg("3-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 1)
        self.assertTrue(l.get_active_incident_reporter())
        # at this point, the uncompressed logfile should be present, and it
        # should contain all the events up to and including the trigger
        files = os.listdir(got_logdir)
        self.assertEqual(len(files), 2)
        # the uncompressed one will sort earlier, since it lacks the .bz2
        # extension
        files.sort()
        self.assertEqual(files[0] + ".bz2.tmp", files[1])
        # unix systems let us look inside the uncompressed file while it's
        # still being written to by the recorder
        if runtime.platformType == "posix":
            events = self._read_logfile(os.path.join(got_logdir, files[0]))
            self.assertEqual(len(events), 1+3)
            self.assertIn("header", events[0])
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                                 "3-trigger")
            self.assertEqual(events[0]["header"]["versions"]["foolscap"],
                                 foolscap.__version__)
            self.assertEqual(events[3]["d"]["message"], "3-trigger")

        l.msg("4-trailing")
        # this will take 5 seconds to finish trailing events
        d = self.poll(lambda: bool(l.incidents_recorded), 1.0)
        def _check(res):
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            fn = l.recent_recorded_incidents[0]
            events = self._read_logfile(fn)
            self.assertEqual(len(events), 1+4)
            self.assertIn("header", events[0])
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                                 "3-trigger")
            self.assertEqual(events[0]["header"]["versions"]["foolscap"],
                                 foolscap.__version__)
            self.assertEqual(events[3]["d"]["message"], "3-trigger")
            self.assertEqual(events[4]["d"]["message"], "4-trailing")

        d.addCallback(_check)
        return d

    def test_qualifier1(self):
        # With SuperstitiousQualifier installed before the logdir is set, a
        # BAD-level message that lacks "thirteen" declares no incident.
        l = log.FoolscapLogger()
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.setLogDir("logging/Incidents/qualifier1")
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)

    def test_qualifier2(self):
        # Same as test_qualifier1, with the two setup calls reversed.
        l = log.FoolscapLogger()
        # call them in the other order
        l.setLogDir("logging/Incidents/qualifier2")
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)

    def test_customize(self):
        # A custom qualifier decides what triggers an incident, and a custom
        # reporter class bounds how many trailing events get recorded.
        l = log.FoolscapLogger()
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.setLogDir("logging/Incidents/customize")
        # you set the reporter *class*, not an instance
        bad_ir = ImpatientReporter("basedir", "logger", "tubid")
        self.assertRaises((AssertionError, TypeError),
                              l.setIncidentReporterFactory, bad_ir)
        l.setIncidentReporterFactory(ImpatientReporter)
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)
        l.msg("2")
        l.msg("thirteen is scary")
        self.assertEqual(l.incidents_declared, 1)
        l.msg("4")
        l.msg("5")
        l.msg("6") # this should hit the trailing event limit
        l.msg("7") # this should not be recorded
        d = self.poll(lambda: bool(l.incidents_recorded), 1.0)
        def _check(res):
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            fn = l.recent_recorded_incidents[0]
            events = self._read_logfile(fn)
            # header + messages "1" through "6"
            self.assertEqual(len(events), 1+6)
            self.assertEqual(events[-1]["d"]["message"], "6")
        d.addCallback(_check)
        return d

    def test_overlapping(self):
        # A second trigger that arrives while a reporter is still active is
        # counted as declared, but ends up in the first incident's file
        # instead of producing a second one.
        l = log.FoolscapLogger()
        l.setLogDir("logging/Incidents/overlapping")
        got_logdir = l.logdir
        self.assertEqual(got_logdir,
                             os.path.abspath("logging/Incidents/overlapping"))
        d = defer.Deferred()
        def _go(name, trigger):
            d.callback( (name, trigger) )
        l.addImmediateIncidentObserver(_go)
        l.setIncidentReporterFactory(ImpatientReporter)
        l.msg("1")
        l.msg("2-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 1)
        self.assertTrue(l.get_active_incident_reporter())
        l.msg("3-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 2)
        self.assertTrue(l.get_active_incident_reporter())

        def _check(res):
            self.assertEqual(l.incidents_recorded, 1)
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            # at this point, the logfile should be present, and it should
            # contain all the events up to and including both triggers

            files = os.listdir(got_logdir)
            self.assertEqual(len(files), 1)
            events = self._read_logfile(os.path.join(got_logdir, files[0]))

            self.assertEqual(len(events), 1+3)
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                                 "2-trigger")
            self.assertEqual(events[1]["d"]["message"], "1")
            self.assertEqual(events[2]["d"]["message"], "2-trigger")
            self.assertEqual(events[3]["d"]["message"], "3-trigger")
        d.addCallback(_check)

        return d

    def test_classify(self):
        # Run IncidentClassifier over a recorded incident, first with a
        # classifier function that recognizes it, then with none (--verbose).
        l = log.FoolscapLogger()
        l.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)
        l.setLogDir("logging/Incidents/classify")
        got_logdir = l.logdir
        l.msg("foom", level=log.BAD, failure=failure.Failure(RuntimeError()))
        d = fireEventually()
        def _check(res):
            files = [fn for fn in os.listdir(got_logdir) if fn.endswith(".bz2")]
            self.assertEqual(len(files), 1)
            incident_paths = [os.path.join(got_logdir, fn) for fn in files]

            ic = incident.IncidentClassifier()
            def classify_foom(trigger):
                if "foom" in trigger.get("message",""):
                    return "foom"
            ic.add_classifier(classify_foom)
            options = incident.ClassifyOptions()
            options.parseOptions(incident_paths)
            options.stdout = StringIO()
            ic.run(options)
            out = options.stdout.getvalue()
            self.assertTrue(out.strip().endswith(": foom"), out)

            # no classifier functions are registered on this one
            ic2 = incident.IncidentClassifier()
            options = incident.ClassifyOptions()
            options.parseOptions(["--verbose"] + incident_paths)
            options.stdout = StringIO()
            ic2.run(options)
            out = options.stdout.getvalue()
            # assertIn rather than the legacy failUnlessIn alias, matching
            # the other membership assertions in this file
            self.assertIn(".flog.bz2: unknown\n", out)
            # this should have a JSON-formatted trigger dictionary
            self.assertTrue(re.search(r'u?"message": u?"foom"', out), out)
            self.assertIn('"num": 0', out)
            self.assertIn("RuntimeError", out)

        d.addCallback(_check)
        return d
587
@implementer(RILogObserver)
class Observer(Referenceable):
    """Log observer that records everything delivered to its remote_* methods."""

    def __init__(self):
        self.done_with_incidents = False
        self.incidents = []
        self.messages = []

    def remote_msg(self, d):
        # keep each delivered event, in arrival order
        self.messages.append(d)

    def remote_new_incident(self, name, trigger):
        # incidents are stored as (name, trigger) tuples
        record = (name, trigger)
        self.incidents.append(record)

    def remote_done_with_incident_catchup(self):
        self.done_with_incidents = True
601
class MyGatherer(gatherer.GathererService):
    """GathererService that writes its own port/location files before starting.

    remote_logport() additionally fires self.d with the publisher; self.d is
    not created in this class, so it is presumably assigned by the test that
    uses the gatherer.
    """
    verbose = False

    def __init__(self, rotate, use_bzip, basedir):
        # Write "port" and "location" files into basedir, both naming the
        # same freshly allocated TCP port, before the base class starts up.
        portnum = allocate_tcp_port()
        file_templates = (
            ("port", "tcp:%d\n"),
            ("location", "tcp:127.0.0.1:%d\n"),
        )
        for filename, template in file_templates:
            with open(os.path.join(basedir, filename), "w") as f:
                f.write(template % portnum)
        gatherer.GathererService.__init__(self, rotate, use_bzip, basedir)

    def remote_logport(self, nodeid, publisher):
        d = gatherer.GathererService.remote_logport(self, nodeid, publisher)
        def _announce(res):
            # fired on success and on failure alike
            return self.d.callback(publisher)
        d.addBoth(_announce)
616
class SampleError(Exception):
    """A sample error, logged as a failure by the tests in this file."""
619
620class Publish(PollMixin, unittest.TestCase):
621    def setUp(self):
622        self.parent = service.MultiService()
623        self.parent.startService()
624        # make the MAX_QUEUE_SIZE smaller to speed up the test, and restore
625        # it when we're done. The normal value is 2000, chosen to bound the
626        # queue to perhaps 1MB. Lowering the size from 2000 to 500 speeds up
627        # the test from about 10s to 5s.
628        self.saved_queue_size = publish.Subscription.MAX_QUEUE_SIZE
629        publish.Subscription.MAX_QUEUE_SIZE = 500
630
631    def tearDown(self):
632        publish.Subscription.MAX_QUEUE_SIZE = self.saved_queue_size
633        d = defer.succeed(None)
634        d.addCallback(lambda res: self.parent.stopService())
635        d.addCallback(flushEventualQueue)
636        return d
637
638    def test_logport_furlfile1(self):
639        basedir = "logging/Publish/logport_furlfile1"
640        os.makedirs(basedir)
641        furlfile = os.path.join(basedir, "logport.furl")
642        t = Tub()
643        # setOption before setServiceParent
644        t.setOption("logport-furlfile", furlfile)
645        t.setServiceParent(self.parent)
646        self.assertRaises(NoLocationError, t.getLogPort)
647        self.assertRaises(NoLocationError, t.getLogPortFURL)
648        portnum = allocate_tcp_port()
649        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
650        self.assertFalse(os.path.exists(furlfile))
651        t.setLocation("127.0.0.1:%d" % portnum)
652        logport_furl = open(furlfile, "r").read().strip()
653        self.assertEqual(logport_furl, t.getLogPortFURL())
654
655    def test_logport_furlfile2(self):
656        basedir = "logging/Publish/logport_furlfile2"
657        os.makedirs(basedir)
658        furlfile = os.path.join(basedir, "logport.furl")
659        t = Tub()
660        # setServiceParent before setOption
661        t.setServiceParent(self.parent)
662        self.assertRaises(NoLocationError, t.getLogPort)
663        self.assertRaises(NoLocationError, t.getLogPortFURL)
664        portnum = allocate_tcp_port()
665        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
666        t.setOption("logport-furlfile", furlfile)
667        self.assertFalse(os.path.exists(furlfile))
668        t.setLocation("127.0.0.1:%d" % portnum)
669        logport_furl = open(furlfile, "r").read().strip()
670        self.assertEqual(logport_furl, t.getLogPortFURL())
671
672    def test_logpublisher(self):
673        basedir = "logging/Publish/logpublisher"
674        os.makedirs(basedir)
675        furlfile = os.path.join(basedir, "logport.furl")
676        t = Tub()
677        t.setServiceParent(self.parent)
678        portnum = allocate_tcp_port()
679        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
680        self.assertRaises(NoLocationError, t.getLogPort)
681        self.assertRaises(NoLocationError, t.getLogPortFURL)
682
683        t.setLocation("127.0.0.1:%d" % portnum)
684        t.setOption("logport-furlfile", furlfile)
685        logport_furl = t.getLogPortFURL()
686        logport_furl2 = open(furlfile, "r").read().strip()
687        self.assertEqual(logport_furl, logport_furl2)
688        tw_log = twisted_log.LogPublisher()
689        tlb = t.setOption("bridge-twisted-logs", tw_log)
690
691        t2 = Tub()
692        t2.setServiceParent(self.parent)
693        ob = Observer()
694
695        d = t2.getReference(logport_furl)
696        def _got_logport(logport):
697            d = logport.callRemote("get_versions")
698            def _check(versions):
699                versions = ensure_dict_str(versions)
700                self.assertEqual(versions["foolscap"],
701                                 six.ensure_text(foolscap.__version__))
702            d.addCallback(_check)
703            # note: catch_up=False, so this message won't be sent
704            log.msg("message 0 here, before your time")
705            d.addCallback(lambda res:
706                          logport.callRemote("subscribe_to_all", ob))
707            def _emit(subscription):
708                self._subscription = subscription
709                log.msg("message 1 here")
710                tw_log.msg("message 2 here")
711
712                # switch to generic (no tubid) bridge
713                log.unbridgeLogsFromTwisted(tw_log, tlb)
714                log.bridgeLogsFromTwisted(None, tw_log)
715
716                tw_log.msg("message 3 here")
717                tw_log.msg(format="%(foo)s is foo", foo="foo")
718                log.err(failure.Failure(SampleError("err1")))
719                log.err(SampleError("err2"))
720                # simulate twisted.python.log.err, which is unfortunately
721                # not a method of LogPublisher
722                def err(_stuff=None, _why=None):
723                    if isinstance(_stuff, Exception):
724                        tw_log.msg(failure=failure.Failure(_stuff),
725                                   isError=1, why=_why)
726                    else:
727                        tw_log.msg(failure=_stuff, isError=1, why=_why)
728                err(failure.Failure(SampleError("err3")))
729                err(SampleError("err4"))
730            d.addCallback(_emit)
731            # wait until we've seen all the messages, or the test times out
732            d.addCallback(lambda res: self.poll(lambda: len(ob.messages) >= 8))
733            def _check_observer(res):
734                msgs = ob.messages
735                self.assertEqual(len(msgs), 8)
736                self.assertEqual(msgs[0]["message"], "message 1 here")
737                self.assertEqual(msgs[1]["from-twisted"], True)
738                self.assertEqual(msgs[1]["message"], "message 2 here")
739                self.assertEqual(msgs[1]["tubID"], t.tubID)
740                self.assertEqual(msgs[2]["from-twisted"], True)
741                self.assertEqual(msgs[2]["message"], "message 3 here")
742                self.assertEqual(msgs[2]["tubID"], None)
743                self.assertEqual(msgs[3]["from-twisted"], True)
744                self.assertEqual(msgs[3]["message"], "foo is foo")
745
746                # check the errors
747                self.assertEqual(msgs[4]["message"], "")
748                self.assertTrue(msgs[4]["isError"])
749                self.assertTrue("failure" in msgs[4])
750                self.assertTrue(msgs[4]["failure"].check(SampleError))
751                self.assertTrue("err1" in str(msgs[4]["failure"]))
752                self.assertEqual(msgs[5]["message"], "")
753                self.assertTrue(msgs[5]["isError"])
754                self.assertTrue("failure" in msgs[5])
755                self.assertTrue(msgs[5]["failure"].check(SampleError))
756                self.assertTrue("err2" in str(msgs[5]["failure"]))
757
758                # errors coming from twisted are stringified
759                self.assertEqual(msgs[6]["from-twisted"], True)
760                self.assertTrue("Unhandled Error" in msgs[6]["message"])
761                self.assertTrue("SampleError: err3" in msgs[6]["message"])
762                self.assertTrue(msgs[6]["isError"])
763
764                self.assertEqual(msgs[7]["from-twisted"], True)
765                self.assertTrue("Unhandled Error" in msgs[7]["message"])
766                self.assertTrue("SampleError: err4" in msgs[7]["message"])
767                self.assertTrue(msgs[7]["isError"])
768
769            d.addCallback(_check_observer)
770            def _done(res):
771                return logport.callRemote("unsubscribe", self._subscription)
772            d.addCallback(_done)
773            return d
774        d.addCallback(_got_logport)
775        return d
776
777    def test_logpublisher_overload(self):
778        basedir = "logging/Publish/logpublisher_overload"
779        os.makedirs(basedir)
780        furlfile = os.path.join(basedir, "logport.furl")
781        t = Tub()
782        t.setServiceParent(self.parent)
783        portnum = allocate_tcp_port()
784        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
785        t.setLocation("127.0.0.1:%d" % portnum)
786
787        t.setOption("logport-furlfile", furlfile)
788        logport_furl = t.getLogPortFURL()
789        logport_furl2 = open(furlfile, "r").read().strip()
790        self.assertEqual(logport_furl, logport_furl2)
791
792        t2 = Tub()
793        t2.setServiceParent(self.parent)
794        ob = Observer()
795
796        d = t2.getReference(logport_furl)
797        def _got_logport(logport):
798            d = logport.callRemote("subscribe_to_all", ob)
799            def _emit(subscription):
800                self._subscription = subscription
801                for i in range(10000):
802                    log.msg("message %d here" % i)
803            d.addCallback(_emit)
804            # now we wait until the observer has seen nothing for a full
805            # second. I'd prefer something faster and more deterministic, but
806            # this ought to handle the normal slow-host cases.
807            expected = publish.Subscription.MAX_QUEUE_SIZE
808            def _check_f():
809                return bool(len(ob.messages) >= expected)
810            d.addCallback(lambda res: self.poll(_check_f, 0.2))
811            # TODO: I'm not content with that polling, and would prefer to do
812            # something faster and more deterministic
813            #d.addCallback(fireEventually)
814            #d.addCallback(fireEventually)
815            def _check_observer(res):
816                msgs = ob.messages
817                self.assertEqual(len(msgs), expected)
818                # since we discard new messages during overload (and preserve
819                # old ones), we should see 0..MAX_QUEUE_SIZE-1.
820                got = []
821                for m in msgs:
822                    ignored1, number_s, ignored2 = m["message"].split()
823                    number = int(number_s)
824                    got.append(number)
825                self.assertEqual(got, sorted(got))
826                self.assertEqual(got, list(range(expected)))
827
828            d.addCallback(_check_observer)
829            def _done(res):
830                return logport.callRemote("unsubscribe", self._subscription)
831            d.addCallback(_done)
832            return d
833        d.addCallback(_got_logport)
834        return d
835
836    def test_logpublisher_catchup(self):
837        basedir = "logging/Publish/logpublisher_catchup"
838        os.makedirs(basedir)
839        furlfile = os.path.join(basedir, "logport.furl")
840        t = Tub()
841        t.setServiceParent(self.parent)
842        portnum = allocate_tcp_port()
843        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
844        t.setLocation("127.0.0.1:%d" % portnum)
845
846        t.setOption("logport-furlfile", furlfile)
847        logport_furl = t.getLogPortFURL()
848
849        t2 = Tub()
850        t2.setServiceParent(self.parent)
851        ob = Observer()
852
853        d = t2.getReference(logport_furl)
854        def _got_logport(logport):
855            d = logport.callRemote("get_versions")
856            def _check_versions(versions):
857                versions = ensure_dict_str(versions)
858                self.assertEqual(versions["foolscap"],
859                                 six.ensure_text(foolscap.__version__))
860            d.addCallback(_check_versions)
861            d.addCallback(lambda res: logport.callRemote("get_pid"))
862            def _check_pid(pid):
863                self.assertEqual(pid, os.getpid())
864            d.addCallback(_check_pid)
865            # note: catch_up=True, so this message *will* be sent. Also note
866            # that we need this message to be unique, since our logger will
867            # stash messages recorded by other test cases, and we don't want
868            # to confuse the two.
869            log.msg("this is an early message")
870            d.addCallback(lambda res:
871                          logport.callRemote("subscribe_to_all", ob, True))
872            def _emit(subscription):
873                self._subscription = subscription
874                log.msg("this is a later message")
875            d.addCallback(_emit)
876            # wait until we've received the later message
877            def _check_f():
878                for m in ob.messages:
879                    if m.get("message") == "this is a later message":
880                        return True
881                return False
882            d.addCallback(lambda res: self.poll(_check_f))
883            def _check_observer(res):
884                msgs = ob.messages
885                # this gets everything that's been logged since the unit
886                # tests began. The Reconnector that's used by
887                # logport-furlfile will cause some uncertainty.. negotiation
888                # messages might be interleaved with the ones that we
889                # actually care about. So what we verify is that both of our
890                # messages appear *somewhere*, and that they show up in the
891                # correct order.
892                self.assertTrue(len(msgs) >= 2, len(msgs))
893                first = None
894                second = None
895                for i,m in enumerate(msgs):
896                    if m.get("message") == "this is an early message":
897                        first = i
898                    if m.get("message") == "this is a later message":
899                        second = i
900                self.assertTrue(first is not None)
901                self.assertTrue(second is not None)
902                self.assertTrue(first < second,
903                                "%d is not before %d" % (first, second))
904            d.addCallback(_check_observer)
905            def _done(res):
906                return logport.callRemote("unsubscribe", self._subscription)
907            d.addCallback(_done)
908            return d
909        d.addCallback(_got_logport)
910        return d
911
class IncidentPublisher(PollMixin, unittest.TestCase):
    """Tests for listing, fetching and subscribing to incidents through a
    Tub's logport."""

    def setUp(self):
        # services attached to self.parent are stopped again in tearDown
        self.parent = service.MultiService()
        self.parent.startService()

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def _write_to(self, logdir, fn, data="stuff"):
        """Write `data` to the file named `fn` inside `logdir`."""
        # the context manager closes the file even if write() raises
        with open(os.path.join(logdir, fn), "w") as f:
            f.write(data)

    def test_list_incident_names(self):
        """list_incident_names() must report only incident files, map each
        name to its absolute path, and honour the since= argument."""
        basedir = "logging/IncidentPublisher/list_incident_names"
        os.makedirs(basedir)
        t = Tub()
        t.setLocation("127.0.0.1:1234")
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        p = t.getLogPort()

        # dump some other files in the incident directory
        self._write_to(logdir, "distraction.bz2")
        self._write_to(logdir, "noise")
        # and a few real-looking incidents
        I1 = "incident-2008-07-29-204211-aspkxoi"
        I2 = "incident-2008-07-30-112233-wodaei"
        I1_abs = os.path.abspath(os.path.join(logdir, I1 + ".flog"))
        I2_abs = os.path.abspath(os.path.join(logdir, I2 + ".flog.bz2"))
        self._write_to(logdir, I1 + ".flog")
        self._write_to(logdir, I2 + ".flog.bz2")

        # (named 'listed' rather than 'all' to avoid shadowing the builtin)
        listed = list(p.list_incident_names())
        self.assertEqual({name for (name, fn) in listed}, {I1, I2})
        imap = dict(listed)
        self.assertEqual(imap[I1], I1_abs)
        self.assertEqual(imap[I2], I2_abs)

        new = list(p.list_incident_names(since=I1))
        self.assertEqual({name for (name, fn) in new}, {I2})


    def test_get_incidents(self):
        """Trigger one incident, then list and fetch it over the wire,
        both from the original .flog.bz2 file and from a manually
        decompressed .flog file."""
        basedir = "logging/IncidentPublisher/get_incidents"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        t.logger.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)
        # dump some other files in the incident directory
        self._write_to(logdir, "distraction.bz2")
        self._write_to(logdir, "noise")

        # fill the buffers with some messages
        t.logger.msg("one")
        t.logger.msg("two")
        # and trigger an incident
        t.logger.msg("three", level=log.WEIRD)
        # the NonTrailingIncidentReporter needs a turn before it will have
        # finished recording the event: the getReference() call will suffice.

        # now set up a Tub to connect to the logport
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)

        t.setOption("logport-furlfile", furlfile)
        logport_furl = t.getLogPortFURL()
        with open(furlfile, "r") as f:
            logport_furl2 = f.read().strip()
        self.assertEqual(logport_furl, logport_furl2)

        t2 = Tub()
        t2.setServiceParent(self.parent)

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            d = logport.callRemote("list_incidents")
            d.addCallback(self._check_listed)
            d.addCallback(lambda res:
                          logport.callRemote("get_incident", self.i_name))
            d.addCallback(self._check_incident)
            def _decompress(res):
                # now we manually decompress the logfile for that incident,
                # to exercise the code that provides access to incidents that
                # did not finish their trailing-gather by the time the
                # application was shut down
                self.assertFalse(self.i_name.endswith(".bz2"))
                fn1 = os.path.join(logdir, self.i_name) + ".flog.bz2"
                fn2 = fn1[:-len(".bz2")]
                # both handles are closed before the compressed file is
                # removed, even if the copy fails part-way
                with bz2.BZ2File(fn1, "r") as f1:
                    with open(fn2, "wb") as f2:
                        f2.write(f1.read())
                os.unlink(fn1)
            d.addCallback(_decompress)
            # and do it again
            d.addCallback(lambda res: logport.callRemote("list_incidents"))
            d.addCallback(self._check_listed)
            d.addCallback(lambda res:
                          logport.callRemote("get_incident", self.i_name))
            d.addCallback(self._check_incident)
            return d
        d.addCallback(_got_logport)
        return d

    def _check_listed(self, incidents):
        """Check a list_incidents result: a dict with exactly one incident
        whose trigger message is "three". Stores its name in self.i_name."""
        self.assertTrue(isinstance(incidents, dict))
        self.assertEqual(len(incidents), 1)
        self.i_name = i_name = list(incidents.keys())[0]
        self.assertTrue(i_name.startswith("incident"))
        # names are reported without any filename suffix
        self.assertFalse(i_name.endswith(".flog") or i_name.endswith(".bz2"))
        trigger = incidents[i_name]
        self.assertEqual(trigger["message"], "three")

    def _check_incident(self, xxx_todo_changeme2 ):
        """Check a get_incident result, given as a (header, events) pair."""
        (header, events) = xxx_todo_changeme2
        self.assertEqual(header["type"], "incident")
        self.assertEqual(header["trigger"]["message"], "three")
        self.assertEqual(len(events), 3)
        self.assertEqual(events[0]["message"], "one")

    def test_subscribe(self):
        """Exercise subscribe_to_incidents three ways: without catch-up,
        with catch-up from the beginning, and with catch-up since a named
        incident."""
        basedir = "logging/IncidentPublisher/subscribe"
        os.makedirs(basedir)
        t = Tub()
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        t.logger.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)

        # fill the buffers with some messages
        t.logger.msg("boring")
        t.logger.msg("blah")
        # and trigger the first incident
        t.logger.msg("one", level=log.WEIRD)
        # the NonTrailingIncidentReporter needs a turn before it will have
        # finished recording the event: the getReference() call will suffice.

        # now set up a Tub to connect to the logport
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        logport_furl = t.getLogPortFURL()

        ob = Observer()
        t2 = Tub()
        t2.setServiceParent(self.parent)

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            self._logport = logport
            d2 = logport.callRemote("subscribe_to_incidents", ob) # no catchup
            return d2
        d.addCallback(_got_logport)
        def _subscribed(subscription):
            # always holds the most recent subscription, for unsubscribing
            self._subscription = subscription
        d.addCallback(_subscribed)
        # pause long enough for the incident names to change (time.sleep
        # blocks this thread, so nothing else runs during the pause)
        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("two", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: bool(ob.incidents), 0.1))
        def _triggerof(incident):
            # each observed incident is a (name, trigger) pair
            (name, trigger) = incident
            return trigger["message"]
        def _check_new(res):
            # no catch-up was requested, so incident "one" is not delivered
            self.assertEqual(len(ob.incidents), 1)
            self.assertEqual(_triggerof(ob.incidents[0]), "two")
        d.addCallback(_check_new)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        # now subscribe and catch up on all incidents
        ob2 = Observer()
        d.addCallback(lambda res:
                      self._logport.callRemote("subscribe_to_incidents", ob2,
                                               True, ""))
        d.addCallback(_subscribed)
        d.addCallback(lambda res:
                      self.poll(lambda: ob2.done_with_incidents, 0.1))
        def _check_all(res):
            self.assertEqual(len(ob2.incidents), 2)
            self.assertEqual(_triggerof(ob2.incidents[0]), "one")
            self.assertEqual(_triggerof(ob2.incidents[1]), "two")
        d.addCallback(_check_all)

        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("three", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: len(ob2.incidents) >= 3, 0.1))
        def _check_all2(res):
            self.assertEqual(len(ob2.incidents), 3)
            self.assertEqual(_triggerof(ob2.incidents[0]), "one")
            self.assertEqual(_triggerof(ob2.incidents[1]), "two")
            self.assertEqual(_triggerof(ob2.incidents[2]), "three")
        d.addCallback(_check_all2)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        # test the since= argument, setting it equal to the name of the
        # second incident. This should give us the third incident.
        ob3 = Observer()
        d.addCallback(lambda res:
                      self._logport.callRemote("subscribe_to_incidents", ob3,
                                               True, ob2.incidents[1][0]))
        d.addCallback(_subscribed)
        d.addCallback(lambda res:
                      self.poll(lambda: ob3.done_with_incidents, 0.1))
        def _check_since(res):
            self.assertEqual(len(ob3.incidents), 1)
            self.assertEqual(_triggerof(ob3.incidents[0]), "three")
        d.addCallback(_check_since)
        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("four", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: len(ob3.incidents) >= 2, 0.1))
        def _check_since2(res):
            self.assertEqual(len(ob3.incidents), 2)
            self.assertEqual(_triggerof(ob3.incidents[0]), "three")
            self.assertEqual(_triggerof(ob3.incidents[1]), "four")
        d.addCallback(_check_since2)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        return d
    # the three two-second pauses above need a longer limit
    test_subscribe.timeout = 20
1148
class MyIncidentGathererService(gatherer.IncidentGathererService):
    """IncidentGathererService with two hooks for tests.

    self.d (assigned by whoever creates the instance) is fired with the
    publisher once remote_logport() has finished, and cb_new_incident, if
    set, is called with (abs_fn, rel_fn) for each new incident.
    """
    verbose = False
    cb_new_incident = None

    def remote_logport(self, nodeid, publisher):
        """Run the inherited remote_logport, then fire self.d."""
        parent_cls = gatherer.IncidentGathererService
        pending = parent_cls.remote_logport(self, nodeid, publisher)

        def _announce(res):
            return self.d.callback(publisher)

        pending.addCallback(_announce)
        return pending

    def new_incident(self, abs_fn, rel_fn, nodeid_s, incident):
        """Run the inherited new_incident, then notify cb_new_incident."""
        parent_cls = gatherer.IncidentGathererService
        parent_cls.new_incident(self, abs_fn, rel_fn, nodeid_s, incident)
        # look the hook up only after the inherited method has run
        notify = self.cb_new_incident
        if not notify:
            return
        notify((abs_fn, rel_fn))
1164
class IncidentGatherer(unittest.TestCase,
                       PollMixin, StallMixin, LogfileReaderMixin):
    """Tests that connect a Tub to a MyIncidentGathererService and check
    how incidents are gathered and classified."""

    def setUp(self):
        self.parent = service.MultiService()
        self.parent.startService()
        self.logger = log.FoolscapLogger()
        self.logger.setIncidentReporterFactory(NoFollowUpReporter)

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def create_incident_gatherer(self, basedir, classifiers=None):
        """Build a MyIncidentGathererService rooted at <basedir>/ig.

        The directory (with its "port" and "location" files) is created
        only if it does not exist yet, so successive gatherers made with
        the same basedir share it. `classifiers` is an optional list of
        classifier functions; None means no classifiers.

        The returned service has not been given a service parent, and has
        a fresh Deferred in its `d` attribute.
        """
        if classifiers is None:
            # avoid a shared mutable default argument
            classifiers = []
        # create an incident gatherer, which will make its own Tub
        ig_basedir = os.path.join(basedir, "ig")
        if not os.path.isdir(ig_basedir):
            os.mkdir(ig_basedir)
            portnum = allocate_tcp_port()
            with open(os.path.join(ig_basedir, "port"), "w") as f:
                f.write("tcp:%d\n" % portnum)
            with open(os.path.join(ig_basedir, "location"), "w") as f:
                f.write("tcp:127.0.0.1:%d\n" % portnum)
        null = StringIO()
        ig = MyIncidentGathererService(classifiers=classifiers,
                                       basedir=ig_basedir, stdout=null)
        ig.d = defer.Deferred()
        return ig

    def create_connected_tub(self, ig):
        """Start a listening Tub that uses self.logger and is configured
        with the gatherer's FURL as its log-gatherer-furl."""
        t = Tub()
        t.logger = self.logger
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", ig.my_furl)

    def _classified_fn(self, ig, category):
        """Return the path of the gatherer's classified/<category> file."""
        return os.path.join(ig.basedir, "classified", category)

    def _read_classified(self, ig, category):
        """Return the stripped lines of the classified/<category> file."""
        # the context manager closes the file as soon as it has been read
        with open(self._classified_fn(ig, category), "r") as f:
            return [line.strip() for line in f]

    def test_connect(self):
        """The gatherer's Deferred must fire once a Tub has connected."""
        basedir = "logging/IncidentGatherer/connect"
        os.makedirs(basedir)
        self.logger.setLogDir(basedir)

        ig = self.create_incident_gatherer(basedir)
        ig.setServiceParent(self.parent)
        self.create_connected_tub(ig)

        d = ig.d
        # give the call to remote_logport a chance to retire
        d.addCallback(self.stall, 0.5)
        return d

    def test_emit(self):
        """Trigger incidents and follow them through a series of gatherers
        that share one basedir but differ in their classifiers."""
        basedir = "logging/IncidentGatherer/emit"
        os.makedirs(basedir)
        self.logger.setLogDir(basedir)

        ig = self.create_incident_gatherer(basedir)
        ig.setServiceParent(self.parent)
        incident_d = defer.Deferred()
        ig.cb_new_incident = incident_d.callback
        self.create_connected_tub(ig)

        d = ig.d

        d.addCallback(lambda res: self.logger.msg("boom", level=log.WEIRD))
        d.addCallback(lambda res: incident_d)
        def _new_incident(fns):
            (abs_fn, rel_fn) = fns
            events = self._read_logfile(abs_fn)
            header = events[0]["header"]
            self.assertTrue("trigger" in header)
            self.assertEqual(header["trigger"]["message"], "boom")
            e = events[1]["d"]
            self.assertEqual(e["message"], "boom")

            # it should have been classified as "unknown"
            unknowns = self._read_classified(ig, "unknown")
            self.assertEqual(len(unknowns), 1)
            self.assertEqual(unknowns[0], rel_fn)
        d.addCallback(_new_incident)

        # now shut down the gatherer, create a new one with the same basedir
        # (with some classifier functions), remove the existing
        # classifications, and start it up. It should reclassify everything
        # at startup.

        d.addCallback(lambda res: ig.disownServiceParent())

        def classify_boom(trigger):
            if "boom" in trigger.get("message",""):
                return "boom"
        def classify_foom(trigger):
            if "foom" in trigger.get("message",""):
                return "foom"

        incident_d2 = defer.Deferred()
        def _update_classifiers(res):
            self.remove_classified_incidents(ig)
            ig2 = self.create_incident_gatherer(basedir, [classify_boom])
            ##ig2.add_classifier(classify_foom)
            # we add classify_foom by writing it into a file, to exercise the
            # look-for-classifier-files code
            foomfile = os.path.join(ig2.basedir, "classify_foom.py")
            with open(foomfile, "w") as f:
                f.write('''
def classify_incident(trigger):
    if "foom" in trigger.get("message",""):
        return "foom"
''')
            ig2.setServiceParent(self.parent)
            # now that it's been read, delete it to avoid affecting later
            # runs
            os.unlink(foomfile)
            self.ig2 = ig2

            # incidents should be classified in startService
            # (ig and ig2 share the same basedir, so either can be used to
            # locate the classified/ files)
            self.assertFalse(os.path.exists(self._classified_fn(ig, "unknown")))
            booms = self._read_classified(ig, "boom")
            self.assertEqual(len(booms), 1)
            self.assertFalse(os.path.exists(self._classified_fn(ig, "foom")))

            ig2.cb_new_incident = incident_d2.callback

            return ig2.d
        d.addCallback(_update_classifiers)
        d.addCallback(lambda res: self.logger.msg("foom", level=log.WEIRD))
        d.addCallback(lambda res: incident_d2)
        def _new_incident2(fns):
            # this one should be classified as "foom", and nothing should
            # be left in "unknown"
            (abs_fn, rel_fn) = fns
            fooms = self._read_classified(ig, "foom")
            self.assertEqual(len(fooms), 1)
            self.assertEqual(fooms[0], rel_fn)
            self.assertFalse(os.path.exists(self._classified_fn(ig, "unknown")))
        d.addCallback(_new_incident2)
        d.addCallback(lambda res: self.ig2.disownServiceParent())

        # if we remove just classified/boom, then those incidents should be
        # reclassified

        def _remove_boom_incidents(res):
            os.remove(self._classified_fn(ig, "boom"))

            ig2a = self.create_incident_gatherer(basedir, [classify_boom,
                                                           classify_foom])
            ig2a.setServiceParent(self.parent)
            self.ig2a = ig2a

            # now classified/boom should be back, and the other files should
            # have been left untouched
            booms = self._read_classified(ig, "boom")
            self.assertEqual(len(booms), 1)
        d.addCallback(_remove_boom_incidents)
        d.addCallback(lambda res: self.ig2a.disownServiceParent())

        # and if we remove the classification functions (but do *not* remove
        # the classified incidents), the new gatherer should not reclassify
        # anything

        def _update_classifiers_again(res):
            ig3 = self.create_incident_gatherer(basedir)
            ig3.setServiceParent(self.parent)
            self.ig3 = ig3

            self.assertFalse(os.path.exists(self._classified_fn(ig, "unknown")))
            booms = self._read_classified(ig, "boom")
            self.assertEqual(len(booms), 1)
            fooms = self._read_classified(ig, "foom")
            self.assertEqual(len(fooms), 1)
            return ig3.d
        d.addCallback(_update_classifiers_again)

        d.addCallback(lambda res: self.ig3.disownServiceParent())

        # and if we remove all the stored incidents (and the 'latest'
        # record), the gatherer will grab everything. This exercises the
        # only-grab-one-at-a-time code. I verified this manually, by adding a
        # print to the avoid-duplicate clause of
        # IncidentObserver.maybe_fetch_incident .

        def _create_ig4(res):
            ig4 = self.create_incident_gatherer(basedir)
            for nodeid in os.listdir(os.path.join(ig4.basedir, "incidents")):
                nodedir = os.path.join(ig4.basedir, "incidents", nodeid)
                for fn in os.listdir(nodedir):
                    os.unlink(os.path.join(nodedir, fn))
                os.rmdir(nodedir)
            ig4.setServiceParent(self.parent)
            self.ig4 = ig4
        d.addCallback(_create_ig4)
        d.addCallback(lambda res:
                      self.poll(lambda : self.ig4.incidents_received == 2))

        return d

    def remove_classified_incidents(self, ig):
        """Delete every file in the gatherer's classified/ directory, then
        the directory itself."""
        classified = os.path.join(ig.basedir, "classified")
        for category in os.listdir(classified):
            os.remove(os.path.join(classified, category))
        os.rmdir(classified)
1380
class Gatherer(unittest.TestCase, LogfileReaderMixin, StallMixin, PollMixin):
    """Tests in which a Tub is pointed at one or more log gatherers (by FURL
    or by furlfile), emits a few events, and the gatherer's rotated logfile
    is then checked for those events."""

    def setUp(self):
        # every Tub and gatherer created by a test is parented here, so that
        # tearDown can stop them all at once
        self.parent = service.MultiService()
        self.parent.startService()

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def _start_gatherer(self, rotate, use_bzip, basedir):
        """Create a MyGatherer, attach a fresh `.d` Deferred to it, and
        start it by parenting it to self.parent.

        The three arguments are passed positionally to MyGatherer; judging
        by the callers they are the rotation interval (or None), the bzip
        flag, and the base directory.
        """
        lg = MyGatherer(rotate, use_bzip, basedir)
        lg.d = defer.Deferred()
        # that will start the gatherer
        lg.setServiceParent(self.parent)
        return lg

    def _emit_messages_and_flush(self, res, t):
        """Log one plain message and two SampleError failures, then detach
        Tub `t` from its parent.

        `res` is the ignored result of the previous callback. Returns the
        Deferred built from the two self.stall() steps.
        """
        log.msg("gathered message here")
        try:
            raise SampleError("whoops1")
        except SampleError:
            # no explicit Failure here: this exercises log.err() picking up
            # the exception currently being handled
            log.err()
        try:
            raise SampleError("whoops2")
        except SampleError:
            log.err(failure.Failure())
        d = self.stall(None, 1.0)
        d.addCallback(lambda res: t.disownServiceParent())
        # that will disconnect from the gatherer, which will flush the logfile
        d.addCallback(self.stall, 1.0)
        return d

    def _check_failure_event(self, data, expected_tubid, text):
        """Assert that `data` is an error event from `expected_tubid` whose
        failure repr mentions both SampleError and `text`."""
        self.assertIsInstance(data, dict)
        self.assertEqual(data['from'], expected_tubid)
        self.assertEqual(data['d']['message'], "")
        self.assertTrue(data['d']["isError"])
        self.assertIn("failure", data['d'])
        self.assertIn("SampleError", data['d']["failure"]["repr"])
        self.assertIn(text, data['d']["failure"]["repr"])

    def _check_gatherer(self, fn, starting_timestamp, expected_tubid):
        """Read logfile `fn` and assert that it holds a gatherer header
        followed by the three events of _emit_messages_and_flush, all
        attributed to `expected_tubid`."""
        # discard internal foolscap events, like connection negotiation
        events = [e for e in self._read_logfile(fn)
                  if not ("d" in e
                          and "foolscap" in e["d"].get("facility", ""))]

        if len(events) != 4:
            # dump what we did get, to make the failure easier to diagnose
            from pprint import pprint
            pprint(events)
        self.assertEqual(len(events), 4)
        header, first, second, third = events

        # header
        self.assertIsInstance(header, dict)
        self.assertIn("header", header)
        self.assertEqual(header["header"]["type"], "gatherer")
        self.assertEqual(header["header"]["start"], starting_timestamp)

        # the first event from the log is the plain message
        self.assertIsInstance(first, dict)
        self.assertEqual(first['from'], expected_tubid)
        self.assertEqual(first['d']['message'], "gathered message here")

        # the second and third events are the two logged failures
        self._check_failure_event(second, expected_tubid, "whoops1")
        self._check_failure_event(third, expected_tubid, "whoops2")

    def test_wrongdir(self):
        basedir = "logging/Gatherer/wrongdir"
        os.makedirs(basedir)

        # create a LogGatherer with an unspecified basedir: it should look
        # for a .tac file in the current directory, not see it, and complain
        e = self.assertRaises(RuntimeError,
                              gatherer.GathererService, None, True, None)
        self.assertIn("running in the wrong directory", str(e))

    def test_log_gatherer(self):
        # setLocation, then set log-gatherer-furl. Also, use bzip=True for
        # this one test.
        basedir = "logging/Gatherer/log_gatherer"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        lg = self._start_gatherer(None, True, basedir)
        gatherer_furl = lg.my_furl
        starting_timestamp = lg._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", gatherer_furl)

        # about now, the node will be contacting the Gatherer and
        # offering its logport.

        # lg.d will be fired when subscribe_to_all() has finished
        d = lg.d
        d.addCallback(self._emit_messages_and_flush, t)
        # We use do_rotate() to force logfile rotation before checking
        # contents of the file, so we know it's been written out to disk
        d.addCallback(lambda res: lg.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer.timeout = 20

    def test_log_gatherer_multiple(self):
        # setLocation, then set log-gatherer-furl.
        basedir = "logging/Gatherer/log_gatherer_multiple"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer1_basedir = os.path.join(basedir, "gatherer1")
        os.makedirs(gatherer1_basedir)
        gatherer1 = self._start_gatherer(None, False, gatherer1_basedir)
        gatherer1_furl = gatherer1.my_furl
        starting_timestamp1 = gatherer1._starting_timestamp

        # create a second one
        gatherer2_basedir = os.path.join(basedir, "gatherer2")
        os.makedirs(gatherer2_basedir)
        gatherer2 = self._start_gatherer(None, False, gatherer2_basedir)
        gatherer2_furl = gatherer2.my_furl
        starting_timestamp2 = gatherer2._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", (gatherer1_furl, gatherer2_furl))

        # about now, the node will be contacting the Gatherers and
        # offering its logport.

        # gatherer1.d and gatherer2.d will be fired when subscribe_to_all()
        # has finished. fireOnOneErrback makes a failed subscription fail the
        # test right away instead of being folded into the result list.
        dl = defer.DeferredList([gatherer1.d, gatherer2.d],
                                fireOnOneErrback=True)
        dl.addCallback(self._emit_messages_and_flush, t)
        dl.addCallback(lambda res: gatherer1.do_rotate())
        dl.addCallback(self._check_gatherer, starting_timestamp1, expected_tubid)
        dl.addCallback(lambda res: gatherer2.do_rotate())
        dl.addCallback(self._check_gatherer, starting_timestamp2, expected_tubid)
        return dl
    test_log_gatherer_multiple.timeout = 40

    def test_log_gatherer2(self):
        # set log-gatherer-furl, then setLocation. Also, use a timed rotator.
        basedir = "logging/Gatherer/log_gatherer2"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        lg = self._start_gatherer(3600, False, basedir)
        gatherer_furl = lg.my_furl
        starting_timestamp = lg._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setOption("log-gatherer-furl", gatherer_furl)
        t.setLocation("127.0.0.1:%d" % portnum)

        d = lg.d
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: lg.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer2.timeout = 20

    def test_log_gatherer_furlfile(self):
        # setLocation, then set log-gatherer-furlfile
        basedir = "logging/Gatherer/log_gatherer_furlfile"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        lg = self._start_gatherer(None, False, basedir)
        gatherer_furlfile = os.path.join(basedir, lg.furlFile)
        starting_timestamp = lg._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)

        d = lg.d
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: lg.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer_furlfile.timeout = 20

    def test_log_gatherer_furlfile2(self):
        # set log-gatherer-furlfile, then setLocation
        basedir = "logging/Gatherer/log_gatherer_furlfile2"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        lg = self._start_gatherer(None, False, basedir)
        gatherer_furlfile = os.path.join(basedir, lg.furlFile)
        starting_timestamp = lg._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)
        # one bug we had was that the log-gatherer was contacted before
        # setLocation had occurred, so exercise that case
        d = self.stall(None, 1.0)
        def _start(res):
            t.setLocation("127.0.0.1:%d" % portnum)
            return lg.d
        d.addCallback(_start)
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: lg.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer_furlfile2.timeout = 20

    def test_log_gatherer_furlfile_multiple(self):
        # two gatherers are listed in a furlfile, a third is given directly
        # through log-gatherer-furl; all three should receive the events
        basedir = "logging/Gatherer/log_gatherer_furlfile_multiple"
        os.makedirs(basedir)

        gatherer1_basedir = os.path.join(basedir, "gatherer1")
        os.makedirs(gatherer1_basedir)
        gatherer1 = self._start_gatherer(None, False, gatherer1_basedir)
        gatherer1_furl = gatherer1.my_furl
        starting_timestamp1 = gatherer1._starting_timestamp

        gatherer2_basedir = os.path.join(basedir, "gatherer2")
        os.makedirs(gatherer2_basedir)
        gatherer2 = self._start_gatherer(None, False, gatherer2_basedir)
        gatherer2_furl = gatherer2.my_furl
        starting_timestamp2 = gatherer2._starting_timestamp

        gatherer3_basedir = os.path.join(basedir, "gatherer3")
        os.makedirs(gatherer3_basedir)
        gatherer3 = self._start_gatherer(None, False, gatherer3_basedir)
        gatherer3_furl = gatherer3.my_furl
        starting_timestamp3 = gatherer3._starting_timestamp

        gatherer_furlfile = os.path.join(basedir, "log_gatherer.furl")
        with open(gatherer_furlfile, "w") as f:
            f.write(gatherer1_furl + "\n")
            f.write(gatherer2_furl + "\n")

        t = Tub()
        expected_tubid = t.tubID
        self.assertIsNotNone(expected_tubid)
        t.setOption("log-gatherer-furl", gatherer3_furl)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)
        # now both log gatherer connections will be being established

        d = defer.DeferredList([gatherer1.d, gatherer2.d, gatherer3.d],
                               fireOnOneErrback=True)
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: gatherer1.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp1, expected_tubid)
        d.addCallback(lambda res: gatherer2.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp2, expected_tubid)
        d.addCallback(lambda res: gatherer3.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp3, expected_tubid)
        return d
    test_log_gatherer_furlfile_multiple.timeout = 20

    def test_log_gatherer_empty_furlfile(self):
        basedir = "logging/Gatherer/log_gatherer_empty_furlfile"
        os.makedirs(basedir)

        gatherer_fn = os.path.join(basedir, "lg.furl")
        open(gatherer_fn, "w").close()
        # leave the furlfile empty: use no gatherer

        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_fn)

        # the FURL itself is not examined; the call just has to succeed
        t.getLogPortFURL()
        t.log("this message shouldn't make anything explode")
    test_log_gatherer_empty_furlfile.timeout = 20

    def test_log_gatherer_missing_furlfile(self):
        basedir = "logging/Gatherer/log_gatherer_missing_furlfile"
        os.makedirs(basedir)

        gatherer_fn = os.path.join(basedir, "missing_lg.furl")
        # leave the furlfile missing: use no gatherer. (Do not create it
        # here, otherwise this merely repeats the empty-furlfile test.)
        self.assertFalse(os.path.exists(gatherer_fn))

        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_fn)

        # the FURL itself is not examined; the call just has to succeed
        t.getLogPortFURL()
        t.log("this message shouldn't make anything explode")
    test_log_gatherer_missing_furlfile.timeout = 20
1741
1742
class Tail(unittest.TestCase):
    """Tests for tail.LogPrinter output (terse, verbose, and saved to a
    file) and for tail.TailOptions argument parsing."""

    def test_logprinter(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        options1 = {"save-to": None,
                   "verbose": None,
                   "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options1, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        outmsg = out.getvalue()
        # this contains a localtime string, so don't check the hour
        self.assertIn(":06.527 L25 []#123 howdy", outmsg)

        lp.remote_msg({"time": 1207005907.527782,
                       "level": 25,
                       "num": 124,
                       "format": "howdy %(there)s",
                       "there": "pardner",
                       })
        outmsg = out.getvalue()
        # this contains a localtime string, so don't check the hour
        self.assertIn(":07.527 L25 []#124 howdy pardner", outmsg)

        # build a Failure with a real traceback attached
        try:
            raise RuntimeError("fake error")
        except RuntimeError:
            f = failure.Failure()

        lp.remote_msg({"time": 1207005950.002,
                       "level": 30,
                       "num": 125,
                       "message": "oops",
                       "failure": f,
                       })
        outmsg = out.getvalue()

        self.assertIn(":50.002 L30 []#125 oops\n FAILURE:\n", outmsg)
        self.assertIn("RuntimeError", outmsg)
        self.assertIn(": fake error", outmsg)
        self.assertIn("--- <exception caught here> ---\n", outmsg)

    def test_logprinter_verbose(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        options1 = {"save-to": None,
                   "verbose": True,
                   "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options1, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        outmsg = out.getvalue()
        # verbose mode is expected to show the event as a dict repr
        self.assertIn("'message': 'howdy'", outmsg)
        self.assertIn("'time': 1207005906.527782", outmsg)
        self.assertIn("'level': 25", outmsg)
        self.assertIn("{", outmsg)

    def test_logprinter_saveto(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        saveto_filename = "test_logprinter_saveto.flog"
        options = {"save-to": saveto_filename,
                   "verbose": False,
                   "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        lp.saver.disconnected() # cause the file to be closed
        # read the saved file back; 'with' makes sure our own handle is
        # closed even when one of the assertions fails
        with open(saveto_filename, "rb") as f:
            expected_magic = f.read(len(flogfile.MAGIC))
            self.assertEqual(expected_magic, flogfile.MAGIC)
            data = json.loads(f.readline().decode("utf-8")) # header
            self.assertEqual(data["header"]["type"], "tail")
            data = json.loads(f.readline().decode("utf-8")) # event
        self.assertEqual(data["from"], "jiijpvbg")
        self.assertEqual(data["d"]["message"], "howdy")
        self.assertEqual(data["d"]["num"], 123)

    def test_options(self):
        basedir = "logging/Tail/options"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "foo")
        with open(fn, "w") as f:
            f.write("pretend this is a furl")
        with open(os.path.join(basedir, "logport.furl"), "w") as f:
            f.write("this too")

        # a literal FURL argument is used as-is
        to = tail.TailOptions()
        to.parseOptions(["pb:pretend-furl"])
        self.assertFalse(to["verbose"])
        self.assertFalse(to["catch-up"])
        self.assertEqual(to.target_furl, "pb:pretend-furl")

        # a directory argument: the FURL comes from its logport.furl
        to = tail.TailOptions()
        to.parseOptions(["--verbose", "--catch-up", basedir])
        self.assertTrue(to["verbose"])
        self.assertTrue(to["catch-up"])
        self.assertEqual(to.target_furl, "this too")

        # a file argument: the FURL is the file's contents
        to = tail.TailOptions()
        to.parseOptions(["--save-to", "save.flog", fn])
        self.assertFalse(to["verbose"])
        self.assertFalse(to["catch-up"])
        self.assertEqual(to["save-to"], "save.flog")
        self.assertEqual(to.target_furl, "pretend this is a furl")

        # an argument that is neither of the above is rejected
        to = tail.TailOptions()
        self.assertRaises(RuntimeError, to.parseOptions, ["bogus.txt"])
1868
1869# applications that provide a command-line tool may find it useful to include
1870# a "flogtool" subcommand, using something like this:
class WrapperOptions(usage.Options):
    # Options for an example "wrapper" command-line tool whose only
    # subcommand is "flogtool", parsed by foolscap's own cli.Options.
    synopsis = "Usage: wrapper flogtool <command>"
    # each entry is (name, shortcut, options class, description)
    subCommands = [
        ("flogtool", None, cli.Options, "foolscap log tool"),
    ]
1874
def run_wrapper(argv):
    """Parse `argv` with WrapperOptions and dispatch the "flogtool"
    subcommand.

    Returns the result of cli.run_flogtool() for the arguments that follow
    the first element of `argv`, or None when the parsed subcommand is not
    "flogtool".
    """
    options = WrapperOptions()
    options.parseOptions(argv)
    if options.subCommand != "flogtool":
        return None
    # drop the leading element (the subcommand name in the callers seen
    # here) and hand the remainder to flogtool
    flogtool_args = argv[1:]
    return cli.run_flogtool(flogtool_args, run_by_human=False)
1881
class CLI(unittest.TestCase):
    """Tests for the 'flogtool create-gatherer' and 'flogtool
    create-incident-gatherer' commands, run in-process through
    cli.run_flogtool() (and through the run_wrapper() example)."""

    def test_create_gatherer(self):
        # quiet creation
        basedir = "logging/CLI/create_gatherer"
        argv = ["flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        # quiet creation with a rotation interval
        basedir = "logging/CLI/create_gatherer2"
        argv = ["flogtool", "create-gatherer", "--rotate", "3600",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        # without --quiet, instructions are written to stdout
        basedir = "logging/CLI/create_gatherer3"
        argv = ["flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                basedir]
        (out, _err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))
        self.assertIn("Gatherer created in directory %s" % basedir, out)
        self.assertIn("Now run", out)
        self.assertIn("to launch the daemon", out)

    def test_create_gatherer_badly(self):
        # an unknown option must be reported as a usage error
        argv = ["flogtool", "create-gatherer", "--bogus-arg"]
        self.assertRaises(usage.UsageError,
                          cli.run_flogtool, argv[1:], run_by_human=False)

    def test_create_gatherer_no_location(self):
        basedir = "logging/CLI/create_gatherer_no_location"
        argv = ["flogtool", "create-gatherer", basedir]
        e = self.assertRaises(usage.UsageError,
                              cli.run_flogtool, argv[1:],
                              run_by_human=False)
        self.assertIn("--location= is mandatory", str(e))

    def test_wrapper(self):
        # same as the quiet create-gatherer case, but reached through an
        # application's own "flogtool" subcommand
        basedir = "logging/CLI/wrapper"
        argv = ["wrapper", "flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        run_wrapper(argv[1:])
        self.assertTrue(os.path.exists(basedir))

    def test_create_incident_gatherer(self):
        # quiet creation
        basedir = "logging/CLI/create_incident_gatherer"
        argv = ["flogtool", "create-incident-gatherer",
                "--port", "tcp:3118", "--location", "tcp:localhost:3118",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        # without --quiet, instructions are written to stdout
        basedir = "logging/CLI/create_incident_gatherer2"
        argv = ["flogtool", "create-incident-gatherer",
                "--port", "tcp:3118", "--location", "tcp:localhost:3118",
                basedir]
        (out, _err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))
        self.assertIn("Incident Gatherer created in directory %s" % basedir,
                      out)
        self.assertIn("Now run", out)
        self.assertIn("to launch the daemon", out)
1949
class LogfileWriterMixin:
    """Mixin for tests that need a flogfile or an incident file on disk.

    Both methods read self.basedir, which the test must set beforehand.
    """

    def _ensure_basedir(self):
        # create self.basedir unless it is already there
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)

    def create_logfile(self):
        """Write four events (one of them a SampleError failure) to
        <basedir>/dump.flog through a LogFileObserver.

        Returns a Deferred that fires with the logfile's path once the
        observer has been stopped.
        """
        self._ensure_basedir()
        fn = os.path.join(self.basedir, "dump.flog")
        logger = log.FoolscapLogger()
        lfo = log.LogFileObserver(fn, level=0)
        logger.addObserver(lfo.msg)
        logger.msg("one", facility="big.facility")
        time.sleep(0.2) # give filter --after something to work with
        logger.msg("two", level=log.OPERATIONAL-1)
        try:
            raise SampleError("whoops1")
        except SampleError:
            logger.err(message="three")
        logger.msg("four")
        # stop the observer on a later eventual-send turn rather than
        # synchronously
        d = fireEventually()
        def _done(res):
            lfo._stop()
            return fn
        d.addCallback(_done)
        return d

    def create_incident(self):
        """Log a few events, one of them at level WEIRD, with self.basedir
        as the log directory.

        Returns a Deferred that fires with
        <basedir>/<incident name>.flog.bz2 once the immediate incident
        observer has been called.
        """
        self._ensure_basedir()
        logger = log.FoolscapLogger()
        logger.setLogDir(self.basedir)
        logger.setIncidentReporterFactory(NoFollowUpReporter)

        d = defer.Deferred()
        def _done(name, trigger):
            d.callback( (name,trigger) )
        logger.addImmediateIncidentObserver(_done)

        logger.msg("one")
        logger.msg("two")
        logger.msg("boom", level=log.WEIRD)
        logger.msg("four")

        d.addCallback(lambda name_trigger:
                      os.path.join(self.basedir, name_trigger[0]+".flog.bz2"))

        return d
1997
class Dumper(unittest.TestCase, LogfileWriterMixin, LogfileReaderMixin):
    # create a logfile, then dump it, and examine the output to make sure it
    # worked right.

    def _run_dump(self, fn, *flags):
        """Run 'flogtool dump [flags] fn' in-process, assert that nothing
        was written to stderr, and return stdout as a list of lines (line
        endings kept)."""
        argv = ["flogtool", "dump"] + list(flags) + [fn]
        (out,err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertEqual(err, "")
        return StringIO(out).readlines()

    def test_dump(self):
        self.basedir = "logging/Dumper/dump"
        d = self.create_logfile()
        def _check(fn):
            events = self._read_logfile(fn)
            # events[1] is the event whose message is "one"
            first = events[1]
            pid_line = "PID: %s" % os.getpid()

            # initialize the LogDumper() timestamp mode
            ld = dumper.LogDumper()
            ld.options = dumper.DumpOptions()
            ld.options.parseOptions([fn])
            tmode = ld.options["timestamps"]

            # default output
            lines = self._run_dump(fn)
            self.assertTrue(lines[0].strip().startswith("Application versions"),
                            lines[0])
            self.assertEqual(lines[3].strip(), pid_line, lines[3])
            lines = lines[5:]
            line0 = "local#%d %s: one" % (first["d"]["num"],
                                          format_time(first["d"]["time"],
                                                      tmode))
            self.assertEqual(lines[0].strip(), line0)
            self.assertIn("FAILURE:", lines[3])
            self.assertIn("test_logging.SampleError", lines[4])
            self.assertIn(": whoops1", lines[4])
            self.assertTrue(lines[-1].startswith("local#3 "))

            lines = self._run_dump(fn, "--just-numbers")
            line0 = "%s %d" % (format_time(first["d"]["time"], tmode),
                               first["d"]["num"])
            self.assertEqual(lines[0].strip(), line0)
            self.assertTrue(lines[1].strip().endswith(" 1"))
            self.assertTrue(lines[-1].strip().endswith(" 3"))
            # failures are not dumped in --just-numbers
            self.assertEqual(len(lines), 1+3)

            lines = self._run_dump(fn, "--rx-time")
            self.assertTrue(lines[0].strip().startswith("Application versions"),
                            lines[0])
            self.assertEqual(lines[3].strip(), pid_line, lines[3])
            lines = lines[5:]
            line0 = "local#%d rx(%s) emit(%s): one" % \
                    (first["d"]["num"],
                     format_time(first["rx_time"], tmode),
                     format_time(first["d"]["time"], tmode))
            self.assertEqual(lines[0].strip(), line0)
            self.assertTrue(lines[-1].strip().endswith(" four"))

            lines = self._run_dump(fn, "--verbose")
            self.assertIn("header", lines[0])
            # the optional u prefix allows for a py2-style unicode repr
            self.assertTrue(re.search(r"u?'message': u?'one'", lines[1]), lines[1])
            self.assertIn("'level': 20", lines[1])
            self.assertIn(": four: {", lines[-1])

        d.addCallback(_check)
        return d

    def test_incident(self):
        self.basedir = "logging/Dumper/incident"
        d = self.create_incident()
        def _check(fn):
            events = self._read_logfile(fn)
            # for sanity, make sure we created the incident correctly
            self.assertEqual(events[0]["header"]["type"], "incident")
            self.assertEqual(events[0]["header"]["trigger"]["num"], 2)

            lines = self._run_dump(fn)
            self.assertEqual(len(lines), 8)
            self.assertEqual(lines[0].strip(),
                             "Application versions (embedded in logfile):")
            self.assertTrue(lines[1].strip().startswith("foolscap:"), lines[1])
            self.assertTrue(lines[2].strip().startswith("twisted:"), lines[2])
            mypid = os.getpid()
            self.assertEqual(lines[3].strip(), "PID: %s" % mypid, lines[3])
            self.assertEqual(lines[4].strip(), "")
            # only the triggering event carries the marker
            self.assertNotIn("[INCIDENT-TRIGGER]", lines[5])
            self.assertNotIn("[INCIDENT-TRIGGER]", lines[6])
            self.assertTrue(lines[7].strip().endswith(": boom [INCIDENT-TRIGGER]"))
        d.addCallback(_check)
        return d

    def test_oops_furl(self):
        # pointing 'flogtool dump' at a FURL file should give a helpful error
        self.basedir = os.path.join("logging", "Dumper", "oops_furl")
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir, "logport.furl")
        with open(fn, "w") as f:
            f.write("pb://TUBID@HINTS/SWISSNUM\n")

        ld = dumper.LogDumper()
        # initialize the LogDumper() timestamp mode
        ld.options = dumper.DumpOptions()
        ld.options.parseOptions([fn])
        argv = ["flogtool", "dump", fn]
        (out,err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertEqual(err, "Error: %s appears to be a FURL file.\nPerhaps you meant to run 'flogtool tail' instead of 'flogtool dump'?\n" % fn)
2115
# Base64-encoded contents of an old-format logfile. OldPickleDumper.test_dump
# decodes this and writes it to "dump.flog", then checks that 'flogtool dump'
# refuses it with a "which cannot be loaded safely" error. Opaque fixture
# data: do not reformat or edit.
PICKLE_DUMPFILE_B64 = """
KGRwMApTJ2hlYWRlcicKcDEKKGRwMgpTJ3RocmVzaG9sZCcKcDMKSTAKc1MncGlkJwpwNA
pJMTg3MjgKc1MndHlwZScKcDUKUydsb2ctZmlsZS1vYnNlcnZlcicKcDYKc1MndmVyc2lv
bnMnCnA3CihkcDgKUydmb29sc2NhcCcKcDkKUycwLjkuMSsyMi5nNzhlNWEzZC5kaXJ0eS
cKcDEwCnNTJ3R3aXN0ZWQnCnAxMQpTJzE1LjUuMCcKcDEyCnNzcy6AAn1xAChVBGZyb21x
AVUFbG9jYWxxAlUHcnhfdGltZXEDR0HVmqGrUXpjVQFkcQR9cQUoVQVsZXZlbHEGSxRVC2
luY2FybmF0aW9ucQdVCMZQLsaodzvDcQhOhnEJVQhmYWNpbGl0eXEKVQxiaWcuZmFjaWxp
dHlxC1UDbnVtcQxLAFUEdGltZXENR0HVmqGrRFtbVQdtZXNzYWdlcQ5VA29uZXEPdXUugA
J9cQAoVQRmcm9tcQFVBWxvY2FscQJVB3J4X3RpbWVxA0dB1Zqhq1F+s1UBZHEEfXEFKFUH
bWVzc2FnZXEGVQN0d29xB1UDbnVtcQhLAVUEdGltZXEJR0HVmqGrUU6cVQtpbmNhcm5hdG
lvbnEKVQjGUC7GqHc7w3ELToZxDFUFbGV2ZWxxDUsTdXUugAJ9cQAoVQRmcm9tcQFVBWxv
Y2FscQJVB3J4X3RpbWVxA0dB1Zqhq1GAiFUBZHEEfXEFKFUFbGV2ZWxxBksUVQtpbmNhcm
5hdGlvbnEHVQjGUC7GqHc7w3EIToZxCVUDd2h5cQpOVQdmYWlsdXJlcQsoY2Zvb2xzY2Fw
LmNhbGwKQ29waWVkRmFpbHVyZQpxDG9xDX1xDyhVAnRicRBOVQl0cmFjZWJhY2txEVSBAw
AAVHJhY2ViYWNrIChtb3N0IHJlY2VudCBjYWxsIGxhc3QpOgogIEZpbGUgIi9Vc2Vycy93
YXJuZXIvc3R1ZmYvcHl0aG9uL2Zvb2xzY2FwL3ZlL2xpYi9weXRob24yLjcvc2l0ZS1wYW
NrYWdlcy90d2lzdGVkL3RyaWFsL19hc3luY3Rlc3QucHkiLCBsaW5lIDExMiwgaW4gX3J1
bgogICAgdXRpbHMucnVuV2l0aFdhcm5pbmdzU3VwcHJlc3NlZCwgc2VsZi5fZ2V0U3VwcH
Jlc3MoKSwgbWV0aG9kKQogIEZpbGUgIi9Vc2Vycy93YXJuZXIvc3R1ZmYvcHl0aG9uL2Zv
b2xzY2FwL3ZlL2xpYi9weXRob24yLjcvc2l0ZS1wYWNrYWdlcy90d2lzdGVkL2ludGVybm
V0L2RlZmVyLnB5IiwgbGluZSAxNTAsIGluIG1heWJlRGVmZXJyZWQKICAgIHJlc3VsdCA9
IGYoKmFyZ3MsICoqa3cpCiAgRmlsZSAiL1VzZXJzL3dhcm5lci9zdHVmZi9weXRob24vZm
9vbHNjYXAvdmUvbGliL3B5dGhvbjIuNy9zaXRlLXBhY2thZ2VzL3R3aXN0ZWQvaW50ZXJu
ZXQvdXRpbHMucHkiLCBsaW5lIDE5NywgaW4gcnVuV2l0aFdhcm5pbmdzU3VwcHJlc3NlZA
ogICAgcmVzdWx0ID0gZigqYSwgKiprdykKICBGaWxlICIvVXNlcnMvd2FybmVyL3N0dWZm
L3B5dGhvbi9mb29sc2NhcC9mb29sc2NhcC90ZXN0L3Rlc3RfbG9nZ2luZy5weSIsIGxpbm
UgMTg4MywgaW4gdGVzdF9kdW1wCiAgICBkID0gc2VsZi5jcmVhdGVfbG9nZmlsZSgpCi0t
LSA8ZXhjZXB0aW9uIGNhdWdodCBoZXJlPiAtLS0KICBGaWxlICIvVXNlcnMvd2FybmVyL3
N0dWZmL3B5dGhvbi9mb29sc2NhcC9mb29sc2NhcC90ZXN0L3Rlc3RfbG9nZ2luZy5weSIs
IGxpbmUgMTg0MiwgaW4gY3JlYXRlX2xvZ2ZpbGUKICAgIHJhaXNlIFNhbXBsZUVycm9yKC
J3aG9vcHMxIikKZm9vbHNjYXAudGVzdC50ZXN0X2xvZ2dpbmcuU2FtcGxlRXJyb3I6IHdo
b29wczEKcRJVBXZhbHVlcRNVB3dob29wczFxFFUHcGFyZW50c3EVXXEWKFUmZm9vbHNjYX
AudGVzdC50ZXN0X2xvZ2dpbmcuU2FtcGxlRXJyb3JxF1UUZXhjZXB0aW9ucy5FeGNlcHRp
b25xGFUYZXhjZXB0aW9ucy5CYXNlRXhjZXB0aW9ucRlVEl9fYnVpbHRpbl9fLm9iamVjdH
EaZVUGZnJhbWVzcRtdcRxVBHR5cGVxHVUmZm9vbHNjYXAudGVzdC50ZXN0X2xvZ2dpbmcu
U2FtcGxlRXJyb3JxHlUFc3RhY2txH11xIHViVQNudW1xIUsCVQR0aW1lcSJHQdWaoatRVt
JVB21lc3NhZ2VxI1UFdGhyZWVxJFUHaXNFcnJvcnElSwF1dS6AAn1xAChVBGZyb21xAVUF
bG9jYWxxAlUHcnhfdGltZXEDR0HVmqGrUYXkVQFkcQR9cQUoVQdtZXNzYWdlcQZVBGZvdX
JxB1UDbnVtcQhLA1UEdGltZXEJR0HVmqGrUXU2VQtpbmNhcm5hdGlvbnEKVQjGUC7GqHc7
w3ELToZxDFUFbGV2ZWxxDUsUdXUu
"""
2157
# Base64-encoded contents of an old-format incident file.
# OldPickleDumper.test_incident decodes this and writes it to a file named
# "incident-...flog.bz2" (so presumably the payload is bz2-compressed), then
# checks that 'flogtool dump' refuses it with a "which cannot be loaded
# safely" error. Opaque fixture data: do not reformat or edit.
PICKLE_INCIDENT_B64 = """
QlpoOTFBWSZTWUOW3hEAAHjfgAAQAcl/4QkhCAS/59/iQAGdWS2BJRTNNQ2oB6gGgPU9T1
BoEkintKGIABiAAaAwANGhowjJoNGmgMCpJDSaNGqbSMnqGhoaAP1S5rw5GxrNlUoxLXu2
sZ5TYy2rVCVNHMKgeDE97TBiw1hXtCfdSCISDpSlL61KFiacqWj9apY80J2PIpO7mde+vd
Jz18Myu4+djYU10JPMGU5vFAcUmmyk0kmcGUSMIDUJcKkog4W2EyyQStwwSYUEohGpr6Wm
F4KU7qccsjPJf8dTIv3ydZM5hpkW41JjJ8j0PETxlRRVFSeZYsqFU+hufU3n5O3hmYASDC
DhWMHFPJE7nXCYRsz5BGjktwUQCu6d4cixrgmGYLYA7JVCM7UqkMDVD9EMaclrFuayYGBR
xMIwXxM9pjeUuZVv2ceR5E6FSWpVRKKD98ObK5wmGmU9vqNBKqjp0wwqZlZ3x3nA4n+LTS
rmhbVjNyWeh/xdyRThQkEOW3hE
"""
2168
class OldPickleDumper(unittest.TestCase):
    """Check that 'flogtool dump' refuses old-format logfiles.

    Each test writes one of the base64-embedded fixtures to disk, runs
    'flogtool dump' on it, and expects no stdout output plus an error
    message containing "which cannot be loaded safely" on stderr.
    """

    def _assert_dump_refused(self, basedir, filename, b64_data):
        """Write the decoded fixture and verify that dumping it is refused.

        basedir:  directory to create (if missing) and write into; also
                  stored as self.basedir
        filename: name of the fixture file inside basedir
        b64_data: base64 text of the file contents
        """
        self.basedir = basedir
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir, filename)
        with open(fn, "wb") as f:
            f.write(base64.b64decode(b64_data))

        (out, err) = cli.run_flogtool(["dump", fn], run_by_human=False)
        self.assertEqual(out, "")
        # assertIn rather than the legacy failUnlessIn alias, matching the
        # assert* naming used by the rest of this file
        self.assertIn("which cannot be loaded safely", err)

    def test_dump(self):
        self._assert_dump_refused("logging/OldPickleDumper/dump",
                                  "dump.flog",
                                  PICKLE_DUMPFILE_B64)

    def test_incident(self):
        self._assert_dump_refused(
            "logging/OldPickleDumper/incident",
            "incident-2015-12-11--08-18-28Z-uqyuiea.flog.bz2",
            PICKLE_INCIDENT_B64)
2196
class Filter(unittest.TestCase, LogfileWriterMixin, LogfileReaderMixin):
    """Exercise 'flogtool filter' against a logfile from create_logfile()."""

    def compare_events(self, a, b):
        """Assert that two lists of logfile events are equal."""
        # Plain equality is enough: logs no longer contain CopiedFailure
        # instances (which did not compare equal after being loaded twice
        # from the same pickle), so no special-case comparison is needed.
        self.assertEqual(a, b)

    def test_basic(self):
        """Run each filter option in turn and check output and result file.

        The steps are order-dependent: several of them rewrite the same
        output file, and the .bz2 copy made early on is read back later.
        """
        self.basedir = "logging/Filter/basic"

        def run_filter(*args):
            # invoke 'flogtool filter <args>' and hand back its stdout
            stdout, _stderr = cli.run_flogtool(["filter"] + list(args),
                                               run_by_human=False)
            return stdout

        def expect(fragment, stdout):
            self.assertTrue(fragment in stdout, stdout)

        def _check(fn):
            events = self._read_logfile(fn)
            assert len(events) == 5
            # the header is always copied, so "nothing matched" still
            # leaves one event behind
            header_only = events[:1]
            level_filtered = [events[0], events[1], events[3], events[4]]

            dirname, filename = os.path.split(fn)
            fn2 = os.path.join(dirname, "filtered-" + filename)
            fn2bz2 = fn2 + ".bz2"

            # pass-through
            out = run_filter(fn, fn2)
            expect("copied 5 of 5 events into new file", out)
            self.compare_events(events, self._read_logfile(fn2))

            # convert to .bz2 while we're at it
            out = run_filter(fn, fn2bz2)
            expect("copied 5 of 5 events into new file", out)
            self.compare_events(events, self._read_logfile(fn2bz2))

            # modify the file in place
            out = run_filter("--above", "20", fn2)
            expect("modifying event file in place", out)
            expect("--above: removing events below level 20", out)
            expect("copied 4 of 5 events into new file", out)
            self.compare_events(level_filtered, self._read_logfile(fn2))

            # modify the file in place, two-argument version
            out = run_filter(fn2, fn2)
            expect("modifying event file in place", out)
            expect("copied 4 of 4 events into new file", out)
            self.compare_events(level_filtered, self._read_logfile(fn2))

            # --above with a string argument
            out = run_filter("--above", "OPERATIONAL", fn, fn2)
            expect("--above: removing events below level 20", out)
            expect("copied 4 of 5 events into new file", out)
            self.compare_events(level_filtered, self._read_logfile(fn2))

            # --before/--after only accept integers, so just test that we
            # get all or nothing
            t_one = events[1]["d"]["time"]
            out = run_filter("--before", str(int(t_one - 10)), fn, fn2)
            expect("copied 1 of 5 events into new file", out)
            self.compare_events(header_only, self._read_logfile(fn2))

            out = run_filter("--after", str(int(t_one + 10)), fn, fn2)
            expect("copied 1 of 5 events into new file", out)
            self.compare_events(header_only, self._read_logfile(fn2))

            # --strip-facility
            out = run_filter("--strip-facility", "big", fn, fn2)
            expect("--strip-facility: removing events for big and children",
                   out)
            expect("copied 4 of 5 events into new file", out)
            self.compare_events(
                [events[0], events[2], events[3], events[4]],
                self._read_logfile(fn2))

            # pass-through, --verbose, read from .bz2
            out = run_filter("--verbose", fn2bz2, fn2)
            expect("copied 5 of 5 events into new file", out)
            stripped = [line.strip() for line in StringIO(out).readlines()]
            self.assertEqual(stripped,
                             ["HEADER", "0", "1", "2", "3",
                              "copied 5 of 5 events into new file"])
            self.compare_events(events, self._read_logfile(fn2))

            # --from . This normally takes a base32 tubid prefix, but the
            # things we've logged all say ["from"]="local". So just test
            # all-or-nothing.
            out = run_filter("--from", "local", fn, fn2)
            expect("--from: retaining events only from tubid prefix local",
                   out)
            expect("copied 5 of 5 events into new file", out)
            self.compare_events(events, self._read_logfile(fn2))

            out = run_filter("--from", "NOTlocal", fn, fn2)
            expect("--from: retaining events only from tubid prefix NOTlocal",
                   out)
            expect("copied 1 of 5 events into new file", out)
            self.compare_events(header_only, self._read_logfile(fn2))

        d = self.create_logfile()
        d.addCallback(_check)
        return d
2309
2310
@inlineCallbacks
def getPage(url):
    """Issue an HTTP GET for `url` and fire with the response body.

    The body is whatever client.readBody() delivers. It is always read in
    full, even for error responses, so that it can be quoted in the error.

    Raises ValueError (through the returned Deferred) when the response
    code is anything other than 200.
    """
    import warnings
    agent = client.Agent(reactor)
    response = yield agent.request(b"GET", six.ensure_binary(url))
    with warnings.catch_warnings():
        # Silence a spurious internal Twisted warning ("Using readBody
        # with a transport that does not have an abortConnection method"),
        # which seems to be https://twistedmatrix.com/trac/ticket/8227
        warnings.simplefilter("ignore")
        body = yield client.readBody(response)
    status = response.code
    if status != 200:
        raise ValueError("request failed (%d), page contents were: %s" % (
            status, six.ensure_str(body)))
    returnValue(body)
2326
class Web(unittest.TestCase):
    """Serve a small logfile through web.WebViewer and check its pages."""

    def setUp(self):
        # set by test_basic once a viewer has been created, so tearDown
        # knows whether there is anything to stop
        self.viewer = None

    def tearDown(self):
        d = defer.maybeDeferred(unittest.TestCase.tearDown, self)
        if self.viewer:
            d.addCallback(lambda res: self.viewer.stop())
        return d

    @inlineCallbacks
    def test_basic(self):
        """Write four events, start the viewer, and fetch its pages.

        The events are "one", "two", "three" (a child of "two", carrying a
        Failure) and "four" (level UNUSUAL). The welcome page, the level-20
        summary and every sort/timestamp variant of the all-events page are
        checked for the expected text.
        """
        basedir = "logging/Web/basic"
        # guard against a leftover directory, like the other tests in this
        # file do; a bare makedirs() raises if the path already exists
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        l = log.FoolscapLogger()
        fn = os.path.join(basedir, "flog.out")
        ob = log.LogFileObserver(fn)
        l.addObserver(ob.msg)
        l.msg("one")
        lp = l.msg("two")
        l.msg("three", parent=lp, failure=failure.Failure(RuntimeError("yo")))
        l.msg("four", level=log.UNUSUAL)
        # let the queued events reach the observer before detaching it
        yield fireEventually()
        l.removeObserver(ob.msg)
        ob._stop()

        portnum = allocate_tcp_port()
        argv = ["-p", "tcp:%d:interface=127.0.0.1" % portnum,
                "--quiet",
                fn]
        options = web.WebViewerOptions()
        options.parseOptions(argv)
        self.viewer = web.WebViewer()
        self.url = yield self.viewer.start(options)
        # everything up to and including the last "/" of the welcome URL
        self.baseurl = self.url[:self.url.rfind("/")] + "/"

        page = yield getPage(self.url)
        page = six.ensure_str(page)
        mypid = os.getpid()
        self.assertTrue("PID %s" % mypid in page,
                        "didn't see 'PID %s' in '%s'" % (mypid, page))
        self.assertTrue("Application Versions:" in page, page)
        self.assertTrue("foolscap: %s" % foolscap.__version__ in page, page)
        self.assertTrue("4 events covering" in page)
        self.assertTrue('href="summary/0-20">3 events</a> at level 20'
                        in page)

        page = yield getPage(self.baseurl + "summary/0-20")
        page = six.ensure_str(page)
        self.assertTrue("Events at level 20" in page)
        self.assertTrue(": two" in page)
        self.assertFalse("four" in page)

        def check_all_events(url, page):
            # the URL is used as the failure message so a failing variant
            # can be identified
            page = six.ensure_str(page)
            self.assertTrue("3 root events" in page, url)
            self.assertTrue(": one</span>" in page, url)
            self.assertTrue(": two</span>" in page, url)
            self.assertTrue(": three FAILURE:" in page, url)
            self.assertTrue(": UNUSUAL four</span>" in page, url)

        # the same events must show up whatever the sort order or
        # timestamp format
        for query in ("",
                      "?sort=number",
                      "?sort=time",
                      "?sort=nested",
                      "?timestamps=short-local",
                      "?timestamps=utc"):
            url = self.baseurl + "all-events" + query
            page = yield getPage(url)
            check_all_events(url, page)
2404
2405
2406
class Bridge(unittest.TestCase):
    """Check the log bridges between foolscap and twisted, both ways."""

    @inlineCallbacks
    def test_foolscap_to_twisted(self):
        """Foolscap events are forwarded to twisted, except NOISY ones."""
        foolscap_logger = log.FoolscapLogger()
        twisted_pub = twisted_log.LogPublisher()
        log.bridgeLogsToTwisted(None, foolscap_logger, twisted_pub)
        seen_by_twisted = []
        twisted_pub.addObserver(seen_by_twisted.append)
        seen_by_foolscap = []
        foolscap_logger.addObserver(seen_by_foolscap.append)

        foolscap_logger.msg("one")
        foolscap_logger.msg(format="two %(two)d", two=2)
        # this one should not make it across the bridge
        foolscap_logger.msg("three", level=log.NOISY)
        yield flushEventualQueue()

        # foolscap's own observer sees all three events
        self.assertEqual(len(seen_by_foolscap), 3)
        first, second, third = seen_by_foolscap
        self.assertEqual(first["message"], "one")
        self.assertEqual(second["format"], "two %(two)d")
        self.assertEqual(third["message"], "three")

        # twisted only receives the first two, tagged and pre-formatted
        self.assertEqual(len(seen_by_twisted), 2)
        self.assertEqual(seen_by_twisted[0]["message"], ("one",))
        self.assertTrue(seen_by_twisted[0]["from-foolscap"])
        self.assertEqual(seen_by_twisted[1]["message"], ("two 2",))
        self.assertTrue(seen_by_twisted[1]["from-foolscap"])

    @inlineCallbacks
    def test_twisted_to_foolscap(self):
        """Old-style twisted log events arrive in foolscap as strings."""
        foolscap_logger = log.FoolscapLogger()
        twisted_pub = twisted_log.LogPublisher()
        log.bridgeLogsFromTwisted(None, twisted_pub, foolscap_logger)
        seen_by_twisted = []
        twisted_pub.addObserver(seen_by_twisted.append)
        seen_by_foolscap = []
        foolscap_logger.addObserver(seen_by_foolscap.append)

        twisted_pub.msg("one")
        twisted_pub.msg(format="two %(two)d", two=2)
        # Twisted has call sites (e.g. Factory.doStart) where the new
        # Logger.info() receives arbitrary, unserializable kwargs for
        # string formatting. Those flow into the old LogPublisher() and
        # from there into foolscap, which must cope. Foolscap stringifies
        # every twisted event immediately (with log.textFromEventDict) and
        # drops the extra arguments, so an *unserializable* argument is
        # acceptable provided it is still *stringifyable*.
        unserializable = lambda: "unserializable"
        twisted_pub.msg(format="three is %(evil)s", evil=unserializable)

        yield flushEventualQueue()

        # twisted's own observer sees the raw events
        self.assertEqual(len(seen_by_twisted), 3)
        self.assertEqual(seen_by_twisted[0]["message"], ("one",))
        self.assertEqual(seen_by_twisted[1]["format"], "two %(two)d")
        self.assertEqual(seen_by_twisted[1]["two"], 2)
        self.assertEqual(seen_by_twisted[2]["format"], "three is %(evil)s")
        self.assertEqual(seen_by_twisted[2]["evil"], unserializable)

        # foolscap gets them formatted and tagged
        self.assertEqual(len(seen_by_foolscap), 3)
        first, second, third = seen_by_foolscap
        self.assertEqual(first["message"], "one")
        self.assertTrue(first["from-twisted"])
        self.assertEqual(second["message"], "two 2")
        self.assertTrue(second["from-twisted"])
        # str(unserializable) is like "<function <lambda> at 0xblahblah>"
        self.assertEqual(third["message"],
                         "three is " + str(unserializable))
        self.assertTrue(third["from-twisted"])

    @inlineCallbacks
    def test_twisted_logger_to_foolscap(self):
        """Events from the new twisted.logger API also reach foolscap."""
        if not twisted_logger:
            raise unittest.SkipTest("needs twisted.logger from Twisted>=15.2.0")
        new_style_pub = twisted_logger.LogPublisher()
        old_style_pub = twisted_log.LogPublisher(
            observerPublisher=new_style_pub,
            publishPublisher=new_style_pub)
        foolscap_logger = log.FoolscapLogger()
        log.bridgeLogsFromTwisted(None, old_style_pub, foolscap_logger)
        seen_by_twisted = []
        old_style_pub.addObserver(seen_by_twisted.append)
        seen_by_foolscap = []
        foolscap_logger.addObserver(seen_by_foolscap.append)

        new_logger = twisted_logger.Logger(observer=new_style_pub)
        new_logger.info("one")
        # note: new twisted logger wants PEP3101 format strings, {} not %
        new_logger.info(format="two {two}", two=2)
        # The new Logger.info() accepts arbitrary (unserializable) kwargs
        # for formatting and hands them to the old LogPublisher(), so we
        # must tolerate them. Foolscap stringifies every twisted event and
        # does not keep the extra arguments.
        unserializable = lambda: "unserializable"
        new_logger.info("three is {evil!s}", evil=unserializable)

        yield flushEventualQueue()

        self.assertEqual(len(seen_by_foolscap), 3)
        first, second, third = seen_by_foolscap
        self.assertEqual(first["message"], "one")
        self.assertTrue(first["from-twisted"])
        self.assertEqual(second["message"], "two 2")
        # the formatting kwarg itself must not be stored in the event
        self.assertFalse("two" in second)
        self.assertTrue(second["from-twisted"])
        # str(unserializable) is like "<function <lambda> at 0xblahblah>"
        self.assertEqual(third["message"],
                         "three is " + str(unserializable))
        self.assertTrue(third["from-twisted"])

    @inlineCallbacks
    def test_no_loops(self):
        """With both bridges installed, each event is delivered only once."""
        foolscap_logger = log.FoolscapLogger()
        twisted_pub = twisted_log.LogPublisher()
        log.bridgeLogsFromTwisted(None, twisted_pub, foolscap_logger)
        log.bridgeLogsToTwisted(None, foolscap_logger, twisted_pub)
        seen_by_twisted = []
        twisted_pub.addObserver(seen_by_twisted.append)
        seen_by_foolscap = []
        foolscap_logger.addObserver(seen_by_foolscap.append)

        twisted_pub.msg("one")
        foolscap_logger.msg("two")

        yield flushEventualQueue()

        # exactly two events on each side: nothing bounced back
        self.assertEqual(len(seen_by_twisted), 2)
        self.assertEqual(seen_by_twisted[0]["message"], ("one",))
        self.assertEqual(seen_by_twisted[1]["message"], ("two",))

        self.assertEqual(len(seen_by_foolscap), 2)
        self.assertEqual(seen_by_foolscap[0]["message"], "one")
        self.assertEqual(seen_by_foolscap[1]["message"], "two")
2546
2547