1# -*- encoding: utf-8 -*-
2import sys
3import threading
4import time
5import typing
6
7import attr
8
9from ddtrace.internal import nogevent
10from ddtrace.internal import service
11
12from . import forksafe
13
14
15class PeriodicThread(threading.Thread):
16    """Periodic thread.
17
18    This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval`
19    seconds.
20
21    """
22
23    _ddtrace_profiling_ignore = True
24
25    def __init__(
26        self,
27        interval,  # type: float
28        target,  # type: typing.Callable[[], typing.Any]
29        name=None,  # type: typing.Optional[str]
30        on_shutdown=None,  # type: typing.Optional[typing.Callable[[], typing.Any]]
31    ):
32        # type: (...) -> None
33        """Create a periodic thread.
34
35        :param interval: The interval in seconds to wait between execution of the periodic function.
36        :param target: The periodic function to execute every interval.
37        :param name: The name of the thread.
38        :param on_shutdown: The function to call when the thread shuts down.
39        """
40        super(PeriodicThread, self).__init__(name=name)
41        self._target = target
42        self._on_shutdown = on_shutdown
43        self.interval = interval
44        self.quit = forksafe.Event()
45        self.daemon = True
46
47    def stop(self):
48        """Stop the thread."""
49        # NOTE: make sure the thread is alive before using self.quit:
50        # 1. self.quit is Lock-based
51        # 2. if we're a child trying to stop a Thread,
52        #    the Lock might have been locked in a parent process while forking so that'd block forever
53        if self.is_alive():
54            self.quit.set()
55
56    def run(self):
57        """Run the target function periodically."""
58        while not self.quit.wait(self.interval):
59            # DEV: Some frameworks, like e.g. gevent, seem to resuscitate some
60            # of the threads that were running prior to the fork of the worker
61            # processes. These threads are normally created via the native API
62            # and are exposed to the child process as _DummyThreads. We check
63            # whether the current thread is no longer an instance of the
64            # original thread class to prevent it from running in the child
65            # process while the state copied over from the parent is being
66            # cleaned up. The restarting of the thread is responsibility to the
67            # registered forksafe hooks.
68            if not isinstance(threading.current_thread(), self.__class__):
69                break
70            self._target()
71        if self._on_shutdown is not None:
72            self._on_shutdown()
73
74
75class _GeventPeriodicThread(PeriodicThread):
76    """Periodic thread.
77
78    This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval`
79    seconds.
80
81    """
82
83    # That's the value Python 2 uses in its `threading` module
84    SLEEP_INTERVAL = 0.005
85
86    def __init__(self, interval, target, name=None, on_shutdown=None):
87        """Create a periodic thread.
88
89        :param interval: The interval in seconds to wait between execution of the periodic function.
90        :param target: The periodic function to execute every interval.
91        :param name: The name of the thread.
92        :param on_shutdown: The function to call when the thread shuts down.
93        """
94        super(_GeventPeriodicThread, self).__init__(interval, target, name, on_shutdown)
95        self._tident = None
96        self._periodic_started = False
97        self._periodic_stopped = False
98
99    def _reset_internal_locks(self, is_alive=False):
100        # Called by Python via `threading._after_fork`
101        self._periodic_stopped = True
102
103    @property
104    def ident(self):
105        return self._tident
106
107    def start(self):
108        """Start the thread."""
109        self.quit = False
110        if self._tident is not None:
111            raise RuntimeError("threads can only be started once")
112        self._tident = nogevent.start_new_thread(self.run, tuple())
113        if nogevent.threading_get_native_id:
114            self._native_id = nogevent.threading_get_native_id()
115
116        # Wait for the thread to be started to avoid race conditions
117        while not self._periodic_started:
118            time.sleep(self.SLEEP_INTERVAL)
119
120    def is_alive(self):
121        return not self._periodic_stopped and self._periodic_started
122
123    def join(self, timeout=None):
124        # FIXME: handle the timeout argument
125        while self.is_alive():
126            time.sleep(self.SLEEP_INTERVAL)
127
128    def stop(self):
129        """Stop the thread."""
130        self.quit = True
131
132    def run(self):
133        """Run the target function periodically."""
134        # Do not use the threading._active_limbo_lock here because it's a gevent lock
135        threading._active[self._tident] = self
136
137        self._periodic_started = True
138
139        try:
140            while self.quit is False:
141                self._target()
142                slept = 0
143                while self.quit is False and slept < self.interval:
144                    nogevent.sleep(self.SLEEP_INTERVAL)
145                    slept += self.SLEEP_INTERVAL
146            if self._on_shutdown is not None:
147                self._on_shutdown()
148        except Exception:
149            # Exceptions might happen during interpreter shutdown.
150            # We're mimicking what `threading.Thread` does in daemon mode, we ignore them.
151            # See `threading.Thread._bootstrap` for details.
152            if sys is not None:
153                raise
154        finally:
155            try:
156                self._periodic_stopped = True
157                del threading._active[self._tident]
158            except Exception:
159                # Exceptions might happen during interpreter shutdown.
160                # We're mimicking what `threading.Thread` does in daemon mode, we ignore them.
161                # See `threading.Thread._bootstrap` for details.
162                if sys is not None:
163                    raise
164
165
166def PeriodicRealThreadClass():
167    # type: () -> typing.Type[PeriodicThread]
168    """Return a PeriodicThread class based on the underlying thread implementation (native, gevent, etc).
169
170    The returned class works exactly like ``PeriodicThread``, except that it runs on a *real* OS thread. Be aware that
171    this might be tricky in e.g. the gevent case, where ``Lock`` object must not be shared with the ``MainThread``
172    (otherwise it'd dead lock).
173
174    """
175    if nogevent.is_module_patched("threading"):
176        return _GeventPeriodicThread
177    return PeriodicThread
178
179
180@attr.s(eq=False)
181class PeriodicService(service.Service):
182    """A service that runs periodically."""
183
184    _interval = attr.ib(type=float)
185    _worker = attr.ib(default=None, init=False, repr=False)
186
187    _real_thread = False
188    "Class variable to override if the service should run in a real OS thread."
189
190    @property
191    def interval(self):
192        # type: (...) -> float
193        return self._interval
194
195    @interval.setter
196    def interval(
197        self, value  # type: float
198    ):
199        # type: (...) -> None
200        self._interval = value
201        # Update the interval of the PeriodicThread based on ours
202        if self._worker:
203            self._worker.interval = value
204
205    def _start_service(
206        self,
207        *args,  # type: typing.Any
208        **kwargs  # type: typing.Any
209    ):
210        # type: (...) -> None
211        """Start the periodic service."""
212        periodic_thread_class = PeriodicRealThreadClass() if self._real_thread else PeriodicThread
213        self._worker = periodic_thread_class(
214            self.interval,
215            target=self.periodic,
216            name="%s:%s" % (self.__class__.__module__, self.__class__.__name__),
217            on_shutdown=self.on_shutdown,
218        )
219        self._worker.start()
220
221    def _stop_service(
222        self,
223        *args,  # type: typing.Any
224        **kwargs  # type: typing.Any
225    ):
226        # type: (...) -> None
227        """Stop the periodic collector."""
228        self._worker.stop()
229        super(PeriodicService, self)._stop_service(*args, **kwargs)
230
231    def join(
232        self, timeout=None  # type: typing.Optional[float]
233    ):
234        # type: (...) -> None
235        if self._worker:
236            self._worker.join(timeout)
237
238    @staticmethod
239    def on_shutdown():
240        pass
241
242    def periodic(self):
243        # type: (...) -> None
244        pass
245