1from __future__ import print_function, unicode_literals 2import six, os, sys, time 3from zope.interface import implementer 4from twisted.internet import reactor 5from twisted.python import usage 6from foolscap import base32 7from foolscap.api import Tub, Referenceable, fireEventually 8from foolscap.logging import log, flogfile 9from foolscap.referenceable import SturdyRef 10from foolscap.util import format_time, FORMAT_TIME_MODES, ensure_dict_str, ensure_dict_str_keys 11from .interfaces import RILogObserver 12 13def short_tubid_b2a(tubid): 14 return base32.encode(tubid)[:8] 15 16@implementer(RILogObserver) 17class LogSaver(Referenceable): 18 def __init__(self, nodeid_s, savefile): 19 self.nodeid_s = nodeid_s 20 self.f = savefile # we own this, and may close it 21 self.f.write(flogfile.MAGIC) 22 23 def emit_header(self, versions, pid): 24 flogfile.serialize_header(self.f, "tail", versions=versions, pid=pid) 25 26 def remote_msg(self, d): 27 try: 28 flogfile.serialize_wrapper(self.f, d, 29 from_=self.nodeid_s, 30 rx_time=time.time()) 31 except Exception as ex: 32 print("GATHERER: unable to serialize %s: %s" % (d, ex)) 33 34 def disconnected(self): 35 self.f.close() 36 del self.f 37 38class TailOptions(usage.Options): 39 synopsis = "Usage: flogtool tail (LOGPORT.furl/furlfile/nodedir)" 40 41 optFlags = [ 42 ("verbose", "v", "Show all event arguments"), 43 ("catch-up", "c", "Catch up with recent events"), 44 ] 45 optParameters = [ 46 ("save-to", "s", None, 47 "Save events to the given file. The file will be overwritten."), 48 ("timestamps", "t", "short-local", 49 "Format for timestamps: " + " ".join(FORMAT_TIME_MODES)), 50 ] 51 52 def opt_timestamps(self, arg): 53 if arg not in FORMAT_TIME_MODES: 54 raise usage.UsageError("--timestamps= must be one of (%s)" % 55 ", ".join(FORMAT_TIME_MODES)) 56 self["timestamps"] = arg 57 58 def parseArgs(self, target): 59 if target.startswith("pb:"): 60 self.target_furl = target 61 elif os.path.isfile(target): 62 self.target_furl = open(target, "r").read().strip() 63 elif os.path.isdir(target): 64 fn = os.path.join(target, "logport.furl") 65 self.target_furl = open(fn, "r").read().strip() 66 else: 67 raise RuntimeError("Can't use tail target: %s" % target) 68 69@implementer(RILogObserver) 70class LogPrinter(Referenceable): 71 def __init__(self, options, target_tubid_s, output=sys.stdout): 72 self.options = options 73 self.saver = None 74 if options["save-to"]: 75 self.saver = LogSaver(target_tubid_s[:8], 76 open(options["save-to"], "wb")) 77 self.output = output 78 79 def got_versions(self, versions, pid=None): 80 versions = ensure_dict_str(versions) 81 print("Remote Versions:", file=self.output) 82 for k in sorted(versions.keys()): 83 print(" %s: %s" % (k, versions[k]), file=self.output) 84 if self.saver: 85 self.saver.emit_header(versions, pid) 86 87 def remote_msg(self, d): 88 d = ensure_dict_str_keys(d) 89 if self.options['verbose']: 90 self.simple_print(d) 91 else: 92 self.formatted_print(d) 93 if self.saver: 94 self.saver.remote_msg(d) 95 96 def simple_print(self, d): 97 print(six.text_type(d), file=self.output) 98 99 def formatted_print(self, d): 100 time_s = format_time(d['time'], self.options["timestamps"]) 101 102 msg = log.format_message(d) 103 level = d.get('level', log.OPERATIONAL) 104 105 tubid = "" # TODO 106 print("%s L%d [%s]#%d %s" % (time_s, level, tubid, 107 d["num"], msg), file=self.output) 108 if 'failure' in d: 109 print(" FAILURE:", file=self.output) 110 lines = str(d['failure']).split("\n") 111 for line in lines: 112 print(" %s" % (line,), file=self.output) 113 114 115class LogTail: 116 def __init__(self, options): 117 self.options = options 118 119 def run(self, target_furl): 120 target_tubid = SturdyRef(target_furl).getTubRef().getTubID() 121 d = fireEventually(target_furl) 122 d.addCallback(self.start, target_tubid) 123 d.addErrback(self._error) 124 print("starting..") 125 reactor.run() 126 127 def _error(self, f): 128 print("ERROR", f) 129 reactor.stop() 130 131 def start(self, target_furl, target_tubid): 132 print("Connecting..") 133 self._tub = Tub() 134 self._tub.startService() 135 self._tub.connectTo(target_furl, self._got_logpublisher, target_tubid) 136 137 def _got_logpublisher(self, publisher, target_tubid): 138 d = publisher.callRemote("get_pid") 139 def _announce(pid_or_failure): 140 if isinstance(pid_or_failure, int): 141 print("Connected (to pid %d)" % pid_or_failure) 142 return pid_or_failure 143 else: 144 # the logport is probably foolscap-0.2.8 or earlier and 145 # doesn't offer get_pid() 146 print("Connected (unable to get pid)") 147 return None 148 d.addBoth(_announce) 149 publisher.notifyOnDisconnect(self._lost_logpublisher) 150 lp = LogPrinter(self.options, target_tubid) 151 def _ask_for_versions(pid): 152 d = publisher.callRemote("get_versions") 153 d.addCallback(lp.got_versions, pid) 154 return d 155 d.addCallback(_ask_for_versions) 156 catch_up = bool(self.options["catch-up"]) 157 if catch_up: 158 d.addCallback(lambda res: 159 publisher.callRemote("subscribe_to_all", lp, True)) 160 else: 161 # provide compatibility with foolscap-0.2.4 and earlier, which 162 # didn't accept a catchup= argument 163 d.addCallback(lambda res: 164 publisher.callRemote("subscribe_to_all", lp)) 165 d.addErrback(self._error) 166 return d 167 168 def _lost_logpublisher(publisher): 169 print("Disconnected") 170 171 172