# Copyright (c) 2018 gevent. See LICENSE for details.
"""
A periodic monitoring thread for a gevent hub.

The thread defined here runs a list of monitoring functions at fixed
periods. Two monitors are provided in this module: one that reports
when the hub's event loop appears to be blocked, and one that reports
when process memory usage crosses a configured threshold.
"""
from __future__ import print_function, absolute_import, division

import os
import sys

from weakref import ref as wref

from greenlet import getcurrent

from gevent import config as GEVENT_CONFIG
from gevent.monkey import get_original
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold
from gevent.events import IPeriodicMonitorThread
from gevent.events import implementer

from gevent._tracer import GreenletTracer
from gevent._compat import thread_mod_name
from gevent._compat import perf_counter
from gevent._compat import get_this_psutil_process



__all__ = [
    'PeriodicMonitoringThread',
]

# These are looked up through ``gevent.monkey.get_original`` rather than
# imported directly -- presumably so that we always get the unpatched
# primitives even if the process has been monkey-patched.
get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')



class MonitorWarning(RuntimeWarning):
    """The type of warnings we emit."""


class _MonitorEntry(object):
    """
    One registered monitoring function.

    Holds the callable, the period (in seconds) at which it should be
    called, and the ``perf_counter`` time at which it was last called
    (0 until it has run for the first time).
    """

    __slots__ = ('function', 'period', 'last_run_time')

    def __init__(self, function, period):
        self.function = function
        self.period = period
        self.last_run_time = 0

    def __eq__(self, other):
        # Two entries are equal when they have the same function and
        # period; ``last_run_time`` is deliberately not considered.
        try:
            return self.function == other.function and self.period == other.period
        except AttributeError:
            # *other* is not entry-like. Let Python try the reflected
            # comparison (and ultimately answer False) instead of
            # blowing up with an AttributeError.
            return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; keep them consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return repr((self.function, self.period, self.last_run_time))


@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object):
    """
    Runs monitoring functions for a hub, periodically, in its own thread.

    Creating an instance immediately starts the thread. The instance
    itself is the thread's target (see :meth:`__call__`). Only a weak
    reference to the hub is kept; when the hub is collected the thread
    is told to stop.
    """
    # This doesn't extend threading.Thread because that gets monkey-patched.
    # We use the low-level 'start_new_thread' primitive instead.

    # The amount of seconds we will sleep when we think we have nothing
    # to do.
    inactive_sleep_time = 2.0

    # The absolute minimum we will sleep, regardless of
    # what particular monitoring functions want to say.
    min_sleep_time = 0.005

    # The minimum period in seconds at which we will check memory usage.
    # Getting memory usage is fairly expensive.
    min_memory_monitor_period = 2

    # A list of _MonitorEntry objects: [(function(hub), period, last_run_time)]
    # The first entry is always our entry for self.monitor_blocking
    _monitoring_functions = None

    # The calculated min sleep time for the monitoring functions list.
    _calculated_sleep_time = None

    # A boolean value that also happens to capture the
    # memory usage at the time we exceeded the threshold. Reset
    # to 0 when we go back below.
    _memory_exceeded = 0

    # The instance of GreenletTracer we're using
    _greenlet_tracer = None

    def __init__(self, hub):
        """
        Install the greenlet tracer and start the monitoring thread.

        Must be called from the thread that *hub* is running in.

        :param hub: The hub to monitor. Only a weak reference is kept.
        """
        self._hub_wref = wref(hub, self._on_hub_gc)
        self.should_run = True

        # Must be installed in the thread that the hub is running in;
        # the trace function is threadlocal
        assert get_thread_ident() == hub.thread_ident
        self._greenlet_tracer = GreenletTracer()

        self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
                                                    GEVENT_CONFIG.max_blocking_time)]
        self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time
        # Create the actual monitoring thread. This is effectively a "daemon"
        # thread.
        self.monitor_thread_ident = start_new_thread(self, ())

        # We must track the PID to know if our thread has died after a fork
        self.pid = os.getpid()

    def _on_fork(self):
        """
        Start a new monitoring thread if we are now in a different process.
        """
        # Pseudo-standard method that resolver_ares and threadpool
        # also have, called by hub.reinit()
        pid = os.getpid()
        if pid != self.pid:
            self.pid = pid
            self.monitor_thread_ident = start_new_thread(self, ())

    @property
    def hub(self):
        """The hub being monitored, or None if it has been collected."""
        return self._hub_wref()


    def monitoring_functions(self):
        """
        Return the list of :class:`_MonitorEntry` objects.

        As a side effect, the period of the first entry (the blocking
        monitor) is synchronized with ``GEVENT_CONFIG.max_blocking_time``
        and the cached sleep time is recomputed if it changed.
        """
        # Return a list of _MonitorEntry objects

        # Update max_blocking_time each time.
        mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes.
        if mbt != self._monitoring_functions[0].period:
            self._monitoring_functions[0].period = mbt
            self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
        return self._monitoring_functions

    def add_monitoring_function(self, function, period):
        """
        Add, update or remove a monitoring function.

        :param function: A callable; the monitoring thread calls it
            with the hub as its only argument.
        :param period: Seconds between calls. If an entry for
            *function* already exists it is replaced by a fresh entry
            with this period. ``None`` removes *function*.
        :raises ValueError: If *function* is not callable, if *period*
            is not positive, or if the removal would leave no
            monitoring functions at all.
        """
        if not callable(function):
            raise ValueError("function must be callable")

        # Build the new list completely before touching any state, so
        # that a failure leaves this object unchanged.
        if period is None:
            # Remove.
            functions = [
                x for x in self._monitoring_functions
                if x.function != function
            ]
            if not functions:
                # The monitoring thread requires a non-empty list (it
                # indexes the first entry); refuse instead of leaving
                # an empty list behind and failing in min() below.
                raise ValueError("Cannot remove the last monitoring function.")
        elif period <= 0:
            raise ValueError("Period must be positive.")
        else:
            # Add or update period
            entry = _MonitorEntry(function, period)
            functions = [
                x if x.function != function else entry
                for x in self._monitoring_functions
            ]
            if entry not in functions:
                functions.append(entry)

        self._monitoring_functions = functions
        self._calculated_sleep_time = min(x.period for x in functions)

    def calculate_sleep_time(self):
        """
        Return the number of seconds the thread should sleep before
        checking the monitoring functions again.
        """
        min_sleep = self._calculated_sleep_time
        if min_sleep <= 0:
            # Everyone wants to be disabled. Sleep for a longer period of
            # time than usual so we don't spin unnecessarily. We might be
            # enabled again in the future.
            return self.inactive_sleep_time
        return max((min_sleep, self.min_sleep_time))

    def kill(self):
        """
        Ask the monitoring thread to stop and uninstall the tracer.

        Calling this more than once is harmless.
        """
        if not self.should_run:
            # Prevent overwriting trace functions.
            return
        # Stop this monitoring thread from running.
        self.should_run = False
        # Uninstall our tracing hook
        self._greenlet_tracer.kill()

    def _on_hub_gc(self, _):
        # Weak reference callback: the hub went away, so stop.
        self.kill()

    def __call__(self):
        """
        The body of the monitoring thread.

        Loops until :attr:`should_run` becomes false: sleep, then call
        every monitoring function whose period has elapsed.
        """
        # The function that runs in the monitoring thread.
        # We cannot use threading.current_thread because it would
        # create an immortal DummyThread object.
        getcurrent().gevent_monitoring_thread = wref(self)

        try:
            while self.should_run:
                functions = self.monitoring_functions()
                assert functions
                sleep_time = self.calculate_sleep_time()

                thread_sleep(sleep_time)

                # Make sure the hub is still around, and still active,
                # and keep it around while we are here.
                hub = self.hub
                if not hub:
                    self.kill()

                if self.should_run:
                    this_run = perf_counter()
                    for entry in functions:
                        f = entry.function
                        period = entry.period
                        last_run = entry.last_run_time
                        # A false period (e.g. 0) means this function
                        # is disabled.
                        if period and last_run + period <= this_run:
                            entry.last_run_time = this_run
                            f(hub)
                del hub # break our reference to hub while we sleep

        except SystemExit:
            pass
        except: # pylint:disable=bare-except
            # We're a daemon thread, so swallow any exceptions that get here
            # during interpreter shutdown.
            if not sys or not sys.stderr: # pragma: no cover
                # Interpreter is shutting down
                pass
            else:
                hub = self.hub
                if hub is not None:
                    # XXX: This tends to do bad things like end the process, because we
                    # try to switch *threads*, which can't happen. Need something better.
                    hub.handle_error(self, *sys.exc_info())

    def monitor_blocking(self, hub):
        """
        Report if the tracer says the hub has been blocked.

        When a block is detected, the report is printed line by line
        to ``hub.exception_stream`` and an ``EventLoopBlocked`` event
        is sent.

        :return: None if no block was detected; otherwise a tuple
            ``(active_greenlet, report)``.
        """
        # Called periodically to see if the trace function has
        # fired to switch greenlets. If not, we will print
        # the greenlet tree.

        # For tests, we return a true value when we think we found something
        # blocking

        did_block = self._greenlet_tracer.did_block_hub(hub)
        if not did_block:
            return

        active_greenlet = did_block[1] # pylint:disable=unsubscriptable-object
        report = self._greenlet_tracer.did_block_hub_report(
            hub, active_greenlet,
            dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))

        stream = hub.exception_stream
        for line in report:
            # Printing line by line may interleave with other things,
            # but it should also prevent a "reentrant call to print"
            # when the report is large.
            print(line, file=stream)

        notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
        return (active_greenlet, report)

    def ignore_current_greenlet_blocking(self):
        """Forward to the tracer's ``ignore_current_greenlet_blocking``."""
        self._greenlet_tracer.ignore_current_greenlet_blocking()

    def monitor_current_greenlet_blocking(self):
        """Forward to the tracer's ``monitor_current_greenlet_blocking``."""
        self._greenlet_tracer.monitor_current_greenlet_blocking()

    def _get_process(self): # pylint:disable=method-hidden
        """
        Return the result of ``get_this_psutil_process()``.

        The result is cached: after the first call this method is
        replaced on the instance by a function returning the same value.
        """
        proc = get_this_psutil_process()
        self._get_process = lambda: proc
        return proc

    def can_monitor_memory_usage(self):
        """Return whether a process object is available to query memory usage."""
        return self._get_process() is not None

    def install_monitor_memory_usage(self):
        """
        Register :meth:`monitor_memory_usage` as a monitoring function.

        The period is ``GEVENT_CONFIG.memory_monitor_period``, but never
        less than :attr:`min_memory_monitor_period`. If memory usage
        cannot be monitored, a :class:`MonitorWarning` is emitted and
        nothing is registered.
        """
        # Start monitoring memory usage, if possible.
        # If not possible, emit a warning.
        if not self.can_monitor_memory_usage():
            import warnings
            warnings.warn("Unable to monitor memory usage. Install psutil.",
                          MonitorWarning)
            return

        self.add_monitoring_function(self.monitor_memory_usage,
                                     max(GEVENT_CONFIG.memory_monitor_period,
                                         self.min_memory_monitor_period))

    def monitor_memory_usage(self, _hub):
        """
        Compare current memory usage to ``GEVENT_CONFIG.max_memory_usage``
        and send events when the threshold is crossed.

        A ``MemoryUsageThresholdExceeded`` event is sent each time usage
        is above the limit and higher than the last recorded excess; a
        ``MemoryUsageUnderThreshold`` event is sent once when usage drops
        back under the limit.

        :return: -1 if no limit is configured; otherwise the event that
            was sent, or None if no event was sent.
        """
        max_allowed = GEVENT_CONFIG.max_memory_usage
        if not max_allowed:
            # They disabled it.
            return -1 # value for tests

        rusage = self._get_process().memory_full_info()
        # uss only documented available on Windows, Linux, and OS X.
        # If not available, fall back to rss as an approximation.
        mem_usage = getattr(rusage, 'uss', 0) or rusage.rss

        event = None # Return value for tests

        if mem_usage > max_allowed:
            if mem_usage > self._memory_exceeded:
                # We're still growing
                event = MemoryUsageThresholdExceeded(
                    mem_usage, max_allowed, rusage)
                notify(event)
                self._memory_exceeded = mem_usage
        else:
            # we're below. Were we above it last time?
            if self._memory_exceeded:
                event = MemoryUsageUnderThreshold(
                    mem_usage, max_allowed, rusage, self._memory_exceeded)
                notify(event)
                self._memory_exceeded = 0

        return event

    def __repr__(self):
        return '<%s at %s in thread %s greenlet %r for %r>' % (
            self.__class__.__name__,
            hex(id(self)),
            hex(self.monitor_thread_ident),
            getcurrent(),
            self._hub_wref())