1from __future__ import absolute_import
2
3import weakref
4
5from sentry_sdk.hub import Hub
6from sentry_sdk.integrations import Integration, DidNotEnable
7from sentry_sdk.tracing import Span
8from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
9
10
11try:
12    from rq.version import VERSION as RQ_VERSION
13    from rq.timeouts import JobTimeoutException
14    from rq.worker import Worker
15    from rq.queue import Queue
16except ImportError:
17    raise DidNotEnable("RQ not installed")
18
19from sentry_sdk._types import MYPY
20
21if MYPY:
22    from typing import Any
23    from typing import Dict
24    from typing import Callable
25
26    from rq.job import Job
27
28    from sentry_sdk.utils import ExcInfo
29    from sentry_sdk._types import EventProcessor
30
31
32class RqIntegration(Integration):
33    identifier = "rq"
34
35    @staticmethod
36    def setup_once():
37        # type: () -> None
38
39        try:
40            version = tuple(map(int, RQ_VERSION.split(".")[:3]))
41        except (ValueError, TypeError):
42            raise DidNotEnable("Unparseable RQ version: {}".format(RQ_VERSION))
43
44        if version < (0, 6):
45            raise DidNotEnable("RQ 0.6 or newer is required.")
46
47        old_perform_job = Worker.perform_job
48
49        def sentry_patched_perform_job(self, job, *args, **kwargs):
50            # type: (Any, Job, *Queue, **Any) -> bool
51            hub = Hub.current
52            integration = hub.get_integration(RqIntegration)
53
54            if integration is None:
55                return old_perform_job(self, job, *args, **kwargs)
56
57            client = hub.client
58            assert client is not None
59
60            with hub.push_scope() as scope:
61                scope.clear_breadcrumbs()
62                scope.add_event_processor(_make_event_processor(weakref.ref(job)))
63
64                span = Span.continue_from_headers(
65                    job.meta.get("_sentry_trace_headers") or {}
66                )
67                span.op = "rq.task"
68
69                with capture_internal_exceptions():
70                    span.transaction = job.func_name
71
72                with hub.start_span(span):
73                    rv = old_perform_job(self, job, *args, **kwargs)
74
75            if self.is_horse:
76                # We're inside of a forked process and RQ is
77                # about to call `os._exit`. Make sure that our
78                # events get sent out.
79                client.flush()
80
81            return rv
82
83        Worker.perform_job = sentry_patched_perform_job
84
85        old_handle_exception = Worker.handle_exception
86
87        def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):
88            # type: (Worker, Any, *Any, **Any) -> Any
89            _capture_exception(exc_info)  # type: ignore
90            return old_handle_exception(self, job, *exc_info, **kwargs)
91
92        Worker.handle_exception = sentry_patched_handle_exception
93
94        old_enqueue_job = Queue.enqueue_job
95
96        def sentry_patched_enqueue_job(self, job, **kwargs):
97            # type: (Queue, Any, **Any) -> Any
98            hub = Hub.current
99            if hub.get_integration(RqIntegration) is not None:
100                job.meta["_sentry_trace_headers"] = dict(
101                    hub.iter_trace_propagation_headers()
102                )
103
104            return old_enqueue_job(self, job, **kwargs)
105
106        Queue.enqueue_job = sentry_patched_enqueue_job
107
108
109def _make_event_processor(weak_job):
110    # type: (Callable[[], Job]) -> EventProcessor
111    def event_processor(event, hint):
112        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
113        job = weak_job()
114        if job is not None:
115            with capture_internal_exceptions():
116                extra = event.setdefault("extra", {})
117                extra["rq-job"] = {
118                    "job_id": job.id,
119                    "func": job.func_name,
120                    "args": job.args,
121                    "kwargs": job.kwargs,
122                    "description": job.description,
123                }
124
125        if "exc_info" in hint:
126            with capture_internal_exceptions():
127                if issubclass(hint["exc_info"][0], JobTimeoutException):
128                    event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name]
129
130        return event
131
132    return event_processor
133
134
135def _capture_exception(exc_info, **kwargs):
136    # type: (ExcInfo, **Any) -> None
137    hub = Hub.current
138    if hub.get_integration(RqIntegration) is None:
139        return
140
141    # If an integration is there, a client has to be there.
142    client = hub.client  # type: Any
143
144    event, hint = event_from_exception(
145        exc_info,
146        client_options=client.options,
147        mechanism={"type": "rq", "handled": False},
148    )
149
150    hub.capture_event(event, hint=hint)
151