1from __future__ import absolute_import 2 3import weakref 4 5from sentry_sdk.hub import Hub 6from sentry_sdk.integrations import Integration, DidNotEnable 7from sentry_sdk.tracing import Span 8from sentry_sdk.utils import capture_internal_exceptions, event_from_exception 9 10 11try: 12 from rq.version import VERSION as RQ_VERSION 13 from rq.timeouts import JobTimeoutException 14 from rq.worker import Worker 15 from rq.queue import Queue 16except ImportError: 17 raise DidNotEnable("RQ not installed") 18 19from sentry_sdk._types import MYPY 20 21if MYPY: 22 from typing import Any 23 from typing import Dict 24 from typing import Callable 25 26 from rq.job import Job 27 28 from sentry_sdk.utils import ExcInfo 29 from sentry_sdk._types import EventProcessor 30 31 32class RqIntegration(Integration): 33 identifier = "rq" 34 35 @staticmethod 36 def setup_once(): 37 # type: () -> None 38 39 try: 40 version = tuple(map(int, RQ_VERSION.split(".")[:3])) 41 except (ValueError, TypeError): 42 raise DidNotEnable("Unparseable RQ version: {}".format(RQ_VERSION)) 43 44 if version < (0, 6): 45 raise DidNotEnable("RQ 0.6 or newer is required.") 46 47 old_perform_job = Worker.perform_job 48 49 def sentry_patched_perform_job(self, job, *args, **kwargs): 50 # type: (Any, Job, *Queue, **Any) -> bool 51 hub = Hub.current 52 integration = hub.get_integration(RqIntegration) 53 54 if integration is None: 55 return old_perform_job(self, job, *args, **kwargs) 56 57 client = hub.client 58 assert client is not None 59 60 with hub.push_scope() as scope: 61 scope.clear_breadcrumbs() 62 scope.add_event_processor(_make_event_processor(weakref.ref(job))) 63 64 span = Span.continue_from_headers( 65 job.meta.get("_sentry_trace_headers") or {} 66 ) 67 span.op = "rq.task" 68 69 with capture_internal_exceptions(): 70 span.transaction = job.func_name 71 72 with hub.start_span(span): 73 rv = old_perform_job(self, job, *args, **kwargs) 74 75 if self.is_horse: 76 # We're inside of a forked process and RQ is 77 # about to call `os._exit`. Make sure that our 78 # events get sent out. 79 client.flush() 80 81 return rv 82 83 Worker.perform_job = sentry_patched_perform_job 84 85 old_handle_exception = Worker.handle_exception 86 87 def sentry_patched_handle_exception(self, job, *exc_info, **kwargs): 88 # type: (Worker, Any, *Any, **Any) -> Any 89 _capture_exception(exc_info) # type: ignore 90 return old_handle_exception(self, job, *exc_info, **kwargs) 91 92 Worker.handle_exception = sentry_patched_handle_exception 93 94 old_enqueue_job = Queue.enqueue_job 95 96 def sentry_patched_enqueue_job(self, job, **kwargs): 97 # type: (Queue, Any, **Any) -> Any 98 hub = Hub.current 99 if hub.get_integration(RqIntegration) is not None: 100 job.meta["_sentry_trace_headers"] = dict( 101 hub.iter_trace_propagation_headers() 102 ) 103 104 return old_enqueue_job(self, job, **kwargs) 105 106 Queue.enqueue_job = sentry_patched_enqueue_job 107 108 109def _make_event_processor(weak_job): 110 # type: (Callable[[], Job]) -> EventProcessor 111 def event_processor(event, hint): 112 # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any] 113 job = weak_job() 114 if job is not None: 115 with capture_internal_exceptions(): 116 extra = event.setdefault("extra", {}) 117 extra["rq-job"] = { 118 "job_id": job.id, 119 "func": job.func_name, 120 "args": job.args, 121 "kwargs": job.kwargs, 122 "description": job.description, 123 } 124 125 if "exc_info" in hint: 126 with capture_internal_exceptions(): 127 if issubclass(hint["exc_info"][0], JobTimeoutException): 128 event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name] 129 130 return event 131 132 return event_processor 133 134 135def _capture_exception(exc_info, **kwargs): 136 # type: (ExcInfo, **Any) -> None 137 hub = Hub.current 138 if hub.get_integration(RqIntegration) is None: 139 return 140 141 # If an integration is there, a client has to be there. 142 client = hub.client # type: Any 143 144 event, hint = event_from_exception( 145 exc_info, 146 client_options=client.options, 147 mechanism={"type": "rq", "handled": False}, 148 ) 149 150 hub.capture_event(event, hint=hint) 151