1from __future__ import print_function
2import os
3from collections import deque
4import six
5from zope.interface import implementer
6from twisted.python import filepath
7from foolscap.referenceable import Referenceable
8from foolscap.logging.interfaces import RISubscription, RILogPublisher
9from foolscap.logging import app_versions, flogfile
10from foolscap.eventual import eventually
11from foolscap.util import ensure_dict_binary
12
@implementer(RISubscription)
class Subscription(Referenceable):
    """One remote observer's subscription to our log stream.

    Instances serve double duty: they are the marker object handed back to
    the subscriber, and they carry the unsubscribe() method. Outbound events
    travel through a size-bounded queue, with a cap on how many
    callRemote("msg") messages may be outstanding at any moment.
    """
    MAX_QUEUE_SIZE = 2000
    MAX_IN_FLIGHT = 10

    def __init__(self, observer, logger):
        self.observer = observer
        self.logger = logger
        self.subscribed = False
        self.queue = deque()
        self.in_flight = 0
        self.marked_for_sending = False
        #self.messages_dropped = 0

    def subscribe(self, catch_up):
        self.subscribed = True
        # Register as an "immediate observer" rather than a regular one:
        # if messages must be discarded, we want that to happen as early as
        # possible, and we want to provide backpressure.
        self.logger.addImmediateObserver(self.send)
        self._nod_marker = self.observer.notifyOnDisconnect(self.unsubscribe)
        if not catch_up:
            return
        # Deliver the buffered backlog as one sorted batch before any new
        # events can be generated (and sent), so the subscriber sees events
        # in order. These catch-up events bypass the bounded queue.
        backlog = sorted(self.logger.get_buffered_events(),
                         key=lambda ev: ev['num'])
        for ev in backlog:
            self.observer.callRemoteOnly("msg", ev)

    def unsubscribe(self):
        if not self.subscribed:
            return
        self.logger.removeImmediateObserver(self.send)
        self.observer.dontNotifyOnDisconnect(self._nod_marker)
        self.subscribed = False

    def remote_unsubscribe(self):
        return self.unsubscribe()

    def send(self, event):
        # Enqueue unless the queue is full; when full, keep the old
        # messages and silently drop the new one.
        if len(self.queue) < self.MAX_QUEUE_SIZE:
            self.queue.append(event)
        #else:
        #    self.messages_dropped += 1
        self._schedule_send()

    def _schedule_send(self):
        # Arrange for start_sending() to run on a later turn, at most once.
        if not self.marked_for_sending:
            self.marked_for_sending = True
            eventually(self.start_sending)

    def start_sending(self):
        self.marked_for_sending = False
        # Drain the queue, but never exceed MAX_IN_FLIGHT outstanding calls.
        while self.queue and self.in_flight < self.MAX_IN_FLIGHT:
            self.in_flight += 1
            d = self.observer.callRemote("msg", self.queue.popleft())
            d.addCallback(self._event_received)
            d.addErrback(self._error)

    def _event_received(self, res):
        self.in_flight -= 1
        # Reporting dropped-message counts here would be nice to have, but
        # requires very careful analysis to avoid recursion, reentrancy, or
        # even more overload:
        #if self.messages_dropped and not self.queue:
        #    count = self.messages_dropped
        #    self.messages_dropped = 0
        #    log.msg(format="log-publisher: %(dropped)d messages dropped",
        #            dropped=count,
        #            facility="foolscap.log.publisher",
        #            level=log.UNUSUAL)
        self._schedule_send()

    def _error(self, f):
        # uncomment for debugging: print("PUBLISH FAILED: %s" % f)
        self.unsubscribe()
92
@implementer(RISubscription)
class IncidentSubscription(Referenceable):
    """One remote observer's subscription to our incident reports.

    Acts as the subscription marker and carries unsubscribe(); forwards each
    newly recorded incident to the remote observer.
    """
    def __init__(self, observer, logger, publisher):
        self.observer = observer
        self.logger = logger
        self.publisher = publisher
        self.subscribed = False

    def subscribe(self, catch_up=False, since=None):
        self.subscribed = True
        self.logger.addImmediateIncidentObserver(self.send)
        self._nod_marker = self.observer.notifyOnDisconnect(self.unsubscribe)
        if catch_up:
            self.catch_up(since)

    def catch_up(self, since):
        # Replay (in sorted order) every incident recorded after 'since',
        # then tell the observer the catch-up phase is over.
        name_to_file = dict(self.publisher.list_incident_names(since))
        for name in sorted(name_to_file):
            trigger = self.publisher.get_incident_trigger(name_to_file[name])
            if trigger:
                self.observer.callRemoteOnly("new_incident",
                                             six.ensure_binary(name), trigger)
        self.observer.callRemoteOnly("done_with_incident_catchup")

    def unsubscribe(self):
        if not self.subscribed:
            return
        self.logger.removeImmediateIncidentObserver(self.send)
        self.observer.dontNotifyOnDisconnect(self._nod_marker)
        self.subscribed = False

    def remote_unsubscribe(self):
        return self.unsubscribe()

    def send(self, name, trigger):
        d = self.observer.callRemote("new_incident",
                                     six.ensure_binary(name), trigger)
        d.addErrback(self._error)

    def _error(self, f):
        print("INCIDENT PUBLISH FAILED: %s" % f)
        self.unsubscribe()
132
133
@implementer(RILogPublisher)
class LogPublisher(Referenceable):
    """Publish log events to anyone subscribed to our 'logport'.

    This class manages the subscriptions.

    Enable this by asking the Tub for a reference to me, or by telling the
    Tub to offer me to a log gatherer::

     lp = tub.getLogPort()
     rref.callRemote('have_a_logport', lp)
     print 'logport at:', tub.getLogPortFURL()

     tub.setOption('log-gatherer-furl', gatherer_furl)

    Running 'flogtool tail LOGPORT_FURL' will connect to the logport and
    print all events that subsequently get logged.

    To make the logport use the same furl from one run to the next, give the
    Tub a filename where it can store the furl. Make sure you do this before
    touching the logport::

     logport_furlfile = 'logport.furl'
     tub.setOption('logport-furlfile', logport_furlfile)

    If you're using one or more LogGatherers, pass their FURLs into the Tub
    with tub.setOption('log-gatherer-furl'), or pass the name of a file
    where it is stored with tub.setOption('log-gatherer-furlfile'). This
    will cause the Tub to connect to the gatherer and grant it access to the
    logport.
    """

    # 'versions' used to live here in LogPublisher, but now lives in
    # foolscap.logging.app_versions and should be accessed from there. This
    # copy remains for backwards-compatibility.
    versions = app_versions.versions

    def __init__(self, logger):
        self._logger = logger
        logger.setLogPort(self)

    def remote_get_versions(self):
        return ensure_dict_binary(app_versions.versions)

    def remote_get_pid(self):
        return os.getpid()

    def remote_subscribe_to_all(self, observer, catch_up=False):
        s = Subscription(observer, self._logger)
        # defer the actual subscription so this call can return before any
        # events are delivered
        eventually(s.subscribe, catch_up)
        return s

    def remote_unsubscribe(self, s):
        return s.unsubscribe()

    def trim(self, s, *suffixes):
        # Strip each suffix (at most once each, in the given order) from
        # the end of 's'.
        for suffix in suffixes:
            if s.endswith(suffix):
                s = s[:len(s) - len(suffix)]
        return s

    def list_incident_names(self, since=""):
        # Yield (name, absfilename) pairs for each incident file recorded
        # after 'since'. Order follows os.listdir(), i.e. unsorted.
        since = six.ensure_str(since)
        basedir = self._logger.logdir
        for fn in os.listdir(basedir):
            if not fn.startswith("incident") or fn.endswith(".tmp"):
                continue
            basename = six.ensure_str(self.trim(fn, ".bz2", ".flog"))
            if basename > since:
                yield (basename, six.ensure_str(os.path.join(basedir, fn)))

    def get_incident_trigger(self, abs_fn):
        # Return the triggering event recorded in the incident file's
        # header, or None if the file is truncated or unparseable.
        try:
            header = next(iter(flogfile.get_events(abs_fn)))
        except (EOFError, ValueError):
            return None
        assert header["header"]["type"] == "incident"
        return header["header"]["trigger"]

    def remote_list_incidents(self, since=""):
        # Map each qualifying incident name to its triggering event,
        # skipping files whose trigger cannot be read.
        result = {}
        for (name, fn) in self.list_incident_names(since):
            trigger = self.get_incident_trigger(fn)
            if trigger:
                result[six.ensure_str(name)] = trigger
        return result

    def remote_get_incident(self, name):
        name = six.ensure_str(name)
        if not name.startswith("incident"):
            raise KeyError("bad incident name %s" % name)
        incident_dir = filepath.FilePath(self._logger.logdir)
        abs_fn = incident_dir.child(name).path + ".flog"
        # prefer the compressed form when it exists
        compressed = abs_fn + ".bz2"
        fn = compressed if os.path.exists(compressed) else abs_fn
        try:
            events = flogfile.get_events(fn)
            # get_events() is lazy: the file isn't touched until next()
            header = next(events)["header"]
        except EnvironmentError:
            raise KeyError("no incident named %s" % name)
        wrapped_events = [event["d"] for event in events]
        return (header, wrapped_events)

    def remote_subscribe_to_incidents(self, observer, catch_up=False, since=""):
        since = six.ensure_str(since)
        s = IncidentSubscription(observer, self._logger, self)
        # defer the actual subscription so this call can return before any
        # events are delivered
        eventually(s.subscribe, catch_up, since)
        return s
250