1from __future__ import absolute_import 2 3import os.path 4import sys 5import threading 6import typing 7 8import attr 9 10from ddtrace.internal import compat 11from ddtrace.internal import nogevent 12from ddtrace.internal.utils import attr as attr_utils 13from ddtrace.internal.utils import formats 14from ddtrace.profiling import collector 15from ddtrace.profiling import event 16from ddtrace.profiling.collector import _task 17from ddtrace.profiling.collector import _threading 18from ddtrace.profiling.collector import _traceback 19from ddtrace.vendor import wrapt 20 21 22@event.event_class 23class LockEventBase(event.StackBasedEvent): 24 """Base Lock event.""" 25 26 lock_name = attr.ib(default=None, type=typing.Optional[str]) 27 sampling_pct = attr.ib(default=None, type=typing.Optional[int]) 28 29 30@event.event_class 31class LockAcquireEvent(LockEventBase): 32 """A lock has been acquired.""" 33 34 wait_time_ns = attr.ib(default=None, type=typing.Optional[int]) 35 36 37@event.event_class 38class LockReleaseEvent(LockEventBase): 39 """A lock has been released.""" 40 41 locked_for_ns = attr.ib(default=None, type=typing.Optional[int]) 42 43 44def _current_thread(): 45 # type: (...) -> typing.Tuple[int, str] 46 thread_id = nogevent.thread_get_ident() 47 return thread_id, _threading.get_thread_name(thread_id) 48 49 50# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will 51# appear in the stack trace and we need to hide it. 52if os.environ.get("WRAPT_DISABLE_EXTENSIONS"): 53 WRAPT_C_EXT = False 54else: 55 try: 56 import ddtrace.vendor.wrapt._wrappers as _w # noqa: F401 57 except ImportError: 58 WRAPT_C_EXT = False 59 else: 60 WRAPT_C_EXT = True 61 del _w 62 63 64class _ProfiledLock(wrapt.ObjectProxy): 65 def __init__(self, wrapped, recorder, tracer, max_nframes, capture_sampler, endpoint_collection_enabled): 66 wrapt.ObjectProxy.__init__(self, wrapped) 67 self._self_recorder = recorder 68 self._self_tracer = tracer 69 self._self_max_nframes = max_nframes 70 self._self_capture_sampler = capture_sampler 71 self._self_endpoint_collection_enabled = endpoint_collection_enabled 72 frame = sys._getframe(2 if WRAPT_C_EXT else 3) 73 code = frame.f_code 74 self._self_name = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno) 75 76 def acquire(self, *args, **kwargs): 77 if not self._self_capture_sampler.capture(): 78 return self.__wrapped__.acquire(*args, **kwargs) 79 80 start = compat.monotonic_ns() 81 try: 82 return self.__wrapped__.acquire(*args, **kwargs) 83 finally: 84 try: 85 end = self._self_acquired_at = compat.monotonic_ns() 86 thread_id, thread_name = _current_thread() 87 task_id, task_name, task_frame = _task.get_task(thread_id) 88 89 if task_frame is None: 90 frame = sys._getframe(1) 91 else: 92 frame = task_frame 93 94 frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes) 95 96 event = LockAcquireEvent( 97 lock_name=self._self_name, 98 frames=frames, 99 nframes=nframes, 100 thread_id=thread_id, 101 thread_name=thread_name, 102 task_id=task_id, 103 task_name=task_name, 104 wait_time_ns=end - start, 105 sampling_pct=self._self_capture_sampler.capture_pct, 106 ) 107 108 if self._self_tracer is not None: 109 event.set_trace_info(self._self_tracer.current_span(), self._self_endpoint_collection_enabled) 110 111 self._self_recorder.push_event(event) 112 except Exception: 113 pass 114 115 def release( 116 self, 117 *args, # type: typing.Any 118 **kwargs # type: typing.Any 119 ): 120 # type: (...) -> None 121 try: 122 return self.__wrapped__.release(*args, **kwargs) 123 finally: 124 try: 125 if hasattr(self, "_self_acquired_at"): 126 try: 127 end = compat.monotonic_ns() 128 thread_id, thread_name = _current_thread() 129 task_id, task_name, task_frame = _task.get_task(thread_id) 130 131 if task_frame is None: 132 frame = sys._getframe(1) 133 else: 134 frame = task_frame 135 136 frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes) 137 138 event = LockReleaseEvent( # type: ignore[call-arg] 139 lock_name=self._self_name, 140 frames=frames, 141 nframes=nframes, 142 thread_id=thread_id, 143 thread_name=thread_name, 144 task_id=task_id, 145 task_name=task_name, 146 locked_for_ns=end - self._self_acquired_at, 147 sampling_pct=self._self_capture_sampler.capture_pct, 148 ) 149 150 if self._self_tracer is not None: 151 event.set_trace_info( 152 self._self_tracer.current_span(), self._self_endpoint_collection_enabled 153 ) 154 155 self._self_recorder.push_event(event) 156 finally: 157 del self._self_acquired_at 158 except Exception: 159 pass 160 161 acquire_lock = acquire 162 163 164class FunctionWrapper(wrapt.FunctionWrapper): 165 # Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static" 166 # method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a 167 # builtin function. Override default wrapt behavior here that tries to detect bound method. 168 def __get__(self, instance, owner=None): 169 return self 170 171 172@attr.s 173class LockCollector(collector.CaptureSamplerCollector): 174 """Record lock usage.""" 175 176 nframes = attr.ib(factory=attr_utils.from_env("DD_PROFILING_MAX_FRAMES", 64, int)) 177 endpoint_collection_enabled = attr.ib( 178 factory=attr_utils.from_env("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", True, formats.asbool) 179 ) 180 181 tracer = attr.ib(default=None) 182 183 def _start_service(self): # type: ignore[override] 184 # type: (...) -> None 185 """Start collecting `threading.Lock` usage.""" 186 self.patch() 187 super(LockCollector, self)._start_service() 188 189 def _stop_service(self): # type: ignore[override] 190 # type: (...) -> None 191 """Stop collecting `threading.Lock` usage.""" 192 super(LockCollector, self)._stop_service() 193 self.unpatch() 194 195 def patch(self): 196 # type: (...) -> None 197 """Patch the threading module for tracking lock allocation.""" 198 # We only patch the lock from the `threading` module. 199 # Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile. 200 self.original = threading.Lock 201 202 def _allocate_lock(wrapped, instance, args, kwargs): 203 lock = wrapped(*args, **kwargs) 204 return _ProfiledLock( 205 lock, self.recorder, self.tracer, self.nframes, self._capture_sampler, self.endpoint_collection_enabled 206 ) 207 208 threading.Lock = FunctionWrapper(self.original, _allocate_lock) # type: ignore[misc] 209 210 def unpatch(self): 211 # type: (...) -> None 212 """Unpatch the threading module for tracking lock allocation.""" 213 threading.Lock = self.original # type: ignore[misc] 214