import os, sys, json, time, bz2, base64, re
import six
import mock
from io import StringIO
from zope.interface import implementer
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.internet.defer import inlineCallbacks, returnValue
try:
    from twisted import logger as twisted_logger
except ImportError:
    twisted_logger = None
from twisted.web import client
from twisted.python import log as twisted_log
from twisted.python import failure, runtime, usage
import foolscap
from foolscap.logging import gatherer, log, tail, incident, cli, web, \
     publish, dumper, flogfile
from foolscap.logging.interfaces import RILogObserver
from foolscap.util import format_time, allocate_tcp_port, ensure_dict_str
from foolscap.eventual import fireEventually, flushEventualQueue
from foolscap.tokens import NoLocationError
from foolscap.test.common import PollMixin, StallMixin
from foolscap.api import RemoteException, Referenceable, Tub


class Basic(unittest.TestCase):
    def testLog(self):
        l = log.FoolscapLogger()
        l.explain_facility("ui", "this terse string fully describes the gui")
        l.msg("one")
        l.msg("two")
        l.msg(message="three")
        l.msg("one=%d, two=%d", 1, 2)
        l.msg("survive 100% of weird inputs")
        l.msg(format="foo=%(foo)s, bar=%(bar)s", foo="foo", bar="bar")
        l.msg() # useless, but make sure it doesn't crash
        l.msg("ui message", facility="ui")
        l.msg("so boring it won't even be generated", level=log.NOISY-1)
        l.msg("blah blah", level=log.NOISY)
        l.msg("opening file", level=log.OPERATIONAL)
        l.msg("funny, that doesn't usually happen", level=log.UNUSUAL)
        l.msg("configuration change noticed", level=log.INFREQUENT)
        l.msg("error, but recoverable", level=log.CURIOUS)
        l.msg("ok, that shouldn't have happened", level=log.WEIRD)
        l.msg("hash doesn't match.. what the hell?", level=log.SCARY)
        l.msg("I looked into the trap, ray", level=log.BAD)

    def testStacktrace(self):
        l = log.FoolscapLogger()
        l.msg("how did we get here?", stacktrace=True)

    def testFailure(self):
        l = log.FoolscapLogger()
        f1 = failure.Failure(ValueError("bad value"))
        l.msg("failure1", failure=f1)
        # real RemoteExceptions always wrap CopiedFailure, so this is not
        # really accurate. However, it's a nuisance to create a real
        # CopiedFailure: look in
        # test_call.ExamineFailuresMixin._examine_raise for test code that
        # exercises this properly.
        f2 = failure.Failure(RemoteException(f1))
        l.msg("failure2", failure=f2)

    def testParent(self):
        l = log.FoolscapLogger()
        p1 = l.msg("operation requested", level=log.OPERATIONAL)
        l.msg("first step", level=log.NOISY, parent=p1)
        l.msg("second step", level=log.NOISY, parent=p1)
        l.msg("second step EXPLODED", level=log.WEIRD, parent=p1)
        p2 = l.msg("third step", parent=p1)
        l.msg("fourth step", parent=p1)
        l.msg("third step deferred activity finally completed", parent=p2)
        l.msg("operation complete", level=log.OPERATIONAL, parent=p1)
        l.msg("override number, for some unknown reason", num=45)

    def testTheLogger(self):
        log.msg("This goes to the One True Logger")

    def testTubLogger(self):
        t = Tub()
        t.log("this goes into the tub")

class Advanced(unittest.TestCase):

    def testObserver(self):
        l = log.FoolscapLogger()
        out = []
        l.addObserver(out.append)
        l.set_generation_threshold(log.OPERATIONAL)
        l.msg("one")
        l.msg("two")
        l.msg("ignored", level=log.NOISY)
        d = fireEventually()
        def _check(res):
            self.assertEqual(len(out), 2)
            self.assertEqual(out[0]["message"], "one")
            self.assertEqual(out[1]["message"], "two")
        d.addCallback(_check)
        return d

    def testFileObserver(self):
        basedir = "logging/Advanced/FileObserver"
        os.makedirs(basedir)
        l = log.FoolscapLogger()
        fn = os.path.join(basedir, "observer-log.out")
        ob = log.LogFileObserver(fn)
        l.addObserver(ob.msg)
        l.msg("one")
        l.msg("two")
        d = fireEventually()
        def _check(res):
            l.removeObserver(ob.msg)
            ob._logFile.close()
            f = open(fn, "rb")
            expected_magic = f.read(len(flogfile.MAGIC))
            self.assertEqual(expected_magic, flogfile.MAGIC)
            events = []
            for line in f:
                events.append(json.loads(line.decode("utf-8")))
            self.assertEqual(len(events), 3)
            self.assertEqual(events[0]["header"]["type"],
                             "log-file-observer")
            self.assertEqual(events[0]["header"]["threshold"],
                             log.OPERATIONAL)
            self.assertEqual(events[1]["from"], "local")
            self.assertEqual(events[2]["d"]["message"], "two")
        d.addCallback(_check)
        return d

    def testDisplace(self):
        l = log.FoolscapLogger()
        l.set_buffer_size(log.OPERATIONAL, 3)
        l.msg("one")
        l.msg("two")
        l.msg("three")
        items = l.buffers[None][log.OPERATIONAL]
        self.assertEqual(len(items), 3)
        l.msg("four") # should displace "one"
        self.assertEqual(len(items), 3)
        m0 = items[0]
        self.assertEqual(type(m0), dict)
        self.assertEqual(m0['message'], "two")
        self.assertEqual(items[-1]['message'], "four")

    def testFacilities(self):
        l = log.FoolscapLogger()
        l.explain_facility("ui", "This is the UI.")
        l.msg("one", facility="ui")
        l.msg("two")

        items = l.buffers["ui"][log.OPERATIONAL]
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]["message"], "one")

    def testOnePriority(self):
        l = log.FoolscapLogger()
        l.msg("one", level=log.NOISY)
        l.msg("two", level=log.WEIRD)
        l.msg("three", level=log.NOISY)

        items = l.buffers[None][log.NOISY]
        self.assertEqual(len(items), 2)
        self.assertEqual(items[0]['message'], "one")
        self.assertEqual(items[1]['message'], "three")

        items = l.buffers[None][log.WEIRD]
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]['message'], "two")

    def testPriorities(self):
        l = log.FoolscapLogger()
        l.set_buffer_size(log.NOISY, 3)
        l.set_buffer_size(log.WEIRD, 3)
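        # (each (facility, level) pair gets its own circular buffer, so the
        # NOISY and WEIRD buffers below fill and displace independently; the
        # assertions that follow rely on that)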
        l.set_buffer_size(log.WEIRD, 4, "new.facility")

        l.msg("one", level=log.WEIRD)
        l.msg("two", level=log.NOISY)
        l.msg("three", level=log.NOISY)
        l.msg("four", level=log.WEIRD)
        l.msg("five", level=log.NOISY)
        l.msg("six", level=log.NOISY)
        l.msg("seven", level=log.NOISY)

        items = l.buffers[None][log.NOISY]
        self.assertEqual(len(items), 3)
        self.assertEqual(items[0]['message'], "five")
        self.assertEqual(items[-1]['message'], "seven")

        items = l.buffers[None][log.WEIRD]
        self.assertEqual(len(items), 2)
        self.assertEqual(items[0]['message'], "one")
        self.assertEqual(items[-1]['message'], "four")

    def testHierarchy(self):
        l = log.FoolscapLogger()

        n = l.msg("one")
        n2 = l.msg("two", parent=n)
        l.msg("three", parent=n2)

class ErrorfulQualifier(incident.IncidentQualifier):
    def __init__(self):
        self._first = True

    def check_event(self, ev):
        if self._first:
            self._first = False
            raise ValueError("oops")
        return False

class NoStdio(unittest.TestCase):
    # bug #244 is caused, in part, by Foolscap-side logging failures which
    # write an error message ("unable to serialize X") to stderr, which then
    # gets captured by twisted's logging (when run in a program under
    # twistd), then fed back into foolscap logging. Check that unserializable
    # objects don't cause anything to be written to a mock stdout/stderr
    # object.
    #
    # FoolscapLogger used stdio in two places:
    #  * msg() when format_message() throws
    #  * add_event() when IncidentQualifier.event() throws

    def setUp(self):
        self.fl = log.FoolscapLogger()
        self.mock_stdout = StringIO()
        self.mock_stderr = StringIO()
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr
        sys.stdout = self.mock_stdout
        sys.stderr = self.mock_stderr

    def tearDown(self):
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr

    def check_stdio(self):
        self.assertEqual(self.mock_stdout.getvalue(), "")
        self.assertEqual(self.mock_stderr.getvalue(), "")

    def test_unformattable(self):
        self.fl.msg(format="one=%(unformattable)s") # missing format key
        self.check_stdio()

    def test_unserializable_incident(self):
        # one #244 pathway involved an unserializable event that caused an
        # exception during IncidentReporter.incident_declared(), as it tried
        # to record all recent events. We can test the lack of stdio by using
        # a qualifier that throws an error directly.
        self.fl.setIncidentQualifier(ErrorfulQualifier())
        self.fl.activate_incident_qualifier()
        # make sure we set it up correctly
        self.assertTrue(self.fl.active_incident_qualifier)
        self.fl.msg("oops", arg=lambda : "lambdas are unserializable",
                    level=log.BAD)
        self.check_stdio()
        # The internal error will cause a new "metaevent" to be recorded. The
        # original event may or may not get recorded first, depending upon
        # the error (i.e. does it happen before or after buffer.append is
        # called). Also, get_buffered_events() is unordered. So search for
        # the right one.
        events = [e for e in self.fl.get_buffered_events()
                  if e.get("facility") == "foolscap/internal-error"]
        self.assertEqual(len(events), 1)
        m = events[0]["message"]
        expected = "internal error in log._msg, args=('oops',)"
        self.assertTrue(m.startswith(expected), m)
        self.assertIn("ValueError('oops'", m)

def ser(what):
    return json.dumps(what, cls=flogfile.ExtendedEncoder)

class Serialization(unittest.TestCase):
    def test_lazy_serialization(self):
        # Both foolscap and twisted allow (somewhat) arbitrary kwargs in the
        # log.msg() call. Twisted will either discard the event (if nobody is
        # listening), or stringify it right away.
        #
        # Foolscap does neither. It records the event (kwargs and all) in a
        # circular buffer, so a later observer can learn about them (either
        # 'flogtool tail' or a stored Incident file). And it stores the
        # arguments verbatim, leaving stringification to the future observer
        # (if they want it), so tools can filter events without using regexps
        # or parsing prematurely-flattened strings.
        #
        # Test this by logging a mutable object, modifying it, then checking
        # the buffer. We expect to see the modification.
        fl = log.FoolscapLogger()
        mutable = {"key": "old"}
        fl.msg("one", arg=mutable)
        mutable["key"] = "new"
        events = list(fl.get_buffered_events())
        self.assertEqual(events[0]["arg"]["key"], "new")

    def test_failure(self):
        try:
            raise ValueError("oops5")
        except ValueError:
            f = failure.Failure()
        out = json.loads(ser({"f": f}))["f"]
        self.assertEqual(out["@"], "Failure")
        self.assertIn("ValueError: oops5", out["repr"])
        self.assertIn("traceback", out)

    def test_unserializable(self):
        # The code that serializes log events to disk (with JSON) tries very
        # hard to get *something* recorded, even when you give log.msg()
        # something strange.
        self.assertEqual(json.loads(ser({"a": 1})), {"a": 1})
        unjsonable = [set([1,2])]
        self.assertEqual(json.loads(ser(unjsonable)),
                         [{'@': 'UnJSONable',
                           'repr': repr(set([1, 2])),
                           'message': "log.msg() was given an object that could not be encoded into JSON. I've replaced it with this UnJSONable object. The object's repr is in .repr"}])

        # if the repr() fails, we get a different message
        class Unreprable:
            def __repr__(self):
                raise ValueError("oops7")
        unrep = [Unreprable()]
        self.assertEqual(json.loads(ser(unrep)),
                         [{"@": "Unreprable",
                           "exception_repr": repr(ValueError('oops7')),
                           "message": "log.msg() was given an object that could not be encoded into JSON, and when I tried to repr() it I got an error too. I've put the repr of the exception in .exception_repr",
                           }])

        # and if repr()ing the failed repr() exception fails, we give up
        real_repr = repr
        def really_bad_repr(o):
            if isinstance(o, ValueError):
                raise TypeError("oops9")
            return real_repr(o)
        if six.PY2:
            import __builtin__
            assert __builtin__.repr is repr
            with mock.patch("__builtin__.repr", really_bad_repr):
                s = ser(unrep)
        else:
            import builtins
            assert builtins.repr is repr
            with mock.patch("builtins.repr", really_bad_repr):
                s = ser(unrep)
        self.assertEqual(json.loads(s),
                         [{"@": "ReallyUnreprable",
                           "message": "log.msg() was given an object that could not be encoded into JSON, and when I tried to repr() it I got an error too. That exception wasn't repr()able either. I give up. Good luck.",
                           }])

    def test_not_pickle(self):
        # Older versions of Foolscap used pickle to store events into the
        # Incident log, and dealt with errors by dropping the event. Newer ones
        # use JSON, and use a placeholder when errors occur. Test that
        # pickleable (but not JSON-able) objects are *not* written to the file
        # directly, but are replaced by an "unjsonable" placeholder.
        basedir = "logging/Serialization/not_pickle"
        os.makedirs(basedir)
        fl = log.FoolscapLogger()
        ir = incident.IncidentReporter(basedir, fl, "tubid")
        ir.TRAILING_DELAY = None
        fl.msg("first")
        unjsonable = [object()] # still picklable
        unserializable = [lambda: "neither pickle nor JSON can capture me"]
        # having unserializable data in the logfile should not break the rest
        fl.msg("unjsonable", arg=unjsonable)
        fl.msg("unserializable", arg=unserializable)
        fl.msg("last")
        events = list(fl.get_buffered_events())
        # if unserializable data breaks incident reporting, this
        # incident_declared() call will cause an exception
        ir.incident_declared(events[0])
        # that won't record any trailing events, but does
        # eventually(finished_Recording), so wait for that to conclude
        d = flushEventualQueue()
        def _check(_):
            files = os.listdir(basedir)
            self.assertEqual(len(files), 1)
            fn = os.path.join(basedir, files[0])
            events = list(flogfile.get_events(fn))
            self.assertEqual(events[0]["header"]["type"], "incident")
            self.assertEqual(events[1]["d"]["message"], "first")
            self.assertEqual(len(events), 5)
            # actually this should record 5 events: both unrecordable events
            # should be replaced with error messages that *are* recordable
            self.assertEqual(events[2]["d"]["message"], "unjsonable")
            self.assertEqual(events[2]["d"]["arg"][0]["@"], "UnJSONable")
            self.assertEqual(events[3]["d"]["message"], "unserializable")
            self.assertEqual(events[3]["d"]["arg"][0]["@"], "UnJSONable")
            self.assertEqual(events[4]["d"]["message"], "last")
        d.addCallback(_check)
        return d

class SuperstitiousQualifier(incident.IncidentQualifier):
    def check_event(self, ev):
        if "thirteen" in ev.get("message", ""):
            return True
        return False

class ImpatientReporter(incident.IncidentReporter):
    TRAILING_DELAY = 1.0
    TRAILING_EVENT_LIMIT = 3

class NoFollowUpReporter(incident.IncidentReporter):
    TRAILING_DELAY = None

class LogfileReaderMixin:
    def _read_logfile(self, fn):
        return list(flogfile.get_events(fn))

class Incidents(unittest.TestCase, PollMixin, LogfileReaderMixin):
    def test_basic(self):
        l = log.FoolscapLogger()
        self.assertEqual(l.incidents_declared, 0)
        # no qualifiers are run until a logdir is provided
        l.msg("one", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)
        l.setLogDir("logging/Incidents/basic")
        l.setLogDir("logging/Incidents/basic") # this should be idempotent
        got_logdir = l.logdir
        self.assertEqual(got_logdir,
                         os.path.abspath("logging/Incidents/basic"))
        # qualifiers should be run now
        l.msg("two")
        l.msg("3-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 1)
        self.assertTrue(l.get_active_incident_reporter())
        # at this point, the uncompressed logfile should be present, and it
        # should contain all the events up to and including the trigger
        files = os.listdir(got_logdir)
        self.assertEqual(len(files), 2)
        # the uncompressed one will sort earlier, since it lacks the .bz2
        # extension
        files.sort()
        self.assertEqual(files[0] + ".bz2.tmp", files[1])
        # unix systems let us look inside the uncompressed file while it's
        # still being written to by the recorder
        if runtime.platformType == "posix":
            events = self._read_logfile(os.path.join(got_logdir, files[0]))
            self.assertEqual(len(events), 1+3)
            #header = events[0]
            self.assertTrue("header" in events[0])
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                             "3-trigger")
            self.assertEqual(events[0]["header"]["versions"]["foolscap"],
                             foolscap.__version__)
            self.assertEqual(events[3]["d"]["message"], "3-trigger")

        l.msg("4-trailing")
        # this will take 5 seconds to finish trailing events
        d = self.poll(lambda: bool(l.incidents_recorded), 1.0)
        def _check(res):
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            fn = l.recent_recorded_incidents[0]
            events = self._read_logfile(fn)
            self.assertEqual(len(events), 1+4)
            self.assertTrue("header" in events[0])
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                             "3-trigger")
            self.assertEqual(events[0]["header"]["versions"]["foolscap"],
                             foolscap.__version__)
            self.assertEqual(events[3]["d"]["message"], "3-trigger")
            self.assertEqual(events[4]["d"]["message"], "4-trailing")

        d.addCallback(_check)
        return d

    def test_qualifier1(self):
        l = log.FoolscapLogger()
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.setLogDir("logging/Incidents/qualifier1")
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)

    def test_qualifier2(self):
        l = log.FoolscapLogger()
        # call them in the other order
        l.setLogDir("logging/Incidents/qualifier2")
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)

    def test_customize(self):
        l = log.FoolscapLogger()
        l.setIncidentQualifier(SuperstitiousQualifier())
        l.setLogDir("logging/Incidents/customize")
        # you set the reporter *class*, not an instance
        bad_ir = ImpatientReporter("basedir", "logger", "tubid")
        self.assertRaises((AssertionError, TypeError),
                          l.setIncidentReporterFactory, bad_ir)
        l.setIncidentReporterFactory(ImpatientReporter)
        l.msg("1", level=log.BAD)
        self.assertEqual(l.incidents_declared, 0)
        l.msg("2")
        l.msg("thirteen is scary")
        self.assertEqual(l.incidents_declared, 1)
        l.msg("4")
        l.msg("5")
        l.msg("6") # this should hit the trailing event limit
        l.msg("7") # this should not be recorded
        d = self.poll(lambda: bool(l.incidents_recorded), 1.0)
        def _check(res):
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            fn = l.recent_recorded_incidents[0]
            events = self._read_logfile(fn)
            self.assertEqual(len(events), 1+6)
            self.assertEqual(events[-1]["d"]["message"], "6")
        d.addCallback(_check)
        return d

    def test_overlapping(self):
        l = log.FoolscapLogger()
        l.setLogDir("logging/Incidents/overlapping")
        got_logdir = l.logdir
        self.assertEqual(got_logdir,
                         os.path.abspath("logging/Incidents/overlapping"))
        d = defer.Deferred()
        def _go(name, trigger):
            d.callback( (name, trigger) )
        l.addImmediateIncidentObserver(_go)
        l.setIncidentReporterFactory(ImpatientReporter)
        l.msg("1")
        l.msg("2-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 1)
        self.assertTrue(l.get_active_incident_reporter())
        l.msg("3-trigger", level=log.BAD)
        self.assertEqual(l.incidents_declared, 2)
        self.assertTrue(l.get_active_incident_reporter())

        def _check(res):
            self.assertEqual(l.incidents_recorded, 1)
            self.assertEqual(len(l.recent_recorded_incidents), 1)
            # at this point, the logfile should be present, and it should
            # contain all the events up to and including both triggers

            files = os.listdir(got_logdir)
            self.assertEqual(len(files), 1)
            events = self._read_logfile(os.path.join(got_logdir, files[0]))

            self.assertEqual(len(events), 1+3)
            self.assertEqual(events[0]["header"]["trigger"]["message"],
                             "2-trigger")
            self.assertEqual(events[1]["d"]["message"], "1")
            self.assertEqual(events[2]["d"]["message"], "2-trigger")
            self.assertEqual(events[3]["d"]["message"], "3-trigger")
        d.addCallback(_check)

        return d

    def test_classify(self):
        l = log.FoolscapLogger()
        l.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)
        l.setLogDir("logging/Incidents/classify")
        got_logdir = l.logdir
        l.msg("foom", level=log.BAD, failure=failure.Failure(RuntimeError()))
        d = fireEventually()
        def _check(res):
            files = [fn for fn in os.listdir(got_logdir) if fn.endswith(".bz2")]
            self.assertEqual(len(files), 1)

            ic = incident.IncidentClassifier()
            def classify_foom(trigger):
                if "foom" in trigger.get("message",""):
                    return "foom"
            ic.add_classifier(classify_foom)
            options = incident.ClassifyOptions()
            options.parseOptions([os.path.join(got_logdir, fn) for fn in files])
            options.stdout = StringIO()
            ic.run(options)
            out = options.stdout.getvalue()
            self.assertTrue(out.strip().endswith(": foom"), out)

            ic2 = incident.IncidentClassifier()
            options = incident.ClassifyOptions()
            options.parseOptions(["--verbose"] +
                                 [os.path.join(got_logdir, fn) for fn in files])
            options.stdout = StringIO()
            ic2.run(options)
            out = options.stdout.getvalue()
            self.assertIn(".flog.bz2: unknown\n", out)
            # this should have a JSON-formatted trigger dictionary
            self.assertTrue(re.search(r'u?"message": u?"foom"', out), out)
            self.assertIn('"num": 0', out)
            self.assertIn("RuntimeError", out)

        d.addCallback(_check)
        return d

@implementer(RILogObserver)
class Observer(Referenceable):
    def __init__(self):
        self.messages = []
        self.incidents = []
        self.done_with_incidents = False
    def remote_msg(self, d):
        self.messages.append(d)

    def remote_new_incident(self, name, trigger):
        self.incidents.append( (name, trigger) )
    def remote_done_with_incident_catchup(self):
        self.done_with_incidents = True

class MyGatherer(gatherer.GathererService):
    verbose = False

    def __init__(self, rotate, use_bzip, basedir):
        portnum = allocate_tcp_port()
        with open(os.path.join(basedir, "port"), "w") as f:
            f.write("tcp:%d\n" % portnum)
        with open(os.path.join(basedir, "location"), "w") as f:
            f.write("tcp:127.0.0.1:%d\n" % portnum)
        gatherer.GathererService.__init__(self, rotate, use_bzip, basedir)

    def remote_logport(self, nodeid, publisher):
        d = gatherer.GathererService.remote_logport(self, nodeid, publisher)
        d.addBoth(lambda res: self.d.callback(publisher))

class SampleError(Exception):
    """a sample error"""

class Publish(PollMixin, unittest.TestCase):
    def setUp(self):
        self.parent = service.MultiService()
        self.parent.startService()
        # make the MAX_QUEUE_SIZE smaller to speed up the test, and restore
        # it when we're done. The normal value is 2000, chosen to bound the
        # queue to perhaps 1MB. Lowering the size from 2000 to 500 speeds up
        # the test from about 10s to 5s.
        self.saved_queue_size = publish.Subscription.MAX_QUEUE_SIZE
        publish.Subscription.MAX_QUEUE_SIZE = 500

    def tearDown(self):
        publish.Subscription.MAX_QUEUE_SIZE = self.saved_queue_size
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def test_logport_furlfile1(self):
        basedir = "logging/Publish/logport_furlfile1"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        # setOption before setServiceParent
        t.setOption("logport-furlfile", furlfile)
        t.setServiceParent(self.parent)
        self.assertRaises(NoLocationError, t.getLogPort)
        self.assertRaises(NoLocationError, t.getLogPortFURL)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        self.assertFalse(os.path.exists(furlfile))
        t.setLocation("127.0.0.1:%d" % portnum)
        logport_furl = open(furlfile, "r").read().strip()
        self.assertEqual(logport_furl, t.getLogPortFURL())

    def test_logport_furlfile2(self):
        basedir = "logging/Publish/logport_furlfile2"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        # setServiceParent before setOption
        t.setServiceParent(self.parent)
        self.assertRaises(NoLocationError, t.getLogPort)
        self.assertRaises(NoLocationError, t.getLogPortFURL)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setOption("logport-furlfile", furlfile)
        self.assertFalse(os.path.exists(furlfile))
        t.setLocation("127.0.0.1:%d" % portnum)
        logport_furl = open(furlfile, "r").read().strip()
        self.assertEqual(logport_furl, t.getLogPortFURL())

    def test_logpublisher(self):
        basedir = "logging/Publish/logpublisher"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        self.assertRaises(NoLocationError, t.getLogPort)
        self.assertRaises(NoLocationError, t.getLogPortFURL)

        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("logport-furlfile", furlfile)
        logport_furl = t.getLogPortFURL()
        logport_furl2 = open(furlfile, "r").read().strip()
        self.assertEqual(logport_furl, logport_furl2)
        tw_log = twisted_log.LogPublisher()
        tlb = t.setOption("bridge-twisted-logs", tw_log)

        t2 = Tub()
        t2.setServiceParent(self.parent)
        ob = Observer()

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            d = logport.callRemote("get_versions")
            def _check(versions):
                versions = ensure_dict_str(versions)
                self.assertEqual(versions["foolscap"],
                                 six.ensure_text(foolscap.__version__))
            d.addCallback(_check)
            # note: catch_up=False, so this message won't be sent
            log.msg("message 0 here, before your time")
            d.addCallback(lambda res:
                          logport.callRemote("subscribe_to_all", ob))
            def _emit(subscription):
                self._subscription = subscription
                log.msg("message 1 here")
                tw_log.msg("message 2 here")

                # switch to generic (no tubid) bridge
                log.unbridgeLogsFromTwisted(tw_log, tlb)
                log.bridgeLogsFromTwisted(None, tw_log)
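
                # (events sent through the tubid-less bridge should arrive
                # with tubID=None; _check_observer verifies this below on
                # message 3)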
                tw_log.msg("message 3 here")
                tw_log.msg(format="%(foo)s is foo", foo="foo")
                log.err(failure.Failure(SampleError("err1")))
                log.err(SampleError("err2"))
                # simulate twisted.python.log.err, which is unfortunately
                # not a method of LogPublisher
                def err(_stuff=None, _why=None):
                    if isinstance(_stuff, Exception):
                        tw_log.msg(failure=failure.Failure(_stuff),
                                   isError=1, why=_why)
                    else:
                        tw_log.msg(failure=_stuff, isError=1, why=_why)
                err(failure.Failure(SampleError("err3")))
                err(SampleError("err4"))
            d.addCallback(_emit)
            # wait until we've seen all the messages, or the test times out
            d.addCallback(lambda res: self.poll(lambda: len(ob.messages) >= 8))
            def _check_observer(res):
                msgs = ob.messages
                self.assertEqual(len(msgs), 8)
                self.assertEqual(msgs[0]["message"], "message 1 here")
                self.assertEqual(msgs[1]["from-twisted"], True)
                self.assertEqual(msgs[1]["message"], "message 2 here")
                self.assertEqual(msgs[1]["tubID"], t.tubID)
                self.assertEqual(msgs[2]["from-twisted"], True)
                self.assertEqual(msgs[2]["message"], "message 3 here")
                self.assertEqual(msgs[2]["tubID"], None)
                self.assertEqual(msgs[3]["from-twisted"], True)
                self.assertEqual(msgs[3]["message"], "foo is foo")

                # check the errors
                self.assertEqual(msgs[4]["message"], "")
                self.assertTrue(msgs[4]["isError"])
                self.assertTrue("failure" in msgs[4])
                self.assertTrue(msgs[4]["failure"].check(SampleError))
                self.assertTrue("err1" in str(msgs[4]["failure"]))
                self.assertEqual(msgs[5]["message"], "")
                self.assertTrue(msgs[5]["isError"])
                self.assertTrue("failure" in msgs[5])
                self.assertTrue(msgs[5]["failure"].check(SampleError))
                self.assertTrue("err2" in str(msgs[5]["failure"]))

                # errors coming from twisted are stringified
                self.assertEqual(msgs[6]["from-twisted"], True)
                self.assertTrue("Unhandled Error" in msgs[6]["message"])
                self.assertTrue("SampleError: err3" in msgs[6]["message"])
                self.assertTrue(msgs[6]["isError"])

                self.assertEqual(msgs[7]["from-twisted"], True)
                self.assertTrue("Unhandled Error" in msgs[7]["message"])
                self.assertTrue("SampleError: err4" in msgs[7]["message"])
                self.assertTrue(msgs[7]["isError"])

            d.addCallback(_check_observer)
            def _done(res):
                return logport.callRemote("unsubscribe", self._subscription)
            d.addCallback(_done)
            return d
        d.addCallback(_got_logport)
        return d

    def test_logpublisher_overload(self):
        basedir = "logging/Publish/logpublisher_overload"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)

        t.setOption("logport-furlfile", furlfile)
        logport_furl = t.getLogPortFURL()
        logport_furl2 = open(furlfile, "r").read().strip()
        self.assertEqual(logport_furl, logport_furl2)

        t2 = Tub()
        t2.setServiceParent(self.parent)
        ob = Observer()

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            d = logport.callRemote("subscribe_to_all", ob)
            def _emit(subscription):
                self._subscription = subscription
                for i in range(10000):
                    log.msg("message %d here" % i)
            d.addCallback(_emit)
            # now we wait until the observer has seen nothing for a full
            # second. I'd prefer something faster and more deterministic, but
            # this ought to handle the normal slow-host cases.
            expected = publish.Subscription.MAX_QUEUE_SIZE
            def _check_f():
                return bool(len(ob.messages) >= expected)
            d.addCallback(lambda res: self.poll(_check_f, 0.2))
            # TODO: I'm not content with that polling, and would prefer to do
            # something faster and more deterministic
            #d.addCallback(fireEventually)
            #d.addCallback(fireEventually)
            def _check_observer(res):
                msgs = ob.messages
                self.assertEqual(len(msgs), expected)
                # since we discard new messages during overload (and preserve
                # old ones), we should see 0..MAX_QUEUE_SIZE-1.
                got = []
                for m in msgs:
                    ignored1, number_s, ignored2 = m["message"].split()
                    number = int(number_s)
                    got.append(number)
                self.assertEqual(got, sorted(got))
                self.assertEqual(got, list(range(expected)))

            d.addCallback(_check_observer)
            def _done(res):
                return logport.callRemote("unsubscribe", self._subscription)
            d.addCallback(_done)
            return d
        d.addCallback(_got_logport)
        return d

    def test_logpublisher_catchup(self):
        basedir = "logging/Publish/logpublisher_catchup"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)

        t.setOption("logport-furlfile", furlfile)
        logport_furl = t.getLogPortFURL()

        t2 = Tub()
        t2.setServiceParent(self.parent)
        ob = Observer()

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            d = logport.callRemote("get_versions")
            def _check_versions(versions):
                versions = ensure_dict_str(versions)
                self.assertEqual(versions["foolscap"],
                                 six.ensure_text(foolscap.__version__))
            d.addCallback(_check_versions)
            d.addCallback(lambda res: logport.callRemote("get_pid"))
            def _check_pid(pid):
                self.assertEqual(pid, os.getpid())
            d.addCallback(_check_pid)
            # note: catch_up=True, so this message *will* be sent. Also note
            # that we need this message to be unique, since our logger will
            # stash messages recorded by other test cases, and we don't want
            # to confuse the two.
            log.msg("this is an early message")
            d.addCallback(lambda res:
                          logport.callRemote("subscribe_to_all", ob, True))
            def _emit(subscription):
                self._subscription = subscription
                log.msg("this is a later message")
            d.addCallback(_emit)
            # wait until we've received the later message
            def _check_f():
                for m in ob.messages:
                    if m.get("message") == "this is a later message":
                        return True
                return False
            d.addCallback(lambda res: self.poll(_check_f))
            def _check_observer(res):
                msgs = ob.messages
                # this gets everything that's been logged since the unit
                # tests began. The Reconnector that's used by
                # logport-furlfile will cause some uncertainty.. negotiation
                # messages might be interleaved with the ones that we
                # actually care about. So what we verify is that both of our
                # messages appear *somewhere*, and that they show up in the
                # correct order.
                self.assertTrue(len(msgs) >= 2, len(msgs))
                first = None
                second = None
                for i,m in enumerate(msgs):
                    if m.get("message") == "this is an early message":
                        first = i
                    if m.get("message") == "this is a later message":
                        second = i
                self.assertTrue(first is not None)
                self.assertTrue(second is not None)
                self.assertTrue(first < second,
                                "%d is not before %d" % (first, second))
            d.addCallback(_check_observer)
            def _done(res):
                return logport.callRemote("unsubscribe", self._subscription)
            d.addCallback(_done)
            return d
        d.addCallback(_got_logport)
        return d

class IncidentPublisher(PollMixin, unittest.TestCase):
    def setUp(self):
        self.parent = service.MultiService()
        self.parent.startService()

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def _write_to(self, logdir, fn, data="stuff"):
        f = open(os.path.join(logdir, fn), "w")
        f.write(data)
        f.close()

    def test_list_incident_names(self):
        basedir = "logging/IncidentPublisher/list_incident_names"
        os.makedirs(basedir)
        t = Tub()
        t.setLocation("127.0.0.1:1234")
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        p = t.getLogPort()

        # dump some other files in the incident directory
        self._write_to(logdir, "distraction.bz2")
        self._write_to(logdir, "noise")
        # and a few real-looking incidents
        I1 = "incident-2008-07-29-204211-aspkxoi"
        I2 = "incident-2008-07-30-112233-wodaei"
        I1_abs = os.path.abspath(os.path.join(logdir, I1 + ".flog"))
        I2_abs = os.path.abspath(os.path.join(logdir, I2 + ".flog.bz2"))
        self._write_to(logdir, I1 + ".flog")
        self._write_to(logdir, I2 + ".flog.bz2")

        all = list(p.list_incident_names())
        self.assertEqual(set([name for (name,fn) in all]), set([I1, I2]))
        imap = dict(all)
        self.assertEqual(imap[I1], I1_abs)
        self.assertEqual(imap[I2], I2_abs)

        new = list(p.list_incident_names(since=I1))
        self.assertEqual(set([name for (name,fn) in new]), set([I2]))


    def test_get_incidents(self):
        basedir = "logging/IncidentPublisher/get_incidents"
        os.makedirs(basedir)
        furlfile = os.path.join(basedir, "logport.furl")
        t = Tub()
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        t.logger.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)
        # dump some other files in the incident directory
        f = open(os.path.join(logdir, "distraction.bz2"), "w")
        f.write("stuff")
        f.close()
        f = open(os.path.join(logdir, "noise"), "w")
        f.write("stuff")
        f.close()

        # fill the buffers with some messages
        t.logger.msg("one")
        t.logger.msg("two")
        # and trigger an incident
        t.logger.msg("three", level=log.WEIRD)
        # the NonTrailingIncidentReporter needs a turn before it will have
        # finished recording the event: the getReference() call will suffice.
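        # (list_incidents returns a dict mapping incident name to trigger
        # event, and get_incident returns a (header, events) tuple; see
        # _check_listed and _check_incident below)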

        # now set up a Tub to connect to the logport
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)

        t.setOption("logport-furlfile", furlfile)
        logport_furl = t.getLogPortFURL()
        logport_furl2 = open(furlfile, "r").read().strip()
        self.assertEqual(logport_furl, logport_furl2)

        t2 = Tub()
        t2.setServiceParent(self.parent)

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            d = logport.callRemote("list_incidents")
            d.addCallback(self._check_listed)
            d.addCallback(lambda res:
                          logport.callRemote("get_incident", self.i_name))
            d.addCallback(self._check_incident)
            def _decompress(res):
                # now we manually decompress the logfile for that incident,
                # to exercise the code that provides access to incidents that
                # did not finish their trailing-gather by the time the
                # application was shut down
                assert not self.i_name.endswith(".bz2")
                fn1 = os.path.join(logdir, self.i_name) + ".flog.bz2"
                fn2 = fn1[:-len(".bz2")]
                f1 = bz2.BZ2File(fn1, "r")
                f2 = open(fn2, "wb")
                f2.write(f1.read())
                f2.close()
                f1.close()
                os.unlink(fn1)
            d.addCallback(_decompress)
            # and do it again
            d.addCallback(lambda res: logport.callRemote("list_incidents"))
            d.addCallback(self._check_listed)
            d.addCallback(lambda res:
                          logport.callRemote("get_incident", self.i_name))
            d.addCallback(self._check_incident)
            return d
        d.addCallback(_got_logport)
        return d

    def _check_listed(self, incidents):
        self.assertTrue(isinstance(incidents, dict))
        self.assertEqual(len(incidents), 1)
        self.i_name = i_name = list(incidents.keys())[0]
        self.assertTrue(i_name.startswith("incident"))
        self.assertFalse(i_name.endswith(".flog") or i_name.endswith(".bz2"))
        trigger = incidents[i_name]
        self.assertEqual(trigger["message"], "three")

    def _check_incident(self, res):
        (header, events) = res
        self.assertEqual(header["type"], "incident")
        self.assertEqual(header["trigger"]["message"], "three")
        self.assertEqual(len(events), 3)
        self.assertEqual(events[0]["message"], "one")

    def test_subscribe(self):
        basedir = "logging/IncidentPublisher/subscribe"
        os.makedirs(basedir)
        t = Tub()
        t.logger = self.logger = log.FoolscapLogger()
        logdir = os.path.join(basedir, "logdir")
        t.logger.setLogDir(logdir)
        t.logger.setIncidentReporterFactory(incident.NonTrailingIncidentReporter)

        # fill the buffers with some messages
        t.logger.msg("boring")
        t.logger.msg("blah")
        # and trigger the first incident
        t.logger.msg("one", level=log.WEIRD)
        # the NonTrailingIncidentReporter needs a turn before it will have
        # finished recording the event: the getReference() call will suffice.
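        # (the first subscription below uses catch_up=False, so only
        # incidents declared after we subscribe should be delivered)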

        # now set up a Tub to connect to the logport
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        logport_furl = t.getLogPortFURL()

        ob = Observer()
        t2 = Tub()
        t2.setServiceParent(self.parent)

        d = t2.getReference(logport_furl)
        def _got_logport(logport):
            self._logport = logport
            d2 = logport.callRemote("subscribe_to_incidents", ob) # no catchup
            return d2
        d.addCallback(_got_logport)
        def _subscribed(subscription):
            self._subscription = subscription
        d.addCallback(_subscribed)
        # pause long enough for the incident names to change
        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("two", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: bool(ob.incidents), 0.1))
        def _triggerof(incident):
            (name, trigger) = incident
            return trigger["message"]
        def _check_new(res):
            self.assertEqual(len(ob.incidents), 1)
            self.assertEqual(_triggerof(ob.incidents[0]), "two")
        d.addCallback(_check_new)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        # now subscribe and catch up on all incidents
        ob2 = Observer()
        d.addCallback(lambda res:
                      self._logport.callRemote("subscribe_to_incidents", ob2,
                                               True, ""))
        d.addCallback(_subscribed)
        d.addCallback(lambda res:
                      self.poll(lambda: ob2.done_with_incidents, 0.1))
        def _check_all(res):
            self.assertEqual(len(ob2.incidents), 2)
            self.assertEqual(_triggerof(ob2.incidents[0]), "one")
            self.assertEqual(_triggerof(ob2.incidents[1]), "two")
        d.addCallback(_check_all)

        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("three", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: len(ob2.incidents) >= 3, 0.1))
        def _check_all2(res):
            self.assertEqual(len(ob2.incidents), 3)
            self.assertEqual(_triggerof(ob2.incidents[0]), "one")
            self.assertEqual(_triggerof(ob2.incidents[1]), "two")
            self.assertEqual(_triggerof(ob2.incidents[2]), "three")
        d.addCallback(_check_all2)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        # test the since= argument, setting it equal to the name of the
        # second incident. This should give us the third incident.
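        # (incident filenames embed a timestamp, so names sort
        # chronologically and since= can be a simple name comparison)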
        ob3 = Observer()
        d.addCallback(lambda res:
                      self._logport.callRemote("subscribe_to_incidents", ob3,
                                               True, ob2.incidents[1][0]))
        d.addCallback(_subscribed)
        d.addCallback(lambda res:
                      self.poll(lambda: ob3.done_with_incidents, 0.1))
        def _check_since(res):
            self.assertEqual(len(ob3.incidents), 1)
            self.assertEqual(_triggerof(ob3.incidents[0]), "three")
        d.addCallback(_check_since)
        d.addCallback(lambda res: time.sleep(2))
        d.addCallback(lambda res: t.logger.msg("four", level=log.WEIRD))
        d.addCallback(lambda res:
                      self.poll(lambda: len(ob3.incidents) >= 2, 0.1))
        def _check_since2(res):
            self.assertEqual(len(ob3.incidents), 2)
            self.assertEqual(_triggerof(ob3.incidents[0]), "three")
            self.assertEqual(_triggerof(ob3.incidents[1]), "four")
        d.addCallback(_check_since2)
        d.addCallback(lambda res: self._subscription.callRemote("unsubscribe"))

        return d
    test_subscribe.timeout = 20

class MyIncidentGathererService(gatherer.IncidentGathererService):
    verbose = False
    cb_new_incident = None

    def remote_logport(self, nodeid, publisher):
        d = gatherer.IncidentGathererService.remote_logport(self,
                                                            nodeid, publisher)
        d.addCallback(lambda res: self.d.callback(publisher))
        return d

    def new_incident(self, abs_fn, rel_fn, nodeid_s, incident):
        gatherer.IncidentGathererService.new_incident(self, abs_fn, rel_fn,
                                                      nodeid_s, incident)
        if self.cb_new_incident:
            self.cb_new_incident((abs_fn, rel_fn))

class IncidentGatherer(unittest.TestCase,
                       PollMixin, StallMixin, LogfileReaderMixin):
    def setUp(self):
        self.parent = service.MultiService()
        self.parent.startService()
        self.logger = log.FoolscapLogger()
        self.logger.setIncidentReporterFactory(NoFollowUpReporter)

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def create_incident_gatherer(self, basedir, classifiers=[]):
        # create an incident gatherer, which will make its own Tub
        ig_basedir = os.path.join(basedir, "ig")
        if not os.path.isdir(ig_basedir):
            os.mkdir(ig_basedir)
        portnum = allocate_tcp_port()
        with open(os.path.join(ig_basedir, "port"), "w") as f:
            f.write("tcp:%d\n" % portnum)
        with open(os.path.join(ig_basedir, "location"), "w") as f:
            f.write("tcp:127.0.0.1:%d\n" % portnum)
        null = StringIO()
        ig = MyIncidentGathererService(classifiers=classifiers,
                                       basedir=ig_basedir, stdout=null)
        ig.d = defer.Deferred()
        return ig

    def create_connected_tub(self, ig):
        t = Tub()
        t.logger = self.logger
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", ig.my_furl)

    def test_connect(self):
        basedir = "logging/IncidentGatherer/connect"
        os.makedirs(basedir)
        self.logger.setLogDir(basedir)

        ig = self.create_incident_gatherer(basedir)
        ig.setServiceParent(self.parent)
        self.create_connected_tub(ig)

        d = ig.d
        # give the call to remote_logport a chance to retire
        d.addCallback(self.stall, 0.5)
        return d

    def test_emit(self):
        basedir = "logging/IncidentGatherer/emit"
        os.makedirs(basedir)
        self.logger.setLogDir(basedir)
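
        # (the gatherer connects to our logport, hears about each new
        # incident, downloads it, and records a classification under
        # classified/; cb_new_incident lets the test wait for that)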
        ig = self.create_incident_gatherer(basedir)
        ig.setServiceParent(self.parent)
        incident_d = defer.Deferred()
        ig.cb_new_incident = incident_d.callback
        self.create_connected_tub(ig)

        d = ig.d

        d.addCallback(lambda res: self.logger.msg("boom", level=log.WEIRD))
        d.addCallback(lambda res: incident_d)
        def _new_incident(res):
            (abs_fn, rel_fn) = res
            events = self._read_logfile(abs_fn)
            header = events[0]["header"]
            self.assertTrue("trigger" in header)
            self.assertEqual(header["trigger"]["message"], "boom")
            e = events[1]["d"]
            self.assertEqual(e["message"], "boom")

            # it should have been classified as "unknown"
            unknowns_fn = os.path.join(ig.basedir, "classified", "unknown")
            unknowns = [fn.strip() for fn in open(unknowns_fn,"r").readlines()]
            self.assertEqual(len(unknowns), 1)
            self.assertEqual(unknowns[0], rel_fn)
        d.addCallback(_new_incident)

        # now shut down the gatherer, create a new one with the same basedir
        # (with some classifier functions), remove the existing
        # classifications, and start it up. It should reclassify everything
        # at startup.

        d.addCallback(lambda res: ig.disownServiceParent())

        def classify_boom(trigger):
            if "boom" in trigger.get("message",""):
                return "boom"
        def classify_foom(trigger):
            if "foom" in trigger.get("message",""):
                return "foom"

        incident_d2 = defer.Deferred()
        def _update_classifiers(res):
            self.remove_classified_incidents(ig)
            ig2 = self.create_incident_gatherer(basedir, [classify_boom])
            ##ig2.add_classifier(classify_foom)
            # we add classify_foom by writing it into a file, to exercise the
            # look-for-classifier-files code
            foomfile = os.path.join(ig2.basedir, "classify_foom.py")
            f = open(foomfile, "w")
            f.write('''
def classify_incident(trigger):
    if "foom" in trigger.get("message",""):
        return "foom"
''')
            f.close()
            ig2.setServiceParent(self.parent)
            # now that it's been read, delete it to avoid affecting later
            # runs
            os.unlink(foomfile)
            self.ig2 = ig2

            # incidents should be classified in startService
            unknowns_fn = os.path.join(ig.basedir, "classified", "unknown")
            self.assertFalse(os.path.exists(unknowns_fn))
            booms_fn = os.path.join(ig.basedir, "classified", "boom")
            booms = [fn.strip() for fn in open(booms_fn,"r").readlines()]
            self.assertEqual(len(booms), 1)
            fooms_fn = os.path.join(ig.basedir, "classified", "foom")
            self.assertFalse(os.path.exists(fooms_fn))

            ig2.cb_new_incident = incident_d2.callback

            return ig2.d
        d.addCallback(_update_classifiers)
        d.addCallback(lambda res: self.logger.msg("foom", level=log.WEIRD))
        d.addCallback(lambda res: incident_d2)
        def _new_incident2(res):
            # this one should be classified as "foom"
            (abs_fn, rel_fn) = res
            fooms_fn = os.path.join(ig.basedir, "classified", "foom")
            fooms = [fn.strip() for fn in open(fooms_fn,"r").readlines()]
            self.assertEqual(len(fooms), 1)
            self.assertEqual(fooms[0], rel_fn)
            unknowns_fn = os.path.join(ig.basedir, "classified", "unknown")
            self.assertFalse(os.path.exists(unknowns_fn))
        d.addCallback(_new_incident2)
        d.addCallback(lambda res: self.ig2.disownServiceParent())

        # if we remove just classified/boom, then those incidents should be
        # reclassified

        def _remove_boom_incidents(res):
            booms_fn = os.path.join(ig.basedir, "classified", "boom")
            os.remove(booms_fn)

            ig2a = self.create_incident_gatherer(basedir, [classify_boom,
                                                           classify_foom])
            ig2a.setServiceParent(self.parent)
            self.ig2a = ig2a

            # now classified/boom should be back, and the other files should
            # have been left untouched
            booms = [fn.strip() for fn in open(booms_fn,"r").readlines()]
            self.assertEqual(len(booms), 1)
        d.addCallback(_remove_boom_incidents)
        d.addCallback(lambda res: self.ig2a.disownServiceParent())

        # and if we remove the classification functions (but do *not* remove
        # the classified incidents), the new gatherer should not reclassify
        # anything

        def _update_classifiers_again(res):
            ig3 = self.create_incident_gatherer(basedir)
            ig3.setServiceParent(self.parent)
            self.ig3 = ig3

            unknowns_fn = os.path.join(ig.basedir, "classified", "unknown")
            self.assertFalse(os.path.exists(unknowns_fn))
            booms_fn = os.path.join(ig.basedir, "classified", "boom")
            booms = [fn.strip() for fn in open(booms_fn,"r").readlines()]
            self.assertEqual(len(booms), 1)
            fooms_fn = os.path.join(ig.basedir, "classified", "foom")
            fooms = [fn.strip() for fn in open(fooms_fn,"r").readlines()]
            self.assertEqual(len(fooms), 1)
            return ig3.d
        d.addCallback(_update_classifiers_again)

        d.addCallback(lambda res: self.ig3.disownServiceParent())

        # and if we remove all the stored incidents (and the 'latest'
        # record), the gatherer will grab everything. This exercises the
        # only-grab-one-at-a-time code. I verified this manually, by adding a
        # print to the avoid-duplicate clause of
        # IncidentObserver.maybe_fetch_incident .

        def _create_ig4(res):
            ig4 = self.create_incident_gatherer(basedir)
            for nodeid in os.listdir(os.path.join(ig4.basedir, "incidents")):
                nodedir = os.path.join(ig4.basedir, "incidents", nodeid)
                for fn in os.listdir(nodedir):
                    os.unlink(os.path.join(nodedir, fn))
                os.rmdir(nodedir)
            ig4.setServiceParent(self.parent)
            self.ig4 = ig4
        d.addCallback(_create_ig4)
        d.addCallback(lambda res:
                      self.poll(lambda : self.ig4.incidents_received == 2))

        return d

    def remove_classified_incidents(self, ig):
        classified = os.path.join(ig.basedir, "classified")
        for category in os.listdir(classified):
            os.remove(os.path.join(classified, category))
        os.rmdir(classified)

class Gatherer(unittest.TestCase, LogfileReaderMixin, StallMixin, PollMixin):
    def setUp(self):
        self.parent = service.MultiService()
        self.parent.startService()

    def tearDown(self):
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d


    def _emit_messages_and_flush(self, res, t):
        log.msg("gathered message here")
        try:
            raise SampleError("whoops1")
        except:
            log.err()
        try:
            raise SampleError("whoops2")
        except SampleError:
            log.err(failure.Failure())
        d = self.stall(None, 1.0)
        d.addCallback(lambda res: t.disownServiceParent())
        # that will disconnect from the gatherer, which will flush the logfile
        d.addCallback(self.stall, 1.0)
        return d

    def _check_gatherer(self, fn, starting_timestamp, expected_tubid):
        events = []
        for e in self._read_logfile(fn):
            # discard internal foolscap events, like connection
            # negotiation
            if "d" in e and "foolscap" in e["d"].get("facility", ""):
                pass
            else:
                events.append(e)

        if len(events) != 4:
            from pprint import pprint
            pprint(events)
        self.assertEqual(len(events), 4)

        # header
        data = events.pop(0)
        self.assertTrue(isinstance(data, dict))
        self.assertTrue("header" in data)
        self.assertEqual(data["header"]["type"], "gatherer")
        self.assertEqual(data["header"]["start"], starting_timestamp)

        # grab the first event from the log
        data = events.pop(0)
        self.assertTrue(isinstance(data, dict))
        self.assertEqual(data['from'], expected_tubid)
        self.assertEqual(data['d']['message'], "gathered message here")

        # grab the second event from the log
        data = events.pop(0)
        self.assertTrue(isinstance(data, dict))
        self.assertEqual(data['from'], expected_tubid)
        self.assertEqual(data['d']['message'], "")
        self.assertTrue(data['d']["isError"])
        self.assertTrue("failure" in data['d'])
        self.assertIn("SampleError", data['d']["failure"]["repr"])
        self.assertIn("whoops1", data['d']["failure"]["repr"])

        # grab the third event from the log
        data = events.pop(0)
        self.assertTrue(isinstance(data, dict))
        self.assertEqual(data['from'], expected_tubid)
        self.assertEqual(data['d']['message'], "")
        self.assertTrue(data['d']["isError"])
        self.assertTrue("failure" in data['d'])
        self.assertIn("SampleError", data['d']["failure"]["repr"])
        self.assertIn("whoops2", data['d']["failure"]["repr"])

    def test_wrongdir(self):
        basedir = "logging/Gatherer/wrongdir"
        os.makedirs(basedir)

        # create a LogGatherer with an unspecified basedir: it should look
        # for a .tac file in the current directory, not see it, and complain
        e = self.assertRaises(RuntimeError,
                              gatherer.GathererService, None, True, None)
        self.assertTrue("running in the wrong directory" in str(e))

    def test_log_gatherer(self):
        # setLocation, then set log-gatherer-furl. Also, use bzip=True for
        # this one test.
        basedir = "logging/Gatherer/log_gatherer"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer = MyGatherer(None, True, basedir)
        gatherer.d = defer.Deferred()
        gatherer.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer_furl = gatherer.my_furl
        starting_timestamp = gatherer._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", gatherer_furl)

        # about now, the node will be contacting the Gatherer and
        # offering its logport.

        # gatherer.d will be fired when subscribe_to_all() has finished
        d = gatherer.d
        d.addCallback(self._emit_messages_and_flush, t)
        # We use do_rotate() to force logfile rotation before checking
        # contents of the file, so we know it's been written out to disk
        d.addCallback(lambda res: gatherer.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer.timeout = 20

    def test_log_gatherer_multiple(self):
        # setLocation, then set log-gatherer-furl.
        basedir = "logging/Gatherer/log_gatherer_multiple"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer1_basedir = os.path.join(basedir, "gatherer1")
        os.makedirs(gatherer1_basedir)
        gatherer1 = MyGatherer(None, False, gatherer1_basedir)
        gatherer1.d = defer.Deferred()
        gatherer1.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer1_furl = gatherer1.my_furl
        starting_timestamp1 = gatherer1._starting_timestamp

        # create a second one
        gatherer2_basedir = os.path.join(basedir, "gatherer2")
        os.makedirs(gatherer2_basedir)
        gatherer2 = MyGatherer(None, False, gatherer2_basedir)
        gatherer2.d = defer.Deferred()
        gatherer2.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer2_furl = gatherer2.my_furl
        starting_timestamp2 = gatherer2._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", (gatherer1_furl, gatherer2_furl))

        # about now, the node will be contacting the Gatherers and
        # offering its logport.
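
        # (both gatherers subscribe independently, so each should receive
        # its own complete copy of the events; we rotate and check each
        # logfile in turn)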

    def test_log_gatherer_multiple(self):
        # setLocation, then set log-gatherer-furl.
        basedir = "logging/Gatherer/log_gatherer_multiple"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer1_basedir = os.path.join(basedir, "gatherer1")
        os.makedirs(gatherer1_basedir)
        gatherer1 = MyGatherer(None, False, gatherer1_basedir)
        gatherer1.d = defer.Deferred()
        gatherer1.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer1_furl = gatherer1.my_furl
        starting_timestamp1 = gatherer1._starting_timestamp

        # create a second one
        gatherer2_basedir = os.path.join(basedir, "gatherer2")
        os.makedirs(gatherer2_basedir)
        gatherer2 = MyGatherer(None, False, gatherer2_basedir)
        gatherer2.d = defer.Deferred()
        gatherer2.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer2_furl = gatherer2.my_furl
        starting_timestamp2 = gatherer2._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furl", (gatherer1_furl, gatherer2_furl))

        # about now, the node will be contacting the Gatherers and
        # offering its logport.

        # gatherer1.d and gatherer2.d will be fired when subscribe_to_all()
        # has finished
        dl = defer.DeferredList([gatherer1.d, gatherer2.d])
        dl.addCallback(self._emit_messages_and_flush, t)
        dl.addCallback(lambda res: gatherer1.do_rotate())
        dl.addCallback(self._check_gatherer, starting_timestamp1, expected_tubid)
        dl.addCallback(lambda res: gatherer2.do_rotate())
        dl.addCallback(self._check_gatherer, starting_timestamp2, expected_tubid)
        return dl
    test_log_gatherer_multiple.timeout = 40

    def test_log_gatherer2(self):
        # set log-gatherer-furl, then setLocation. Also, use a timed rotator.
        basedir = "logging/Gatherer/log_gatherer2"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer = MyGatherer(3600, False, basedir)
        gatherer.d = defer.Deferred()
        gatherer.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer_furl = gatherer.my_furl
        starting_timestamp = gatherer._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setOption("log-gatherer-furl", gatherer_furl)
        t.setLocation("127.0.0.1:%d" % portnum)

        d = gatherer.d
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: gatherer.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer2.timeout = 20

    def test_log_gatherer_furlfile(self):
        # setLocation, then set log-gatherer-furlfile
        basedir = "logging/Gatherer/log_gatherer_furlfile"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer = MyGatherer(None, False, basedir)
        gatherer.d = defer.Deferred()
        gatherer.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer_furlfile = os.path.join(basedir, gatherer.furlFile)
        starting_timestamp = gatherer._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)

        d = gatherer.d
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: gatherer.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer_furlfile.timeout = 20

    def test_log_gatherer_furlfile2(self):
        # set log-gatherer-furlfile, then setLocation
        basedir = "logging/Gatherer/log_gatherer_furlfile2"
        os.makedirs(basedir)

        # create a gatherer, which will create its own Tub
        gatherer = MyGatherer(None, False, basedir)
        gatherer.d = defer.Deferred()
        gatherer.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer_furlfile = os.path.join(basedir, gatherer.furlFile)
        starting_timestamp = gatherer._starting_timestamp

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)
        # one bug we had was that the log-gatherer was contacted before
        # setLocation had occurred, so exercise that case
        d = self.stall(None, 1.0)
        def _start(res):
            t.setLocation("127.0.0.1:%d" % portnum)
            return gatherer.d
        d.addCallback(_start)
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: gatherer.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp, expected_tubid)
        return d
    test_log_gatherer_furlfile2.timeout = 20

    def test_log_gatherer_furlfile_multiple(self):
        basedir = "logging/Gatherer/log_gatherer_furlfile_multiple"
        os.makedirs(basedir)

        gatherer1_basedir = os.path.join(basedir, "gatherer1")
        os.makedirs(gatherer1_basedir)
        gatherer1 = MyGatherer(None, False, gatherer1_basedir)
        gatherer1.d = defer.Deferred()
        gatherer1.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer1_furl = gatherer1.my_furl
        starting_timestamp1 = gatherer1._starting_timestamp

        gatherer2_basedir = os.path.join(basedir, "gatherer2")
        os.makedirs(gatherer2_basedir)
        gatherer2 = MyGatherer(None, False, gatherer2_basedir)
        gatherer2.d = defer.Deferred()
        gatherer2.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer2_furl = gatherer2.my_furl
        starting_timestamp2 = gatherer2._starting_timestamp

        gatherer3_basedir = os.path.join(basedir, "gatherer3")
        os.makedirs(gatherer3_basedir)
        gatherer3 = MyGatherer(None, False, gatherer3_basedir)
        gatherer3.d = defer.Deferred()
        gatherer3.setServiceParent(self.parent)
        # that will start the gatherer
        gatherer3_furl = gatherer3.my_furl
        starting_timestamp3 = gatherer3._starting_timestamp

        gatherer_furlfile = os.path.join(basedir, "log_gatherer.furl")
        f = open(gatherer_furlfile, "w")
        f.write(gatherer1_furl + "\n")
        f.write(gatherer2_furl + "\n")
        f.close()

        t = Tub()
        expected_tubid = t.tubID
        assert t.tubID is not None
        t.setOption("log-gatherer-furl", gatherer3_furl)
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_furlfile)
        # now all three log-gatherer connections are being established

        d = defer.DeferredList([gatherer1.d, gatherer2.d, gatherer3.d],
                               fireOnOneErrback=True)
        d.addCallback(self._emit_messages_and_flush, t)
        d.addCallback(lambda res: gatherer1.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp1, expected_tubid)
        d.addCallback(lambda res: gatherer2.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp2, expected_tubid)
        d.addCallback(lambda res: gatherer3.do_rotate())
        d.addCallback(self._check_gatherer, starting_timestamp3, expected_tubid)
        return d
    test_log_gatherer_furlfile_multiple.timeout = 20

    def test_log_gatherer_empty_furlfile(self):
        basedir = "logging/Gatherer/log_gatherer_empty_furlfile"
        os.makedirs(basedir)

        gatherer_fn = os.path.join(basedir, "lg.furl")
        open(gatherer_fn, "w").close()
        # leave the furlfile empty: use no gatherer

        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_fn)

        lp_furl = t.getLogPortFURL()
        del lp_furl
        t.log("this message shouldn't make anything explode")
    test_log_gatherer_empty_furlfile.timeout = 20

    def test_log_gatherer_missing_furlfile(self):
        basedir = "logging/Gatherer/log_gatherer_missing_furlfile"
        os.makedirs(basedir)

        gatherer_fn = os.path.join(basedir, "missing_lg.furl")
        # leave the furlfile missing: use no gatherer

        t = Tub()
        t.setServiceParent(self.parent)
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setLocation("127.0.0.1:%d" % portnum)
        t.setOption("log-gatherer-furlfile", gatherer_fn)

        lp_furl = t.getLogPortFURL()
        del lp_furl
        t.log("this message shouldn't make anything explode")
    test_log_gatherer_missing_furlfile.timeout = 20
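
# The application-side setup exercised above boils down to two Tub options
# (a sketch with hypothetical names my_tub / gatherer_furl; the furl option
# accepts a single furl or a sequence of them, and the furlfile holds one
# furl per line):
#
#   my_tub.setOption("log-gatherer-furl", gatherer_furl)
#   my_tub.setOption("log-gatherer-furlfile", "log_gatherer.furl")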

class Tail(unittest.TestCase):
    def test_logprinter(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        options1 = {"save-to": None,
                    "verbose": None,
                    "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options1, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        outmsg = out.getvalue()
        # this contains a localtime string, so don't check the hour
        self.assertTrue(":06.527 L25 []#123 howdy" in outmsg)

        lp.remote_msg({"time": 1207005907.527782,
                       "level": 25,
                       "num": 124,
                       "format": "howdy %(there)s",
                       "there": "pardner",
                       })
        outmsg = out.getvalue()
        # this contains a localtime string, so don't check the hour
        self.assertTrue(":07.527 L25 []#124 howdy pardner" in outmsg)

        try:
            raise RuntimeError("fake error")
        except RuntimeError:
            f = failure.Failure()

        lp.remote_msg({"time": 1207005950.002,
                       "level": 30,
                       "num": 125,
                       "message": "oops",
                       "failure": f,
                       })
        outmsg = out.getvalue()

        self.assertTrue(":50.002 L30 []#125 oops\n FAILURE:\n" in outmsg,
                        outmsg)
        self.assertTrue("RuntimeError" in outmsg, outmsg)
        self.assertTrue(": fake error" in outmsg, outmsg)
        self.assertTrue("--- <exception caught here> ---\n" in outmsg, outmsg)
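
    # LogPrinter lines have the shape
    #   HH:MM:SS.mmm L<level> [<facility>]#<num> <message>
    # e.g. "12:34:06.527 L25 []#123 howdy", with the hour and minute
    # depending on the local timezone, which is why the assertions above
    # only match the seconds and everything after them.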

    def test_logprinter_verbose(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        options1 = {"save-to": None,
                    "verbose": True,
                    "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options1, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        outmsg = out.getvalue()
        self.assertTrue("'message': 'howdy'" in outmsg, outmsg)
        self.assertTrue("'time': 1207005906.527782" in outmsg, outmsg)
        self.assertTrue("'level': 25" in outmsg, outmsg)
        self.assertTrue("{" in outmsg, outmsg)

    def test_logprinter_saveto(self):
        target_tubid_s = "jiijpvbge2e3c3botuzzz7la3utpl67v"
        saveto_filename = "test_logprinter_saveto.flog"
        options = {"save-to": saveto_filename,
                   "verbose": False,
                   "timestamps": "short-local"}
        out = StringIO()
        lp = tail.LogPrinter(options, target_tubid_s[:8], out)
        lp.got_versions({})
        lp.remote_msg({"time": 1207005906.527782,
                       "level": 25,
                       "num": 123,
                       "message": "howdy",
                       })
        outmsg = out.getvalue()
        del outmsg
        lp.saver.disconnected() # cause the file to be closed
        f = open(saveto_filename, "rb")
        expected_magic = f.read(len(flogfile.MAGIC))
        self.assertEqual(expected_magic, flogfile.MAGIC)
        data = json.loads(f.readline().decode("utf-8")) # header
        self.assertEqual(data["header"]["type"], "tail")
        data = json.loads(f.readline().decode("utf-8")) # event
        self.assertEqual(data["from"], "jiijpvbg")
        self.assertEqual(data["d"]["message"], "howdy")
        self.assertEqual(data["d"]["num"], 123)

    def test_options(self):
        basedir = "logging/Tail/options"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "foo")
        f = open(fn, "w")
        f.write("pretend this is a furl")
        f.close()
        f = open(os.path.join(basedir, "logport.furl"), "w")
        f.write("this too")
        f.close()

        to = tail.TailOptions()
        to.parseOptions(["pb:pretend-furl"])
        self.assertFalse(to["verbose"])
        self.assertFalse(to["catch-up"])
        self.assertEqual(to.target_furl, "pb:pretend-furl")

        to = tail.TailOptions()
        to.parseOptions(["--verbose", "--catch-up", basedir])
        self.assertTrue(to["verbose"])
        self.assertTrue(to["catch-up"])
        self.assertEqual(to.target_furl, "this too")

        to = tail.TailOptions()
        to.parseOptions(["--save-to", "save.flog", fn])
        self.assertFalse(to["verbose"])
        self.assertFalse(to["catch-up"])
        self.assertEqual(to["save-to"], "save.flog")
        self.assertEqual(to.target_furl, "pretend this is a furl")

        to = tail.TailOptions()
        self.assertRaises(RuntimeError, to.parseOptions, ["bogus.txt"])

# applications that provide a command-line tool may find it useful to include
# a "flogtool" subcommand, using something like this:
class WrapperOptions(usage.Options):
    synopsis = "Usage: wrapper flogtool <command>"
    subCommands = [("flogtool", None, cli.Options, "foolscap log tool")]

def run_wrapper(argv):
    config = WrapperOptions()
    config.parseOptions(argv)
    command = config.subCommand
    if command == "flogtool":
        return cli.run_flogtool(argv[1:], run_by_human=False)
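
# e.g. an application's "mytool" entry point (hypothetical name) could just
# delegate:
#
#   def main():
#       return run_wrapper(sys.argv[1:])
#
# the test_wrapper case below calls run_wrapper() directly instead.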

class CLI(unittest.TestCase):
    def test_create_gatherer(self):
        basedir = "logging/CLI/create_gatherer"
        argv = ["flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        basedir = "logging/CLI/create_gatherer2"
        argv = ["flogtool", "create-gatherer", "--rotate", "3600",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        basedir = "logging/CLI/create_gatherer3"
        argv = ["flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                basedir]
        (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))
        self.assertTrue(("Gatherer created in directory %s" % basedir)
                        in out, out)
        self.assertTrue("Now run" in out, out)
        self.assertTrue("to launch the daemon" in out, out)

    def test_create_gatherer_badly(self):
        argv = ["flogtool", "create-gatherer", "--bogus-arg"]
        self.assertRaises(usage.UsageError,
                          cli.run_flogtool, argv[1:], run_by_human=False)

    def test_create_gatherer_no_location(self):
        basedir = "logging/CLI/create_gatherer_no_location"
        argv = ["flogtool", "create-gatherer", basedir]
        e = self.assertRaises(usage.UsageError,
                              cli.run_flogtool, argv[1:],
                              run_by_human=False)
        self.assertIn("--location= is mandatory", str(e))

    def test_wrapper(self):
        basedir = "logging/CLI/wrapper"
        argv = ["wrapper", "flogtool", "create-gatherer",
                "--port", "tcp:3117", "--location", "tcp:localhost:3117",
                "--quiet", basedir]
        run_wrapper(argv[1:])
        self.assertTrue(os.path.exists(basedir))

    def test_create_incident_gatherer(self):
        basedir = "logging/CLI/create_incident_gatherer"
        argv = ["flogtool", "create-incident-gatherer",
                "--port", "tcp:3118", "--location", "tcp:localhost:3118",
                "--quiet", basedir]
        cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))

        basedir = "logging/CLI/create_incident_gatherer2"
        argv = ["flogtool", "create-incident-gatherer",
                "--port", "tcp:3118", "--location", "tcp:localhost:3118",
                basedir]
        (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertTrue(os.path.exists(basedir))
        self.assertTrue(("Incident Gatherer created in directory %s" % basedir)
                        in out, out)
        self.assertTrue("Now run" in out, out)
        self.assertTrue("to launch the daemon" in out, out)

class LogfileWriterMixin:

    def create_logfile(self):
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir, "dump.flog")
        l = log.FoolscapLogger()
        lfo = log.LogFileObserver(fn, level=0)
        l.addObserver(lfo.msg)
        l.msg("one", facility="big.facility")
        time.sleep(0.2) # give the filter's --after something to work with
        l.msg("two", level=log.OPERATIONAL-1)
        try:
            raise SampleError("whoops1")
        except SampleError:
            l.err(message="three")
        l.msg("four")
        d = fireEventually()
        def _done(res):
            lfo._stop()
            #events = self._read_logfile(fn)
            #self.failUnlessEqual(len(events), 1+3)
            return fn
        d.addCallback(_done)
        return d

    def create_incident(self):
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        l = log.FoolscapLogger()
        l.setLogDir(self.basedir)
        l.setIncidentReporterFactory(NoFollowUpReporter)

        d = defer.Deferred()
        def _done(name, trigger):
            d.callback( (name, trigger) )
        l.addImmediateIncidentObserver(_done)

        l.msg("one")
        l.msg("two")
        l.msg("boom", level=log.WEIRD)
        l.msg("four")

        d.addCallback(lambda name_trigger:
                      os.path.join(self.basedir, name_trigger[0] + ".flog.bz2"))

        return d
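
# The incident file produced by create_incident() is an ordinary flogfile,
# bz2-compressed. A minimal sketch of reading one back by hand (assuming the
# same MAGIC-prefixed JSON-lines layout checked elsewhere in this file; the
# LogfileReaderMixin the tests actually use handles this for real):
#
#   def read_incident(fn):
#       with bz2.BZ2File(fn, "rb") as f:
#           assert f.read(len(flogfile.MAGIC)) == flogfile.MAGIC
#           return [json.loads(line.decode("utf-8")) for line in f]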

class Dumper(unittest.TestCase, LogfileWriterMixin, LogfileReaderMixin):
    # create a logfile, then dump it, and examine the output to make sure it
    # worked right.

    def test_dump(self):
        self.basedir = "logging/Dumper/dump"
        d = self.create_logfile()
        def _check(fn):
            events = self._read_logfile(fn)

            d = dumper.LogDumper()
            # initialize the LogDumper() timestamp mode
            d.options = dumper.DumpOptions()
            d.options.parseOptions([fn])
            tmode = d.options["timestamps"]

            argv = ["flogtool", "dump", fn]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertEqual(err, "")
            lines = list(StringIO(out).readlines())
            self.assertTrue(lines[0].strip().startswith("Application versions"),
                            lines[0])
            mypid = os.getpid()
            self.assertEqual(lines[3].strip(), "PID: %s" % mypid, lines[3])
            lines = lines[5:]
            line0 = "local#%d %s: one" % (events[1]["d"]["num"],
                                          format_time(events[1]["d"]["time"],
                                                      tmode))
            self.assertEqual(lines[0].strip(), line0)
            self.assertTrue("FAILURE:" in lines[3])
            self.assertIn("test_logging.SampleError", lines[4])
            self.assertIn(": whoops1", lines[4])
            self.assertTrue(lines[-1].startswith("local#3 "))

            argv = ["flogtool", "dump", "--just-numbers", fn]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertEqual(err, "")
            lines = list(StringIO(out).readlines())
            line0 = "%s %d" % (format_time(events[1]["d"]["time"], tmode),
                               events[1]["d"]["num"])
            self.assertEqual(lines[0].strip(), line0)
            self.assertTrue(lines[1].strip().endswith(" 1"))
            self.assertTrue(lines[-1].strip().endswith(" 3"))
            # failures are not dumped in --just-numbers
            self.assertEqual(len(lines), 1+3)

            argv = ["flogtool", "dump", "--rx-time", fn]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertEqual(err, "")
            lines = list(StringIO(out).readlines())
            self.assertTrue(lines[0].strip().startswith("Application versions"),
                            lines[0])
            mypid = os.getpid()
            self.assertEqual(lines[3].strip(), "PID: %s" % mypid, lines[3])
            lines = lines[5:]
            line0 = "local#%d rx(%s) emit(%s): one" % \
                    (events[1]["d"]["num"],
                     format_time(events[1]["rx_time"], tmode),
                     format_time(events[1]["d"]["time"], tmode))
            self.assertEqual(lines[0].strip(), line0)
            self.assertTrue(lines[-1].strip().endswith(" four"))

            argv = ["flogtool", "dump", "--verbose", fn]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertEqual(err, "")
            lines = list(StringIO(out).readlines())
            self.assertTrue("header" in lines[0])
            self.assertTrue(re.search(r"u?'message': u?'one'", lines[1]), lines[1])
            self.assertTrue("'level': 20" in lines[1])
            self.assertTrue(": four: {" in lines[-1])

        d.addCallback(_check)
        return d
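
    # "flogtool dump" output starts with a fixed header block: an
    # "Application versions" title, one line each for foolscap and twisted,
    # a "PID:" line, then a blank line before the events themselves; the
    # lines[3] / lines[5:] slicing above and the fixed indices in
    # test_incident below rely on that layout.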

    def test_incident(self):
        self.basedir = "logging/Dumper/incident"
        d = self.create_incident()
        def _check(fn):
            events = self._read_logfile(fn)
            # for sanity, make sure we created the incident correctly
            assert events[0]["header"]["type"] == "incident"
            assert events[0]["header"]["trigger"]["num"] == 2

            argv = ["flogtool", "dump", fn]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertEqual(err, "")
            lines = list(StringIO(out).readlines())
            self.assertEqual(len(lines), 8)
            self.assertEqual(lines[0].strip(),
                             "Application versions (embedded in logfile):")
            self.assertTrue(lines[1].strip().startswith("foolscap:"), lines[1])
            self.assertTrue(lines[2].strip().startswith("twisted:"), lines[2])
            mypid = os.getpid()
            self.assertEqual(lines[3].strip(), "PID: %s" % mypid, lines[3])
            self.assertEqual(lines[4].strip(), "")
            self.assertFalse("[INCIDENT-TRIGGER]" in lines[5])
            self.assertFalse("[INCIDENT-TRIGGER]" in lines[6])
            self.assertTrue(lines[7].strip().endswith(": boom [INCIDENT-TRIGGER]"))
        d.addCallback(_check)
        return d

    def test_oops_furl(self):
        self.basedir = os.path.join("logging", "Dumper", "oops_furl")
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir, "logport.furl")
        f = open(fn, "w")
        f.write("pb://TUBID@HINTS/SWISSNUM\n")
        f.close()

        d = dumper.LogDumper()
        # initialize the LogDumper() timestamp mode
        d.options = dumper.DumpOptions()
        d.options.parseOptions([fn])
        argv = ["flogtool", "dump", fn]
        (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertEqual(err, "Error: %s appears to be a FURL file.\nPerhaps you meant to run 'flogtool tail' instead of 'flogtool dump'?\n" % fn)
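
# The two base64 blobs below hold flogfiles in the old pickle-based format:
# a plain dump file and a bz2-compressed incident. They exist only so
# OldPickleDumper can verify that "flogtool dump" refuses to unpickle them
# (unpickling attacker-supplied data can execute arbitrary code) and prints
# the "cannot be loaded safely" error instead.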

PICKLE_DUMPFILE_B64 = """
KGRwMApTJ2hlYWRlcicKcDEKKGRwMgpTJ3RocmVzaG9sZCcKcDMKSTAKc1MncGlkJwpwNA
pJMTg3MjgKc1MndHlwZScKcDUKUydsb2ctZmlsZS1vYnNlcnZlcicKcDYKc1MndmVyc2lv
bnMnCnA3CihkcDgKUydmb29sc2NhcCcKcDkKUycwLjkuMSsyMi5nNzhlNWEzZC5kaXJ0eS
cKcDEwCnNTJ3R3aXN0ZWQnCnAxMQpTJzE1LjUuMCcKcDEyCnNzcy6AAn1xAChVBGZyb21x
AVUFbG9jYWxxAlUHcnhfdGltZXEDR0HVmqGrUXpjVQFkcQR9cQUoVQVsZXZlbHEGSxRVC2
luY2FybmF0aW9ucQdVCMZQLsaodzvDcQhOhnEJVQhmYWNpbGl0eXEKVQxiaWcuZmFjaWxp
dHlxC1UDbnVtcQxLAFUEdGltZXENR0HVmqGrRFtbVQdtZXNzYWdlcQ5VA29uZXEPdXUugA
J9cQAoVQRmcm9tcQFVBWxvY2FscQJVB3J4X3RpbWVxA0dB1Zqhq1F+s1UBZHEEfXEFKFUH
bWVzc2FnZXEGVQN0d29xB1UDbnVtcQhLAVUEdGltZXEJR0HVmqGrUU6cVQtpbmNhcm5hdG
lvbnEKVQjGUC7GqHc7w3ELToZxDFUFbGV2ZWxxDUsTdXUugAJ9cQAoVQRmcm9tcQFVBWxv
Y2FscQJVB3J4X3RpbWVxA0dB1Zqhq1GAiFUBZHEEfXEFKFUFbGV2ZWxxBksUVQtpbmNhcm
5hdGlvbnEHVQjGUC7GqHc7w3EIToZxCVUDd2h5cQpOVQdmYWlsdXJlcQsoY2Zvb2xzY2Fw
LmNhbGwKQ29waWVkRmFpbHVyZQpxDG9xDX1xDyhVAnRicRBOVQl0cmFjZWJhY2txEVSBAw
AAVHJhY2ViYWNrIChtb3N0IHJlY2VudCBjYWxsIGxhc3QpOgogIEZpbGUgIi9Vc2Vycy93
YXJuZXIvc3R1ZmYvcHl0aG9uL2Zvb2xzY2FwL3ZlL2xpYi9weXRob24yLjcvc2l0ZS1wYW
NrYWdlcy90d2lzdGVkL3RyaWFsL19hc3luY3Rlc3QucHkiLCBsaW5lIDExMiwgaW4gX3J1
bgogICAgdXRpbHMucnVuV2l0aFdhcm5pbmdzU3VwcHJlc3NlZCwgc2VsZi5fZ2V0U3VwcH
Jlc3MoKSwgbWV0aG9kKQogIEZpbGUgIi9Vc2Vycy93YXJuZXIvc3R1ZmYvcHl0aG9uL2Zv
b2xzY2FwL3ZlL2xpYi9weXRob24yLjcvc2l0ZS1wYWNrYWdlcy90d2lzdGVkL2ludGVybm
V0L2RlZmVyLnB5IiwgbGluZSAxNTAsIGluIG1heWJlRGVmZXJyZWQKICAgIHJlc3VsdCA9
IGYoKmFyZ3MsICoqa3cpCiAgRmlsZSAiL1VzZXJzL3dhcm5lci9zdHVmZi9weXRob24vZm
9vbHNjYXAvdmUvbGliL3B5dGhvbjIuNy9zaXRlLXBhY2thZ2VzL3R3aXN0ZWQvaW50ZXJu
ZXQvdXRpbHMucHkiLCBsaW5lIDE5NywgaW4gcnVuV2l0aFdhcm5pbmdzU3VwcHJlc3NlZA
ogICAgcmVzdWx0ID0gZigqYSwgKiprdykKICBGaWxlICIvVXNlcnMvd2FybmVyL3N0dWZm
L3B5dGhvbi9mb29sc2NhcC9mb29sc2NhcC90ZXN0L3Rlc3RfbG9nZ2luZy5weSIsIGxpbm
UgMTg4MywgaW4gdGVzdF9kdW1wCiAgICBkID0gc2VsZi5jcmVhdGVfbG9nZmlsZSgpCi0t
LSA8ZXhjZXB0aW9uIGNhdWdodCBoZXJlPiAtLS0KICBGaWxlICIvVXNlcnMvd2FybmVyL3
N0dWZmL3B5dGhvbi9mb29sc2NhcC9mb29sc2NhcC90ZXN0L3Rlc3RfbG9nZ2luZy5weSIs
IGxpbmUgMTg0MiwgaW4gY3JlYXRlX2xvZ2ZpbGUKICAgIHJhaXNlIFNhbXBsZUVycm9yKC
J3aG9vcHMxIikKZm9vbHNjYXAudGVzdC50ZXN0X2xvZ2dpbmcuU2FtcGxlRXJyb3I6IHdo
b29wczEKcRJVBXZhbHVlcRNVB3dob29wczFxFFUHcGFyZW50c3EVXXEWKFUmZm9vbHNjYX
AudGVzdC50ZXN0X2xvZ2dpbmcuU2FtcGxlRXJyb3JxF1UUZXhjZXB0aW9ucy5FeGNlcHRp
b25xGFUYZXhjZXB0aW9ucy5CYXNlRXhjZXB0aW9ucRlVEl9fYnVpbHRpbl9fLm9iamVjdH
EaZVUGZnJhbWVzcRtdcRxVBHR5cGVxHVUmZm9vbHNjYXAudGVzdC50ZXN0X2xvZ2dpbmcu
U2FtcGxlRXJyb3JxHlUFc3RhY2txH11xIHViVQNudW1xIUsCVQR0aW1lcSJHQdWaoatRVt
JVB21lc3NhZ2VxI1UFdGhyZWVxJFUHaXNFcnJvcnElSwF1dS6AAn1xAChVBGZyb21xAVUF
bG9jYWxxAlUHcnhfdGltZXEDR0HVmqGrUYXkVQFkcQR9cQUoVQdtZXNzYWdlcQZVBGZvdX
JxB1UDbnVtcQhLA1UEdGltZXEJR0HVmqGrUXU2VQtpbmNhcm5hdGlvbnEKVQjGUC7GqHc7
w3ELToZxDFUFbGV2ZWxxDUsUdXUu
"""

PICKLE_INCIDENT_B64 = """
QlpoOTFBWSZTWUOW3hEAAHjfgAAQAcl/4QkhCAS/59/iQAGdWS2BJRTNNQ2oB6gGgPU9T1
BoEkintKGIABiAAaAwANGhowjJoNGmgMCpJDSaNGqbSMnqGhoaAP1S5rw5GxrNlUoxLXu2
sZ5TYy2rVCVNHMKgeDE97TBiw1hXtCfdSCISDpSlL61KFiacqWj9apY80J2PIpO7mde+vd
Jz18Myu4+djYU10JPMGU5vFAcUmmyk0kmcGUSMIDUJcKkog4W2EyyQStwwSYUEohGpr6Wm
F4KU7qccsjPJf8dTIv3ydZM5hpkW41JjJ8j0PETxlRRVFSeZYsqFU+hufU3n5O3hmYASDC
DhWMHFPJE7nXCYRsz5BGjktwUQCu6d4cixrgmGYLYA7JVCM7UqkMDVD9EMaclrFuayYGBR
xMIwXxM9pjeUuZVv2ceR5E6FSWpVRKKD98ObK5wmGmU9vqNBKqjp0wwqZlZ3x3nA4n+LTS
rmhbVjNyWeh/xdyRThQkEOW3hE
"""

class OldPickleDumper(unittest.TestCase):
    def test_dump(self):
        self.basedir = "logging/OldPickleDumper/dump"
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir, "dump.flog")
        with open(fn, "wb") as f:
            f.write(base64.b64decode(PICKLE_DUMPFILE_B64))

        argv = ["flogtool", "dump", fn]
        (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertEqual(out, "")
        self.assertIn("which cannot be loaded safely", err)

    def test_incident(self):
        self.basedir = "logging/OldPickleDumper/incident"
        if not os.path.exists(self.basedir):
            os.makedirs(self.basedir)
        fn = os.path.join(self.basedir,
                          "incident-2015-12-11--08-18-28Z-uqyuiea.flog.bz2")
        with open(fn, "wb") as f:
            f.write(base64.b64decode(PICKLE_INCIDENT_B64))

        argv = ["flogtool", "dump", fn]
        (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
        self.assertEqual(out, "")
        self.assertIn("which cannot be loaded safely", err)
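
# The Filter tests below drive "flogtool filter" through each of its
# selection options: --above (numeric or symbolic level), --before and
# --after (integer timestamps), --strip-facility, --from (tubid prefix),
# plus in-place rewriting, --verbose, and transparent .bz2 output.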

class Filter(unittest.TestCase, LogfileWriterMixin, LogfileReaderMixin):

    def compare_events(self, a, b):
        ## # cmp(a,b) won't quite work, because two instances of CopiedFailure
        ## # loaded from the same pickle don't compare as equal

        # in fact we no longer create CopiedFailure instances in logs, so a
        # simple assertEqual will now suffice
        self.assertEqual(a, b)

    def test_basic(self):
        self.basedir = "logging/Filter/basic"
        d = self.create_logfile()
        def _check(fn):
            events = self._read_logfile(fn)
            count = len(events)
            assert count == 5

            dirname, filename = os.path.split(fn)
            fn2 = os.path.join(dirname, "filtered-" + filename)

            # pass-through
            argv = ["flogtool", "filter", fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("copied 5 of 5 events into new file" in out, out)
            self.compare_events(events, self._read_logfile(fn2))

            # convert to .bz2 while we're at it
            fn2bz2 = fn2 + ".bz2"
            argv = ["flogtool", "filter", fn, fn2bz2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("copied 5 of 5 events into new file" in out, out)
            self.compare_events(events, self._read_logfile(fn2bz2))

            # modify the file in place
            argv = ["flogtool", "filter", "--above", "20", fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("modifying event file in place" in out, out)
            self.assertTrue("--above: removing events below level 20" in out, out)
            self.assertTrue("copied 4 of 5 events into new file" in out, out)
            self.compare_events([events[0], events[1], events[3], events[4]],
                                self._read_logfile(fn2))

            # modify the file in place, two-argument version
            argv = ["flogtool", "filter", fn2, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("modifying event file in place" in out, out)
            self.assertTrue("copied 4 of 4 events into new file" in out, out)
            self.compare_events([events[0], events[1], events[3], events[4]],
                                self._read_logfile(fn2))

            # --above with a string argument
            argv = ["flogtool", "filter", "--above", "OPERATIONAL", fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("--above: removing events below level 20" in out, out)
            self.assertTrue("copied 4 of 5 events into new file" in out, out)
            self.compare_events([events[0], events[1], events[3], events[4]],
                                self._read_logfile(fn2))

            t_one = events[1]["d"]["time"]
            # we can only pass integers into --before and --after, so we'll
            # just test that we get all or nothing
            argv = ["flogtool", "filter", "--before", str(int(t_one - 10)),
                    fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("copied 1 of 5 events into new file" in out, out)
            # we always get the header, so it's 1 instead of 0
            self.compare_events(events[:1], self._read_logfile(fn2))

            argv = ["flogtool", "filter", "--after", str(int(t_one + 10)),
                    fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("copied 1 of 5 events into new file" in out, out)
            self.compare_events(events[:1], self._read_logfile(fn2))

            # --strip-facility
            argv = ["flogtool", "filter", "--strip-facility", "big", fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("--strip-facility: removing events for big and children" in out, out)
            self.assertTrue("copied 4 of 5 events into new file" in out, out)
            self.compare_events([events[0], events[2], events[3], events[4]],
                                self._read_logfile(fn2))

            # pass-through, --verbose, read from .bz2
            argv = ["flogtool", "filter", "--verbose", fn2bz2, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("copied 5 of 5 events into new file" in out, out)
            lines = [l.strip() for l in StringIO(out).readlines()]
            self.assertEqual(lines,
                             ["HEADER", "0", "1", "2", "3",
                              "copied 5 of 5 events into new file"])
            self.compare_events(events, self._read_logfile(fn2))

            # --from . This normally takes a base32 tubid prefix, but the
            # things we've logged all say ["from"]="local". So just test
            # all-or-nothing.
            argv = ["flogtool", "filter", "--from", "local", fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("--from: retaining events only from tubid prefix local" in out, out)
            self.assertTrue("copied 5 of 5 events into new file" in out, out)
            self.compare_events(events, self._read_logfile(fn2))

            argv = ["flogtool", "filter", "--from", "NOTlocal", fn, fn2]
            (out, err) = cli.run_flogtool(argv[1:], run_by_human=False)
            self.assertTrue("--from: retaining events only from tubid prefix NOTlocal" in out, out)
            self.assertTrue("copied 1 of 5 events into new file" in out, out)
            self.compare_events(events[:1], self._read_logfile(fn2))

        d.addCallback(_check)
        return d


@inlineCallbacks
def getPage(url):
    a = client.Agent(reactor)
    response = yield a.request(b"GET", six.ensure_binary(url))
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # Twisted can emit a spurious internal warning here ("Using readBody
        # with a transport that does not have an abortConnection method")
        # which seems to be https://twistedmatrix.com/trac/ticket/8227
        page = yield client.readBody(response)
    if response.code != 200:
        raise ValueError("request failed (%d), page contents were: %s" % (
            response.code, six.ensure_str(page)))
    returnValue(page)
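
# getPage stands in for the old twisted.web.client.getPage helper (deprecated
# in newer Twisted releases), using Agent + readBody instead; callers just
# write "page = yield getPage(url)" and receive the response body as bytes.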

class Web(unittest.TestCase):
    def setUp(self):
        self.viewer = None
    def tearDown(self):
        d = defer.maybeDeferred(unittest.TestCase.tearDown, self)
        if self.viewer:
            d.addCallback(lambda res: self.viewer.stop())
        return d

    @inlineCallbacks
    def test_basic(self):
        basedir = "logging/Web/basic"
        os.makedirs(basedir)
        l = log.FoolscapLogger()
        fn = os.path.join(basedir, "flog.out")
        ob = log.LogFileObserver(fn)
        l.addObserver(ob.msg)
        l.msg("one")
        lp = l.msg("two")
        l.msg("three", parent=lp, failure=failure.Failure(RuntimeError("yo")))
        l.msg("four", level=log.UNUSUAL)
        yield fireEventually()
        l.removeObserver(ob.msg)
        ob._stop()

        portnum = allocate_tcp_port()
        argv = ["-p", "tcp:%d:interface=127.0.0.1" % portnum,
                "--quiet",
                fn]
        options = web.WebViewerOptions()
        options.parseOptions(argv)
        self.viewer = web.WebViewer()
        self.url = yield self.viewer.start(options)
        self.baseurl = self.url[:self.url.rfind("/")] + "/"

        page = yield getPage(self.url)
        page = six.ensure_str(page)
        mypid = os.getpid()
        self.assertTrue("PID %s" % mypid in page,
                        "didn't see 'PID %s' in '%s'" % (mypid, page))
        self.assertTrue("Application Versions:" in page, page)
        self.assertTrue("foolscap: %s" % foolscap.__version__ in page, page)
        self.assertTrue("4 events covering" in page)
        self.assertTrue('href="summary/0-20">3 events</a> at level 20'
                        in page)

        page = yield getPage(self.baseurl + "summary/0-20")
        page = six.ensure_str(page)
        self.assertTrue("Events at level 20" in page)
        self.assertTrue(": two" in page)
        self.assertFalse("four" in page)

        def check_all_events(page):
            page = six.ensure_str(page)
            self.assertTrue("3 root events" in page)
            self.assertTrue(": one</span>" in page)
            self.assertTrue(": two</span>" in page)
            self.assertTrue(": three FAILURE:" in page)
            self.assertTrue(": UNUSUAL four</span>" in page)

        page = yield getPage(self.baseurl + "all-events")
        check_all_events(page)

        page = yield getPage(self.baseurl + "all-events?sort=number")
        check_all_events(page)

        page = yield getPage(self.baseurl + "all-events?sort=time")
        check_all_events(page)

        page = yield getPage(self.baseurl + "all-events?sort=nested")
        check_all_events(page)

        page = yield getPage(self.baseurl + "all-events?timestamps=short-local")
        check_all_events(page)

        page = yield getPage(self.baseurl + "all-events?timestamps=utc")
        check_all_events(page)


class Bridge(unittest.TestCase):
    def test_foolscap_to_twisted(self):
        fl = log.FoolscapLogger()
        tw = twisted_log.LogPublisher()
        log.bridgeLogsToTwisted(None, fl, tw)
        tw_out = []
        tw.addObserver(tw_out.append)
        fl_out = []
        fl.addObserver(fl_out.append)

        fl.msg("one")
        fl.msg(format="two %(two)d", two=2)
        fl.msg("three", level=log.NOISY) # should be dropped by the bridge
        d = flushEventualQueue()
        def _check(res):
            self.assertEqual(len(fl_out), 3)
            self.assertEqual(fl_out[0]["message"], "one")
            self.assertEqual(fl_out[1]["format"], "two %(two)d")
            self.assertEqual(fl_out[2]["message"], "three")

            self.assertEqual(len(tw_out), 2)
            self.assertEqual(tw_out[0]["message"], ("one",))
            self.assertTrue(tw_out[0]["from-foolscap"])
            self.assertEqual(tw_out[1]["message"], ("two 2",))
            self.assertTrue(tw_out[1]["from-foolscap"])

        d.addCallback(_check)
        return d
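
    # These tests build private FoolscapLogger / LogPublisher pairs so that
    # nothing leaks into the global log; a real application would presumably
    # bridge its actual publishers (or rely on whatever defaults
    # bridgeLogsToTwisted / bridgeLogsFromTwisted provide) instead.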

    def test_twisted_to_foolscap(self):
        fl = log.FoolscapLogger()
        tw = twisted_log.LogPublisher()
        log.bridgeLogsFromTwisted(None, tw, fl)
        tw_out = []
        tw.addObserver(tw_out.append)
        fl_out = []
        fl.addObserver(fl_out.append)

        tw.msg("one")
        tw.msg(format="two %(two)d", two=2)
        # twisted now has places (e.g. Factory.doStart) where the new
        # Logger.info() is called with arbitrary (unserializable) kwargs for
        # string formatting, which are passed into the old LogPublisher(),
        # from which they arrive in foolscap. Make sure we can tolerate that.
        # The rule is that foolscap immediately stringifies all events it
        # gets from twisted (with log.textFromEventDict), and doesn't store
        # the additional arguments. So it's ok to put an *unserializable*
        # argument into the log.msg() call, as long as it's still
        # *stringifyable*.
        unserializable = lambda: "unserializable"
        tw.msg(format="three is %(evil)s", evil=unserializable)

        d = flushEventualQueue()
        def _check(res):
            self.assertEqual(len(tw_out), 3)
            self.assertEqual(tw_out[0]["message"], ("one",))
            self.assertEqual(tw_out[1]["format"], "two %(two)d")
            self.assertEqual(tw_out[1]["two"], 2)
            self.assertEqual(tw_out[2]["format"], "three is %(evil)s")
            self.assertEqual(tw_out[2]["evil"], unserializable)
            self.assertEqual(len(fl_out), 3)
            self.assertEqual(fl_out[0]["message"], "one")
            self.assertTrue(fl_out[0]["from-twisted"])
            self.assertEqual(fl_out[1]["message"], "two 2")
            self.assertTrue(fl_out[1]["from-twisted"])
            # str(unserializable) is like "<function <lambda> at 0xblahblah>"
            self.assertEqual(fl_out[2]["message"],
                             "three is " + str(unserializable))
            self.assertTrue(fl_out[2]["from-twisted"])

        d.addCallback(_check)
        return d

    def test_twisted_logger_to_foolscap(self):
        if not twisted_logger:
            raise unittest.SkipTest("needs twisted.logger from Twisted>=15.2.0")
        new_pub = twisted_logger.LogPublisher()
        old_pub = twisted_log.LogPublisher(observerPublisher=new_pub,
                                           publishPublisher=new_pub)
        fl = log.FoolscapLogger()
        log.bridgeLogsFromTwisted(None, old_pub, fl)
        tw_out = []
        old_pub.addObserver(tw_out.append)
        fl_out = []
        fl.addObserver(fl_out.append)

        tl = twisted_logger.Logger(observer=new_pub)
        tl.info("one")
        # note: new twisted logger wants PEP3101 format strings, {} not %
        tl.info(format="two {two}", two=2)
        # twisted's new Logger.info() takes arbitrary (unserializable) kwargs
        # for string formatting, and passes them into the old LogPublisher(),
        # so make sure we can tolerate that. The rule is that foolscap
        # stringifies all events it gets from twisted, and doesn't store the
        # additional arguments.
        unserializable = lambda: "unserializable"
        tl.info("three is {evil!s}", evil=unserializable)

        d = flushEventualQueue()
        def _check(res):
            self.assertEqual(len(fl_out), 3)
            self.assertEqual(fl_out[0]["message"], "one")
            self.assertTrue(fl_out[0]["from-twisted"])
            self.assertEqual(fl_out[1]["message"], "two 2")
            self.assertFalse("two" in fl_out[1])
            self.assertTrue(fl_out[1]["from-twisted"])
            # str(unserializable) is like "<function <lambda> at 0xblahblah>"
            self.assertEqual(fl_out[2]["message"],
                             "three is " + str(unserializable))
            self.assertTrue(fl_out[2]["from-twisted"])

        d.addCallback(_check)
        return d

    def test_no_loops(self):
        fl = log.FoolscapLogger()
        tw = twisted_log.LogPublisher()
        log.bridgeLogsFromTwisted(None, tw, fl)
        log.bridgeLogsToTwisted(None, fl, tw)
        tw_out = []
        tw.addObserver(tw_out.append)
        fl_out = []
        fl.addObserver(fl_out.append)

        tw.msg("one")
        fl.msg("two")

        d = flushEventualQueue()
        def _check(res):
            self.assertEqual(len(tw_out), 2)
            self.assertEqual(tw_out[0]["message"], ("one",))
            self.assertEqual(tw_out[1]["message"], ("two",))

            self.assertEqual(len(fl_out), 2)
            self.assertEqual(fl_out[0]["message"], "one")
            self.assertEqual(fl_out[1]["message"], "two")

        d.addCallback(_check)
        return d