1from __future__ import absolute_import
2
3import os.path
4import sys
5import threading
6import typing
7
8import attr
9
10from ddtrace.internal import compat
11from ddtrace.internal import nogevent
12from ddtrace.internal.utils import attr as attr_utils
13from ddtrace.internal.utils import formats
14from ddtrace.profiling import collector
15from ddtrace.profiling import event
16from ddtrace.profiling.collector import _task
17from ddtrace.profiling.collector import _threading
18from ddtrace.profiling.collector import _traceback
19from ddtrace.vendor import wrapt
20
21
@event.event_class
class LockEventBase(event.StackBasedEvent):
    """Common fields shared by every lock profiling event."""

    # Identifier of the lock; `_ProfiledLock` sets it to "<file basename>:<line number>" computed at allocation time.
    lock_name = attr.ib(type=typing.Optional[str], default=None)
    # Value of the capture sampler's `capture_pct` when the event was recorded.
    sampling_pct = attr.ib(type=typing.Optional[int], default=None)
28
29
@event.event_class
class LockAcquireEvent(LockEventBase):
    """Event recorded when a lock acquisition has been sampled."""

    # Nanoseconds (monotonic clock) spent in the call to the wrapped lock's acquire method.
    wait_time_ns = attr.ib(type=typing.Optional[int], default=None)
35
36
@event.event_class
class LockReleaseEvent(LockEventBase):
    """Event recorded when a previously sampled lock is released."""

    # Nanoseconds (monotonic clock) between the recorded acquisition timestamp and the release.
    locked_for_ns = attr.ib(type=typing.Optional[int], default=None)
42
43
def _current_thread():
    # type: (...) -> typing.Tuple[int, str]
    """Return the ``(identifier, name)`` pair describing the calling thread.

    The identifier comes from `nogevent.thread_get_ident` and the name is looked up from that identifier with
    `_threading.get_thread_name`.
    """
    ident = nogevent.thread_get_ident()
    name = _threading.get_thread_name(ident)
    return ident, name
48
49
# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrapper function will
# appear in the stack trace and we need to hide it.
# Start from the pure-Python assumption and only flip the flag once the C extension has actually been imported.
WRAPT_C_EXT = False
if not os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
    try:
        import ddtrace.vendor.wrapt._wrappers as _w  # noqa: F401
    except ImportError:
        # C extension unavailable: keep the pure-Python default.
        pass
    else:
        WRAPT_C_EXT = True
        # The import was only a probe; do not leave the module bound in this namespace.
        del _w
62
63
class _ProfiledLock(wrapt.ObjectProxy):
    """Proxy around a lock object that records events for sampled acquisitions and releases.

    Every attribute not defined here is forwarded to the wrapped lock by `wrapt.ObjectProxy`. Acquisitions go
    through the capture sampler; when one is sampled, a `LockAcquireEvent` is pushed to the recorder and the
    matching release pushes a `LockReleaseEvent`.

    Both the explicit API (``acquire`` / ``release``) and the context-manager protocol (``with lock:``) are
    instrumented. Profiling is best-effort: an error raised while building or pushing an event is swallowed so
    that it can never break the application code using the lock.
    """

    def __init__(self, wrapped, recorder, tracer, max_nframes, capture_sampler, endpoint_collection_enabled):
        """Wrap ``wrapped`` and remember where the lock was allocated.

        :param wrapped: The real lock object to proxy.
        :param recorder: Object whose ``push_event`` receives the generated events.
        :param tracer: Optional tracer; when not ``None`` its current span is attached to each event.
        :param max_nframes: Maximum number of frames passed to `_traceback.pyframe_to_frames`.
        :param capture_sampler: Sampler providing ``capture()`` and ``capture_pct``.
        :param endpoint_collection_enabled: Forwarded as-is to the event's ``set_trace_info``.
        """
        wrapt.ObjectProxy.__init__(self, wrapped)
        # wrapt stores attributes prefixed with `_self_` on the proxy itself rather than on the wrapped object.
        self._self_recorder = recorder
        self._self_tracer = tracer
        self._self_max_nframes = max_nframes
        self._self_capture_sampler = capture_sampler
        self._self_endpoint_collection_enabled = endpoint_collection_enabled
        # Walk up to the code that allocated the lock. One more frame must be skipped when wrapt is not using its
        # C extension, because the wrapper function then shows up in the stack.
        frame = sys._getframe(2 if WRAPT_C_EXT else 3)
        code = frame.f_code
        self._self_name = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)

    def _push_lock_event(self, event_type, **fields):
        # type: (...) -> None
        """Build an event of ``event_type`` for the code using the lock and push it to the recorder.

        Must be called from `_profiled_acquire` / `_profiled_release`, themselves called directly from a public
        method: the caller frame is located by a fixed stack depth.

        :param event_type: Event class to instantiate (`LockAcquireEvent` or `LockReleaseEvent`).
        :param fields: Extra event-specific keyword arguments (e.g. ``wait_time_ns``).
        """
        thread_id, thread_name = _current_thread()
        task_id, task_name, task_frame = _task.get_task(thread_id)

        if task_frame is None:
            # 0: this helper, 1: _profiled_acquire/_profiled_release, 2: public method, 3: code using the lock.
            frame = sys._getframe(3)
        else:
            frame = task_frame

        frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

        # Named `evt` rather than `event` to avoid shadowing the imported `event` module.
        evt = event_type(
            lock_name=self._self_name,
            frames=frames,
            nframes=nframes,
            thread_id=thread_id,
            thread_name=thread_name,
            task_id=task_id,
            task_name=task_name,
            sampling_pct=self._self_capture_sampler.capture_pct,
            **fields
        )

        if self._self_tracer is not None:
            evt.set_trace_info(self._self_tracer.current_span(), self._self_endpoint_collection_enabled)

        self._self_recorder.push_event(evt)

    def _profiled_acquire(self, inner, args, kwargs):
        """Call ``inner(*args, **kwargs)`` to acquire the lock, recording an event when sampled.

        :param inner: Bound acquisition callable of the wrapped lock (``acquire`` or ``__enter__``).
        :param args: Positional arguments for ``inner``.
        :param kwargs: Keyword arguments for ``inner``.
        :return: Whatever ``inner`` returns; exceptions raised by ``inner`` propagate unchanged.
        """
        if not self._self_capture_sampler.capture():
            return inner(*args, **kwargs)

        start = compat.monotonic_ns()
        try:
            return inner(*args, **kwargs)
        finally:
            try:
                # The acquisition timestamp is kept so that the matching release can compute the hold time.
                end = self._self_acquired_at = compat.monotonic_ns()
                self._push_lock_event(LockAcquireEvent, wait_time_ns=end - start)
            except Exception:
                # Best-effort: never let a profiling failure reach the application.
                pass

    def _profiled_release(self, inner, args, kwargs):
        """Call ``inner(*args, **kwargs)`` to release the lock, recording an event if the acquisition was sampled.

        :param inner: Bound release callable of the wrapped lock (``release`` or ``__exit__``).
        :param args: Positional arguments for ``inner``.
        :param kwargs: Keyword arguments for ``inner``.
        :return: Whatever ``inner`` returns; exceptions raised by ``inner`` propagate unchanged.
        """
        try:
            return inner(*args, **kwargs)
        finally:
            try:
                # Only acquisitions that were sampled leave a timestamp behind.
                if hasattr(self, "_self_acquired_at"):
                    try:
                        end = compat.monotonic_ns()
                        self._push_lock_event(LockReleaseEvent, locked_for_ns=end - self._self_acquired_at)
                    finally:
                        # Always clear the timestamp so a later unsampled acquisition is not reported on release.
                        del self._self_acquired_at
            except Exception:
                # Best-effort: never let a profiling failure reach the application.
                pass

    def acquire(self, *args, **kwargs):
        """Acquire the wrapped lock, forwarding all arguments, and record the acquisition when sampled."""
        return self._profiled_acquire(self.__wrapped__.acquire, args, kwargs)

    def release(
        self,
        *args,  # type: typing.Any
        **kwargs  # type: typing.Any
    ):
        # type: (...) -> None
        """Release the wrapped lock, forwarding all arguments, and record the release when applicable."""
        return self._profiled_release(self.__wrapped__.release, args, kwargs)

    # Without these two overrides the context-manager methods inherited from the proxy delegate straight to the
    # wrapped lock, so `with lock:` would bypass `acquire`/`release` above and never be profiled.
    def __enter__(self, *args, **kwargs):
        """Enter the wrapped lock's context (acquire it) and record the acquisition when sampled."""
        return self._profiled_acquire(self.__wrapped__.__enter__, args, kwargs)

    def __exit__(self, *args, **kwargs):
        """Exit the wrapped lock's context (release it) and record the release when applicable."""
        return self._profiled_release(self.__wrapped__.__exit__, args, kwargs)

    acquire_lock = acquire
162
163
class FunctionWrapper(wrapt.FunctionWrapper):
    """Function wrapper that is never bound to an instance.

    Whatever happens, `_allocate_lock` is always considered by Python like a "static" method, even when used as a
    class attribute: Python never tries to "bind" it to a method, because it sees it is a builtin function. The
    default wrapt behavior, which tries to detect bound methods, is therefore overridden here.
    """

    def __get__(self, instance, owner=None):
        """Return the wrapper itself, ignoring ``instance`` and ``owner`` (no binding takes place)."""
        return self
170
171
@attr.s
class LockCollector(collector.CaptureSamplerCollector):
    """Collector recording how `threading.Lock` objects are used.

    While the service runs, `threading.Lock` is replaced by a factory that returns `_ProfiledLock` proxies.
    """

    # Handed to each `_ProfiledLock` as its maximum number of frames. The value is produced by
    # `attr_utils.from_env` from DD_PROFILING_MAX_FRAMES (presumably falling back to 64 when unset).
    nframes = attr.ib(factory=attr_utils.from_env("DD_PROFILING_MAX_FRAMES", 64, int))
    # Handed to each `_ProfiledLock`, which forwards it to the event's `set_trace_info`.
    endpoint_collection_enabled = attr.ib(
        factory=attr_utils.from_env("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", True, formats.asbool)
    )

    # Optional tracer; when set, the profiled locks attach its current span to the events they emit.
    tracer = attr.ib(default=None)

    def _start_service(self):  # type: ignore[override]
        # type: (...) -> None
        """Install the `threading.Lock` patch, then start the parent service."""
        self.patch()
        super(LockCollector, self)._start_service()

    def _stop_service(self):  # type: ignore[override]
        # type: (...) -> None
        """Stop the parent service, then restore the original `threading.Lock`."""
        super(LockCollector, self)._stop_service()
        self.unpatch()

    def patch(self):
        # type: (...) -> None
        """Replace `threading.Lock` with a wrapper returning profiled locks.

        The previous value of `threading.Lock` is saved in ``self.original`` so `unpatch` can restore it.
        """
        # We only patch the lock from the `threading` module.
        # Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
        lock_factory = threading.Lock
        self.original = lock_factory

        def _allocate_lock(wrapped, instance, args, kwargs):
            # Allocate the real lock first, then hand it over to the profiling proxy.
            real_lock = wrapped(*args, **kwargs)
            profiled = _ProfiledLock(
                real_lock,
                self.recorder,
                self.tracer,
                self.nframes,
                self._capture_sampler,
                self.endpoint_collection_enabled,
            )
            return profiled

        threading.Lock = FunctionWrapper(lock_factory, _allocate_lock)  # type: ignore[misc]

    def unpatch(self):
        # type: (...) -> None
        """Restore the `threading.Lock` value saved by `patch`."""
        threading.Lock = self.original  # type: ignore[misc]
214