from __future__ import print_function
import os
from collections import deque
import six
from zope.interface import implementer
from twisted.python import filepath
from foolscap.referenceable import Referenceable
from foolscap.logging.interfaces import RISubscription, RILogPublisher
from foolscap.logging import app_versions, flogfile
from foolscap.eventual import eventually
from foolscap.util import ensure_dict_binary


@implementer(RISubscription)
class Subscription(Referenceable):
    """Forward log events from a logger to one remote observer.

    Events handed to send() are held in a bounded queue and delivered with
    observer.callRemote("msg", event), keeping at most MAX_IN_FLIGHT calls
    outstanding at a time. Once the queue holds MAX_QUEUE_SIZE events, newly
    arriving events are dropped.
    """
    # used as a marker, and as an unsubscribe() method. We use this to manage
    # the outbound size-limited queue.
    MAX_QUEUE_SIZE = 2000
    MAX_IN_FLIGHT = 10

    def __init__(self, observer, logger):
        self.observer = observer
        self.logger = logger
        self.subscribed = False
        self.queue = deque()
        self.in_flight = 0  # callRemote("msg") calls not yet answered
        # True while a start_sending() call is already scheduled, so that
        # we never schedule more than one at a time
        self.marked_for_sending = False
        #self.messages_dropped = 0

    def subscribe(self, catch_up):
        """Start observing the logger.

        If catch_up is true, the logger's buffered events are sent to the
        observer first, sorted by their 'num' key.
        """
        self.subscribed = True
        # If we have to discard messages, discard them as early as possible,
        # and provide backpressure. So we add our method as an "immediate
        # observer" instead of a regular one.
        self.logger.addImmediateObserver(self.send)
        self._nod_marker = self.observer.notifyOnDisconnect(self.unsubscribe)
        if catch_up:
            # send any catch-up events in a single batch, before we allow any
            # other events to be generated (and sent). This lets the
            # subscriber see events in sorted order. We bypass the bounded
            # queue for this.
            events = sorted(self.logger.get_buffered_events(),
                            key=lambda a: a['num'])
            for e in events:
                self.observer.callRemoteOnly("msg", e)

    def unsubscribe(self):
        """Stop observing the logger. Does nothing if not subscribed."""
        if self.subscribed:
            self.logger.removeImmediateObserver(self.send)
            self.observer.dontNotifyOnDisconnect(self._nod_marker)
            self.subscribed = False

    def remote_unsubscribe(self):
        """Remotely-callable wrapper around unsubscribe()."""
        return self.unsubscribe()

    def send(self, event):
        """Queue one event for delivery, dropping it if the queue is full."""
        if len(self.queue) < self.MAX_QUEUE_SIZE:
            self.queue.append(event)
        else:
            # preserve old messages, discard new ones.
            #self.messages_dropped += 1
            pass
        if not self.marked_for_sending:
            self.marked_for_sending = True
            eventually(self.start_sending)

    def start_sending(self):
        """Send queued events until MAX_IN_FLIGHT calls are outstanding."""
        self.marked_for_sending = False
        while self.queue and self.in_flight < self.MAX_IN_FLIGHT:
            event = self.queue.popleft()
            self.in_flight += 1
            d = self.observer.callRemote("msg", event)
            d.addCallback(self._event_received)
            d.addErrback(self._error)

    def _event_received(self, res):
        """Callback for a delivered event: free a slot, schedule more."""
        self.in_flight -= 1
        # the following would be nice to have, but requires very careful
        # analysis to avoid recursion, reentrancy, or even more overload
        #if self.messages_dropped and not self.queue:
        #    count = self.messages_dropped
        #    self.messages_dropped = 0
        #    log.msg(format="log-publisher: %(dropped)d messages dropped",
        #            dropped=count,
        #            facility="foolscap.log.publisher",
        #            level=log.UNUSUAL)
        if not self.marked_for_sending:
            self.marked_for_sending = True
            eventually(self.start_sending)

    def _error(self, f):
        """Errback for a failed delivery: stop observing the logger.

        Note that in_flight is not decremented on this path.
        """
        #print "PUBLISH FAILED: %s" % f
        self.unsubscribe()


@implementer(RISubscription)
class IncidentSubscription(Referenceable):
    """Forward newly-declared incidents from a logger to a remote observer.

    The publisher is used to enumerate already-recorded incidents when the
    subscriber asks to catch up.
    """

    def __init__(self, observer, logger, publisher):
        self.observer = observer
        self.logger = logger
        self.publisher = publisher
        self.subscribed = False

    def subscribe(self, catch_up=False, since=None):
        """Start observing incidents.

        If catch_up is true, previously recorded incidents whose names sort
        after 'since' are reported first (see catch_up()).
        """
        self.subscribed = True
        self.logger.addImmediateIncidentObserver(self.send)
        self._nod_marker = self.observer.notifyOnDisconnect(self.unsubscribe)
        if catch_up:
            self.catch_up(since)

    def catch_up(self, since):
        """Report existing incidents newer than 'since', in name order.

        Incidents whose trigger cannot be read are skipped. A final
        "done_with_incident_catchup" message is always sent.
        """
        new = dict(self.publisher.list_incident_names(since))
        for name in sorted(new):
            fn = new[name]
            trigger = self.publisher.get_incident_trigger(fn)
            if trigger:
                self.observer.callRemoteOnly("new_incident",
                                             six.ensure_binary(name), trigger)
        self.observer.callRemoteOnly("done_with_incident_catchup")

    def unsubscribe(self):
        """Stop observing incidents. Does nothing if not subscribed."""
        if self.subscribed:
            self.logger.removeImmediateIncidentObserver(self.send)
            self.observer.dontNotifyOnDisconnect(self._nod_marker)
            self.subscribed = False

    def remote_unsubscribe(self):
        """Remotely-callable wrapper around unsubscribe()."""
        return self.unsubscribe()

    def send(self, name, trigger):
        """Report one new incident to the observer."""
        d = self.observer.callRemote("new_incident",
                                     six.ensure_binary(name), trigger)
        d.addErrback(self._error)

    def _error(self, f):
        """Errback for a failed delivery: report it and unsubscribe."""
        print("INCIDENT PUBLISH FAILED: %s" % f)
        self.unsubscribe()


@implementer(RILogPublisher)
class LogPublisher(Referenceable):
    """Publish log events to anyone subscribed to our 'logport'.

    This class manages the subscriptions.

    Enable this by asking the Tub for a reference to me, or by telling the
    Tub to offer me to a log gatherer::

     lp = tub.getLogPort()
     rref.callRemote('have_a_logport', lp)
     print('logport at:', tub.getLogPortFURL())

     tub.setOption('log-gatherer-furl', gatherer_furl)

    Running 'flogtool tail LOGPORT_FURL' will connect to the logport and
    print all events that subsequently get logged.

    To make the logport use the same furl from one run to the next, give the
    Tub a filename where it can store the furl. Make sure you do this before
    touching the logport::

     logport_furlfile = 'logport.furl'
     tub.setOption('logport-furlfile', logport_furlfile)

    If you're using one or more LogGatherers, pass their FURLs into the Tub
    with tub.setOption('log-gatherer-furl'), or pass the name of a file
    where it is stored with tub.setOption('log-gatherer-furlfile'). This
    will cause the Tub to connect to the gatherer and grant it access to the
    logport.
    """

    # the 'versions' dict used to live here in LogPublisher, but now it lives
    # in foolscap.logging.app_versions and should be accessed from there.
    # This copy remains for backwards-compatibility.
    versions = app_versions.versions

    def __init__(self, logger):
        self._logger = logger
        logger.setLogPort(self)

    def remote_get_versions(self):
        """Return app_versions.versions passed through ensure_dict_binary."""
        return ensure_dict_binary(app_versions.versions)

    def remote_get_pid(self):
        """Return the process id of this process."""
        return os.getpid()

    def remote_subscribe_to_all(self, observer, catch_up=False):
        """Create and return a Subscription for all log events."""
        s = Subscription(observer, self._logger)
        eventually(s.subscribe, catch_up)
        # allow the call to return before we send them any events
        return s

    def remote_unsubscribe(self, s):
        """Unsubscribe the given subscription object."""
        return s.unsubscribe()

    def trim(self, s, *suffixes):
        """Return s with each matching suffix removed from its end.

        Suffixes are tried once each, in the order given, so
        trim("x.flog.bz2", ".bz2", ".flog") returns "x".
        """
        for suffix in suffixes:
            if s.endswith(suffix):
                s = s[:-len(suffix)]
        return s

    def list_incident_names(self, since=""):
        """Yield (name, absfilename) pairs for recorded incidents.

        Only files in the logger's logdir whose names start with "incident"
        and do not end with ".tmp" are considered. 'name' is the filename
        with any ".bz2"/".flog" suffix removed; only names that compare
        greater than 'since' are yielded. A 'since' of None (the default
        used by IncidentSubscription.subscribe) means no lower bound.
        """
        # six.ensure_str() raises TypeError on None, so map it to the
        # "everything" value first
        since = "" if since is None else six.ensure_str(since)
        basedir = self._logger.logdir
        for fn in os.listdir(basedir):
            if fn.startswith("incident") and not fn.endswith(".tmp"):
                basename = six.ensure_str(self.trim(fn, ".bz2", ".flog"))
                if basename > since:
                    fullname = six.ensure_str(os.path.join(basedir, fn))
                    yield (basename, fullname)

    def get_incident_trigger(self, abs_fn):
        """Return the trigger stored in the header of incident file abs_fn.

        Returns None when the header cannot be read: reading the first
        event raised EOFError or ValueError, or the file held no events.
        """
        events = flogfile.get_events(abs_fn)
        try:
            header = next(iter(events))
        except (EOFError, ValueError, StopIteration):
            # StopIteration: the event stream was empty, so there is no
            # header; treat it like any other unreadable file
            return None
        assert header["header"]["type"] == "incident"
        trigger = header["header"]["trigger"]
        return trigger

    def remote_list_incidents(self, since=""):
        """Return a dict mapping incident name to trigger.

        Covers incidents whose names sort after 'since'; incidents with no
        readable trigger are left out.
        """
        incidents = {}
        for (name, fn) in self.list_incident_names(since):
            trigger = self.get_incident_trigger(fn)
            if trigger:
                incidents[six.ensure_str(name)] = trigger
        return incidents

    def remote_get_incident(self, name):
        """Return (header, events) for the named incident.

        'header' is the "header" value of the first event in the file, and
        'events' is the list of "d" values of all following events. The
        ".flog.bz2" file is used if it exists, else the ".flog" file.

        Raises KeyError if the name does not start with "incident" or the
        file cannot be opened/read (EnvironmentError).
        """
        name = six.ensure_str(name)
        if not name.startswith("incident"):
            raise KeyError("bad incident name %s" % name)
        incident_dir = filepath.FilePath(self._logger.logdir)
        abs_fn = incident_dir.child(name).path + ".flog"
        try:
            fn = abs_fn + ".bz2"
            if not os.path.exists(fn):
                fn = abs_fn
            events = flogfile.get_events(fn)
            # note the generator isn't actually cycled yet, not until next()
            header = next(events)["header"]
        except EnvironmentError:
            raise KeyError("no incident named %s" % name)
        wrapped_events = [event["d"] for event in events]
        return (header, wrapped_events)

    def remote_subscribe_to_incidents(self, observer, catch_up=False, since=""):
        """Create and return an IncidentSubscription for new incidents."""
        since = six.ensure_str(since)
        s = IncidentSubscription(observer, self._logger, self)
        eventually(s.subscribe, catch_up, since)
        # allow the call to return before we send them any events
        return s