1# Copyright (c) 2018 gevent. See LICENSE for details.
2from __future__ import print_function, absolute_import, division
3
4import os
5import sys
6
7from weakref import ref as wref
8
9from greenlet import getcurrent
10
11from gevent import config as GEVENT_CONFIG
12from gevent.monkey import get_original
13from gevent.events import notify
14from gevent.events import EventLoopBlocked
15from gevent.events import MemoryUsageThresholdExceeded
16from gevent.events import MemoryUsageUnderThreshold
17from gevent.events import IPeriodicMonitorThread
18from gevent.events import implementer
19
20from gevent._tracer import GreenletTracer
21from gevent._compat import thread_mod_name
22from gevent._compat import perf_counter
23from gevent._compat import get_this_psutil_process
24
25
26
# The only name this module exports.
__all__ = [
    'PeriodicMonitoringThread',
]

# Thread and sleep primitives are looked up through
# ``gevent.monkey.get_original`` instead of being imported directly.
# NOTE(review): presumably this yields the un-monkey-patched
# implementations, so the monitor runs on a real native thread and its
# sleep really blocks that thread -- confirm against gevent.monkey.
get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')
34
35
36
class MonitorWarning(RuntimeWarning):
    """Warning category used for the warnings issued by this module."""
39
40
41class _MonitorEntry(object):
42
43    __slots__ = ('function', 'period', 'last_run_time')
44
45    def __init__(self, function, period):
46        self.function = function
47        self.period = period
48        self.last_run_time = 0
49
50    def __eq__(self, other):
51        return self.function == other.function and self.period == other.period
52
53    def __repr__(self):
54        return repr((self.function, self.period, self.last_run_time))
55
56
@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object):
    """
    Periodically runs registered monitoring functions for a hub.

    The functions are run from a separate thread started with the
    low-level ``start_new_thread`` primitive; the instance itself is
    the thread's target (see ``__call__``). The hub is only held
    through a weak reference.
    """
    # This doesn't extend threading.Thread because that gets monkey-patched.
    # We use the low-level 'start_new_thread' primitive instead.

    # The amount of seconds we will sleep when we think we have nothing
    # to do.
    inactive_sleep_time = 2.0

    # The absolute minimum we will sleep, regardless of
    # what particular monitoring functions want to say.
    min_sleep_time = 0.005

    # The minimum period in seconds at which we will check memory usage.
    # Getting memory usage is fairly expensive.
    min_memory_monitor_period = 2

    # A list of _MonitorEntry objects, each holding
    # (function(hub), period, last_run_time).
    # The first entry is always our entry for self.monitor_blocking
    _monitoring_functions = None

    # The calculated min sleep time for the monitoring functions list.
    _calculated_sleep_time = None

    # A boolean value that also happens to capture the
    # memory usage at the time we exceeded the threshold. Reset
    # to 0 when we go back below.
    _memory_exceeded = 0

    # The instance of GreenletTracer we're using
    _greenlet_tracer = None
88
89    def __init__(self, hub):
90        self._hub_wref = wref(hub, self._on_hub_gc)
91        self.should_run = True
92
93        # Must be installed in the thread that the hub is running in;
94        # the trace function is threadlocal
95        assert get_thread_ident() == hub.thread_ident
96        self._greenlet_tracer = GreenletTracer()
97
98        self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
99                                                    GEVENT_CONFIG.max_blocking_time)]
100        self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time
101        # Create the actual monitoring thread. This is effectively a "daemon"
102        # thread.
103        self.monitor_thread_ident = start_new_thread(self, ())
104
105        # We must track the PID to know if your thread has died after a fork
106        self.pid = os.getpid()
107
108    def _on_fork(self):
109        # Pseudo-standard method that resolver_ares and threadpool
110        # also have, called by hub.reinit()
111        pid = os.getpid()
112        if pid != self.pid:
113            self.pid = pid
114            self.monitor_thread_ident = start_new_thread(self, ())
115
116    @property
117    def hub(self):
118        return self._hub_wref()
119
120
121    def monitoring_functions(self):
122        # Return a list of _MonitorEntry objects
123
124        # Update max_blocking_time each time.
125        mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes.
126        if mbt != self._monitoring_functions[0].period:
127            self._monitoring_functions[0].period = mbt
128            self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
129        return self._monitoring_functions
130
131    def add_monitoring_function(self, function, period):
132        if not callable(function):
133            raise ValueError("function must be callable")
134
135        if period is None:
136            # Remove.
137            self._monitoring_functions = [
138                x for x in self._monitoring_functions
139                if x.function != function
140            ]
141        elif period <= 0:
142            raise ValueError("Period must be positive.")
143        else:
144            # Add or update period
145            entry = _MonitorEntry(function, period)
146            self._monitoring_functions = [
147                x if x.function != function else entry
148                for x in self._monitoring_functions
149            ]
150            if entry not in self._monitoring_functions:
151                self._monitoring_functions.append(entry)
152        self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
153
154    def calculate_sleep_time(self):
155        min_sleep = self._calculated_sleep_time
156        if min_sleep <= 0:
157            # Everyone wants to be disabled. Sleep for a longer period of
158            # time than usual so we don't spin unnecessarily. We might be
159            # enabled again in the future.
160            return self.inactive_sleep_time
161        return max((min_sleep, self.min_sleep_time))
162
163    def kill(self):
164        if not self.should_run:
165            # Prevent overwriting trace functions.
166            return
167        # Stop this monitoring thread from running.
168        self.should_run = False
169        # Uninstall our tracing hook
170        self._greenlet_tracer.kill()
171
    def _on_hub_gc(self, _):
        """
        Weak-reference callback registered in ``__init__`` for the hub.

        Stops monitoring by calling ``kill``. The single argument
        supplied to the callback is ignored.
        """
        self.kill()
174
    def __call__(self):
        """
        Body of the monitoring thread; the instance is passed to
        ``start_new_thread`` as the thread's target.

        Loops while ``should_run`` is true: sleep for
        ``calculate_sleep_time()``, then call every registered function
        whose period has elapsed, passing it the hub. The loop ends when
        ``kill`` is called, which also happens here if the hub has gone
        away. ``SystemExit`` is swallowed silently; any other exception
        is passed to ``hub.handle_error`` when the hub still exists.
        """
        # The function that runs in the monitoring thread.
        # We cannot use threading.current_thread because it would
        # create an immortal DummyThread object.
        getcurrent().gevent_monitoring_thread = wref(self)

        try:
            while self.should_run:
                # Note that the list is captured *before* sleeping, so
                # functions registered during the sleep are first seen
                # on the next iteration.
                functions = self.monitoring_functions()
                assert functions
                sleep_time = self.calculate_sleep_time()

                thread_sleep(sleep_time)

                # Make sure the hub is still around, and still active,
                # and keep it around while we are here.
                hub = self.hub
                if not hub:
                    self.kill()

                # ``kill`` may have been called while we slept (or just
                # above), so check again before running anything.
                if self.should_run:
                    this_run = perf_counter()
                    for entry in functions:
                        f = entry.function
                        period = entry.period
                        last_run = entry.last_run_time
                        # A falsy period means the entry is skipped;
                        # otherwise run it once its period has elapsed.
                        if period and last_run + period <= this_run:
                            # Record the run time before calling, so it is
                            # updated even if the function raises.
                            entry.last_run_time = this_run
                            f(hub)
                del hub # break our reference to hub while we sleep

        except SystemExit:
            pass
        except: # pylint:disable=bare-except
            # We're a daemon thread, so swallow any exceptions that get here
            # during interpreter shutdown.
            if not sys or not sys.stderr: # pragma: no cover
                # Interpreter is shutting down
                pass
            else:
                hub = self.hub
                if hub is not None:
                    # XXX: This tends to do bad things like end the process, because we
                    # try to switch *threads*, which can't happen. Need something better.
                    hub.handle_error(self, *sys.exc_info())
220
221    def monitor_blocking(self, hub):
222        # Called periodically to see if the trace function has
223        # fired to switch greenlets. If not, we will print
224        # the greenlet tree.
225
226        # For tests, we return a true value when we think we found something
227        # blocking
228
229        did_block = self._greenlet_tracer.did_block_hub(hub)
230        if not did_block:
231            return
232
233        active_greenlet = did_block[1] # pylint:disable=unsubscriptable-object
234        report = self._greenlet_tracer.did_block_hub_report(
235            hub, active_greenlet,
236            dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))
237
238        stream = hub.exception_stream
239        for line in report:
240            # Printing line by line may interleave with other things,
241            # but it should also prevent a "reentrant call to print"
242            # when the report is large.
243            print(line, file=stream)
244
245        notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
246        return (active_greenlet, report)
247
248    def ignore_current_greenlet_blocking(self):
249        self._greenlet_tracer.ignore_current_greenlet_blocking()
250
251    def monitor_current_greenlet_blocking(self):
252        self._greenlet_tracer.monitor_current_greenlet_blocking()
253
254    def _get_process(self): # pylint:disable=method-hidden
255        proc = get_this_psutil_process()
256        self._get_process = lambda: proc
257        return proc
258
259    def can_monitor_memory_usage(self):
260        return self._get_process() is not None
261
262    def install_monitor_memory_usage(self):
263        # Start monitoring memory usage, if possible.
264        # If not possible, emit a warning.
265        if not self.can_monitor_memory_usage():
266            import warnings
267            warnings.warn("Unable to monitor memory usage. Install psutil.",
268                          MonitorWarning)
269            return
270
271        self.add_monitoring_function(self.monitor_memory_usage,
272                                     max(GEVENT_CONFIG.memory_monitor_period,
273                                         self.min_memory_monitor_period))
274
275    def monitor_memory_usage(self, _hub):
276        max_allowed = GEVENT_CONFIG.max_memory_usage
277        if not max_allowed:
278            # They disabled it.
279            return -1 # value for tests
280
281        rusage = self._get_process().memory_full_info()
282        # uss only documented available on Windows, Linux, and OS X.
283        # If not available, fall back to rss as an aproximation.
284        mem_usage = getattr(rusage, 'uss', 0) or rusage.rss
285
286        event = None # Return value for tests
287
288        if mem_usage > max_allowed:
289            if mem_usage > self._memory_exceeded:
290                # We're still growing
291                event = MemoryUsageThresholdExceeded(
292                    mem_usage, max_allowed, rusage)
293                notify(event)
294            self._memory_exceeded = mem_usage
295        else:
296            # we're below. Were we above it last time?
297            if self._memory_exceeded:
298                event = MemoryUsageUnderThreshold(
299                    mem_usage, max_allowed, rusage, self._memory_exceeded)
300                notify(event)
301            self._memory_exceeded = 0
302
303        return event
304
305    def __repr__(self):
306        return '<%s at %s in thread %s greenlet %r for %r>' % (
307            self.__class__.__name__,
308            hex(id(self)),
309            hex(self.monitor_thread_ident),
310            getcurrent(),
311            self._hub_wref())
312