1# -*- encoding: utf-8 -*- 2import sys 3import threading 4import time 5import typing 6 7import attr 8 9from ddtrace.internal import nogevent 10from ddtrace.internal import service 11 12from . import forksafe 13 14 15class PeriodicThread(threading.Thread): 16 """Periodic thread. 17 18 This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval` 19 seconds. 20 21 """ 22 23 _ddtrace_profiling_ignore = True 24 25 def __init__( 26 self, 27 interval, # type: float 28 target, # type: typing.Callable[[], typing.Any] 29 name=None, # type: typing.Optional[str] 30 on_shutdown=None, # type: typing.Optional[typing.Callable[[], typing.Any]] 31 ): 32 # type: (...) -> None 33 """Create a periodic thread. 34 35 :param interval: The interval in seconds to wait between execution of the periodic function. 36 :param target: The periodic function to execute every interval. 37 :param name: The name of the thread. 38 :param on_shutdown: The function to call when the thread shuts down. 39 """ 40 super(PeriodicThread, self).__init__(name=name) 41 self._target = target 42 self._on_shutdown = on_shutdown 43 self.interval = interval 44 self.quit = forksafe.Event() 45 self.daemon = True 46 47 def stop(self): 48 """Stop the thread.""" 49 # NOTE: make sure the thread is alive before using self.quit: 50 # 1. self.quit is Lock-based 51 # 2. if we're a child trying to stop a Thread, 52 # the Lock might have been locked in a parent process while forking so that'd block forever 53 if self.is_alive(): 54 self.quit.set() 55 56 def run(self): 57 """Run the target function periodically.""" 58 while not self.quit.wait(self.interval): 59 # DEV: Some frameworks, like e.g. gevent, seem to resuscitate some 60 # of the threads that were running prior to the fork of the worker 61 # processes. These threads are normally created via the native API 62 # and are exposed to the child process as _DummyThreads. We check 63 # whether the current thread is no longer an instance of the 64 # original thread class to prevent it from running in the child 65 # process while the state copied over from the parent is being 66 # cleaned up. The restarting of the thread is responsibility to the 67 # registered forksafe hooks. 68 if not isinstance(threading.current_thread(), self.__class__): 69 break 70 self._target() 71 if self._on_shutdown is not None: 72 self._on_shutdown() 73 74 75class _GeventPeriodicThread(PeriodicThread): 76 """Periodic thread. 77 78 This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval` 79 seconds. 80 81 """ 82 83 # That's the value Python 2 uses in its `threading` module 84 SLEEP_INTERVAL = 0.005 85 86 def __init__(self, interval, target, name=None, on_shutdown=None): 87 """Create a periodic thread. 88 89 :param interval: The interval in seconds to wait between execution of the periodic function. 90 :param target: The periodic function to execute every interval. 91 :param name: The name of the thread. 92 :param on_shutdown: The function to call when the thread shuts down. 93 """ 94 super(_GeventPeriodicThread, self).__init__(interval, target, name, on_shutdown) 95 self._tident = None 96 self._periodic_started = False 97 self._periodic_stopped = False 98 99 def _reset_internal_locks(self, is_alive=False): 100 # Called by Python via `threading._after_fork` 101 self._periodic_stopped = True 102 103 @property 104 def ident(self): 105 return self._tident 106 107 def start(self): 108 """Start the thread.""" 109 self.quit = False 110 if self._tident is not None: 111 raise RuntimeError("threads can only be started once") 112 self._tident = nogevent.start_new_thread(self.run, tuple()) 113 if nogevent.threading_get_native_id: 114 self._native_id = nogevent.threading_get_native_id() 115 116 # Wait for the thread to be started to avoid race conditions 117 while not self._periodic_started: 118 time.sleep(self.SLEEP_INTERVAL) 119 120 def is_alive(self): 121 return not self._periodic_stopped and self._periodic_started 122 123 def join(self, timeout=None): 124 # FIXME: handle the timeout argument 125 while self.is_alive(): 126 time.sleep(self.SLEEP_INTERVAL) 127 128 def stop(self): 129 """Stop the thread.""" 130 self.quit = True 131 132 def run(self): 133 """Run the target function periodically.""" 134 # Do not use the threading._active_limbo_lock here because it's a gevent lock 135 threading._active[self._tident] = self 136 137 self._periodic_started = True 138 139 try: 140 while self.quit is False: 141 self._target() 142 slept = 0 143 while self.quit is False and slept < self.interval: 144 nogevent.sleep(self.SLEEP_INTERVAL) 145 slept += self.SLEEP_INTERVAL 146 if self._on_shutdown is not None: 147 self._on_shutdown() 148 except Exception: 149 # Exceptions might happen during interpreter shutdown. 150 # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. 151 # See `threading.Thread._bootstrap` for details. 152 if sys is not None: 153 raise 154 finally: 155 try: 156 self._periodic_stopped = True 157 del threading._active[self._tident] 158 except Exception: 159 # Exceptions might happen during interpreter shutdown. 160 # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. 161 # See `threading.Thread._bootstrap` for details. 162 if sys is not None: 163 raise 164 165 166def PeriodicRealThreadClass(): 167 # type: () -> typing.Type[PeriodicThread] 168 """Return a PeriodicThread class based on the underlying thread implementation (native, gevent, etc). 169 170 The returned class works exactly like ``PeriodicThread``, except that it runs on a *real* OS thread. Be aware that 171 this might be tricky in e.g. the gevent case, where ``Lock`` object must not be shared with the ``MainThread`` 172 (otherwise it'd dead lock). 173 174 """ 175 if nogevent.is_module_patched("threading"): 176 return _GeventPeriodicThread 177 return PeriodicThread 178 179 180@attr.s(eq=False) 181class PeriodicService(service.Service): 182 """A service that runs periodically.""" 183 184 _interval = attr.ib(type=float) 185 _worker = attr.ib(default=None, init=False, repr=False) 186 187 _real_thread = False 188 "Class variable to override if the service should run in a real OS thread." 189 190 @property 191 def interval(self): 192 # type: (...) -> float 193 return self._interval 194 195 @interval.setter 196 def interval( 197 self, value # type: float 198 ): 199 # type: (...) -> None 200 self._interval = value 201 # Update the interval of the PeriodicThread based on ours 202 if self._worker: 203 self._worker.interval = value 204 205 def _start_service( 206 self, 207 *args, # type: typing.Any 208 **kwargs # type: typing.Any 209 ): 210 # type: (...) -> None 211 """Start the periodic service.""" 212 periodic_thread_class = PeriodicRealThreadClass() if self._real_thread else PeriodicThread 213 self._worker = periodic_thread_class( 214 self.interval, 215 target=self.periodic, 216 name="%s:%s" % (self.__class__.__module__, self.__class__.__name__), 217 on_shutdown=self.on_shutdown, 218 ) 219 self._worker.start() 220 221 def _stop_service( 222 self, 223 *args, # type: typing.Any 224 **kwargs # type: typing.Any 225 ): 226 # type: (...) -> None 227 """Stop the periodic collector.""" 228 self._worker.stop() 229 super(PeriodicService, self)._stop_service(*args, **kwargs) 230 231 def join( 232 self, timeout=None # type: typing.Optional[float] 233 ): 234 # type: (...) -> None 235 if self._worker: 236 self._worker.join(timeout) 237 238 @staticmethod 239 def on_shutdown(): 240 pass 241 242 def periodic(self): 243 # type: (...) -> None 244 pass 245