1import json
2
3from pyramid.httpexceptions import HTTPException
4import pytest
5import webtest
6
7from ddtrace import config
8from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
9from ddtrace.contrib.pyramid.patch import insert_tween_if_needed
10from ddtrace.ext import http
11from ddtrace.internal import compat
12from tests.utils import TracerTestCase
13from tests.utils import assert_is_measured
14from tests.utils import assert_span_http_status_code
15
16from ...opentracer.utils import init_tracer
17from .app import create_app
18
19
class PyramidBase(TracerTestCase):
    """Base Pyramid test application.

    Subclasses are expected to define an ``instrument`` attribute (it is
    forwarded as-is to the app factory) and may override :meth:`get_settings`
    to provide their default application settings.
    """

    def setUp(self):
        """Run the parent set-up, then build the default application under test."""
        super(PyramidBase, self).setUp()
        self.create_app()

    def create_app(self, settings=None):
        """Build the application and expose it as ``self.app`` / ``self.renderer``.

        :param settings: optional mapping of application settings. When it is
            omitted (or empty) the result of :meth:`get_settings` is used.
            The mapping is copied, so the caller's object is left untouched.
        """
        # work on a copy: injecting the tracer below must not leak into the
        # mapping owned by the caller (or returned by ``get_settings``)
        settings = dict(settings or self.get_settings())
        # always set the dummy tracer as a default tracer
        settings["datadog_tracer"] = self.tracer

        app, renderer = create_app(settings, self.instrument)
        self.app = webtest.TestApp(app)
        self.renderer = renderer

    def get_settings(self):
        """Return the default application settings (none for the base class)."""
        return {}

    def override_settings(self, settings):
        """Rebuild the application under test using ``settings``."""
        self.create_app(settings)
42
43
class PyramidTestCase(PyramidBase):
    """Pyramid TestCase that includes tests for automatic instrumentation"""

    instrument = True

    def get_settings(self):
        """Return the default settings: the traced service is named ``foobar``."""
        return {
            "datadog_trace_service": "foobar",
        }

    def _assert_request_span(self, span, resource, status_code, url, error=0, route_name=None):
        """Check the attributes shared by the request spans produced in these tests.

        :param span: the span generated for the HTTP request
        :param resource: expected span resource
        :param status_code: expected HTTP status code tag
        :param url: expected value of the ``http.URL`` tag
        :param error: expected value of ``span.error`` (``0`` by default)
        :param route_name: expected ``pyramid.route.name`` tag; the tag is not
            checked when this is ``None``
        """
        assert_is_measured(span)
        assert span.service == "foobar"
        assert span.resource == resource
        assert span.error == error
        assert span.span_type == "web"
        assert span.meta.get("http.method") == "GET"
        assert_span_http_status_code(span, status_code)
        assert span.meta.get(http.URL) == url
        if route_name is not None:
            assert span.meta.get("pyramid.route.name") == route_name

    def _assert_render_span(self, span):
        """Check the attributes shared by the ``pyramid.render`` spans produced in these tests."""
        assert span.service == "foobar"
        assert span.error == 0
        assert span.span_type == "template"

    def test_200(self, query_string=""):
        """Request the index route and check the generated span.

        :param query_string: optional raw query string appended to the URL;
            other tests of this class call this one with a non-empty value.
        """
        fqs = "?" + query_string if query_string else ""
        res = self.app.get("/" + fqs, status=200)
        assert b"idx" in res.body

        spans = self.pop_spans()
        assert len(spans) == 1
        s = spans[0]
        self._assert_request_span(s, "GET index", 200, "http://localhost/", route_name="index")
        # the query string is only expected as a tag when the integration
        # is configured to trace it
        if config.pyramid.trace_query_string:
            assert s.meta.get(http.QUERY_STRING) == query_string
        else:
            assert http.QUERY_STRING not in s.meta

    def test_200_query_string(self):
        """Run ``test_200`` with a query string, using the current configuration."""
        return self.test_200("foo=bar")

    def test_200_query_string_trace(self):
        """Run ``test_200`` with a query string while query string tracing is enabled."""
        with self.override_http_config("pyramid", dict(trace_query_string=True)):
            return self.test_200("foo=bar")

    def test_analytics_global_on_integration_default(self):
        """
        When making a request
            When the integration trace search settings are left untouched and globally trace search is enabled
                We expect the root span to have the appropriate tag
        """
        with self.override_global_config(dict(analytics_enabled=True)):
            res = self.app.get("/", status=200)
            assert b"idx" in res.body

            self.assert_structure(
                dict(name="pyramid.request", metrics={ANALYTICS_SAMPLE_RATE_KEY: 1.0}),
            )

    def test_analytics_global_on_integration_on(self):
        """
        When making a request
            When an integration trace search is enabled and sample rate is set and globally trace search is enabled
                We expect the root span to have the appropriate tag
        """
        with self.override_global_config(dict(analytics_enabled=True)):
            self.override_settings(dict(datadog_analytics_enabled=True, datadog_analytics_sample_rate=0.5))
            res = self.app.get("/", status=200)
            assert b"idx" in res.body

            self.assert_structure(
                dict(name="pyramid.request", metrics={ANALYTICS_SAMPLE_RATE_KEY: 0.5}),
            )

    def test_analytics_global_off_integration_default(self):
        """
        When making a request
            When the integration trace search settings are left untouched and globally trace search is disabled
                We expect the root span to not include tag
        """
        with self.override_global_config(dict(analytics_enabled=False)):
            res = self.app.get("/", status=200)
            assert b"idx" in res.body

            root = self.get_root_span()
            self.assertIsNone(root.get_metric(ANALYTICS_SAMPLE_RATE_KEY))

    def test_analytics_global_off_integration_on(self):
        """
        When making a request
            When an integration trace search is enabled and sample rate is set and globally trace search is disabled
                We expect the root span to have the appropriate tag
        """
        with self.override_global_config(dict(analytics_enabled=False)):
            self.override_settings(dict(datadog_analytics_enabled=True, datadog_analytics_sample_rate=0.5))
            res = self.app.get("/", status=200)
            assert b"idx" in res.body

            self.assert_structure(
                dict(name="pyramid.request", metrics={ANALYTICS_SAMPLE_RATE_KEY: 0.5}),
            )

    def test_404(self):
        """A 404 response produces a single, non-error span whose resource is ``404``."""
        self.app.get("/404", status=404)

        spans = self.pop_spans()
        assert len(spans) == 1
        self._assert_request_span(spans[0], "404", 404, "http://localhost/404")

    def test_302(self):
        """A 302 response produces a single, non-error span."""
        self.app.get("/redirect", status=302)

        spans = self.pop_spans()
        assert len(spans) == 1
        self._assert_request_span(spans[0], "GET raise_redirect", 302, "http://localhost/redirect")

    def test_204(self):
        """A 204 response produces a single, non-error span."""
        self.app.get("/nocontent", status=204)

        spans = self.pop_spans()
        assert len(spans) == 1
        self._assert_request_span(spans[0], "GET raise_no_content", 204, "http://localhost/nocontent")

    def test_exception(self):
        """A request to ``/exception`` produces a single error span tagged with a 500 status."""
        # the ZeroDivisionError is tolerated rather than required: the span
        # assertions below are what this test is about
        try:
            self.app.get("/exception", status=500)
        except ZeroDivisionError:
            pass

        spans = self.pop_spans()
        assert len(spans) == 1
        self._assert_request_span(
            spans[0], "GET exception", 500, "http://localhost/exception", error=1, route_name="exception"
        )

    def test_500(self):
        """A 500 response produces a single error span whose ``error`` attribute is an ``int``."""
        self.app.get("/error", status=500)

        spans = self.pop_spans()
        assert len(spans) == 1
        s = spans[0]
        self._assert_request_span(s, "GET error", 500, "http://localhost/error", error=1, route_name="error")
        # exact type check on purpose: a bool would compare equal to 1 above
        # but must not be accepted here
        assert type(s.error) is int

    def test_json(self):
        """A JSON-rendered view produces a request span and a render span."""
        res = self.app.get("/json", status=200)
        parsed = json.loads(compat.to_unicode(res.body))
        assert parsed == {"a": 1}

        spans = self.pop_spans()
        assert len(spans) == 2
        spans_by_name = {s.name: s for s in spans}
        self._assert_request_span(
            spans_by_name["pyramid.request"], "GET json", 200, "http://localhost/json", route_name="json"
        )
        self._assert_render_span(spans_by_name["pyramid.render"])

    def test_renderer(self):
        """A view using the test renderer produces a request span and a render span."""
        self.app.get("/renderer", status=200)
        assert self.renderer._received["request"] is not None

        self.renderer.assert_(foo="bar")
        spans = self.pop_spans()
        assert len(spans) == 2
        spans_by_name = {s.name: s for s in spans}
        self._assert_request_span(
            spans_by_name["pyramid.request"], "GET renderer", 200, "http://localhost/renderer", route_name="renderer"
        )
        self._assert_render_span(spans_by_name["pyramid.render"])

    def test_http_exception_response(self):
        """An ``HTTPException`` raised out of the app still yields an error span tagged with a 404 status."""
        with pytest.raises(HTTPException):
            self.app.get("/404/raise_exception", status=404)

        spans = self.pop_spans()
        assert len(spans) == 1
        self._assert_request_span(spans[0], "404", 404, "http://localhost/404/raise_exception", error=1)

    def test_insert_tween_if_needed_already_set(self):
        """The tween list is left as is when it already contains the trace tween."""
        settings = {"pyramid.tweens": "ddtrace.contrib.pyramid:trace_tween_factory"}
        insert_tween_if_needed(settings)
        assert settings["pyramid.tweens"] == "ddtrace.contrib.pyramid:trace_tween_factory"

    def test_insert_tween_if_needed_none(self):
        """An empty tween list is left empty."""
        settings = {"pyramid.tweens": ""}
        insert_tween_if_needed(settings)
        assert settings["pyramid.tweens"] == ""

    def test_insert_tween_if_needed_excview(self):
        """The trace tween is inserted right before the excview tween."""
        settings = {"pyramid.tweens": "pyramid.tweens.excview_tween_factory"}
        insert_tween_if_needed(settings)
        assert (
            settings["pyramid.tweens"]
            == "ddtrace.contrib.pyramid:trace_tween_factory\npyramid.tweens.excview_tween_factory"
        )

    def test_insert_tween_if_needed_excview_and_other(self):
        """The trace tween is inserted right before the excview tween, other tweens keep their position."""
        settings = {"pyramid.tweens": "a.first.tween\npyramid.tweens.excview_tween_factory\na.last.tween\n"}
        insert_tween_if_needed(settings)
        assert (
            settings["pyramid.tweens"] == "a.first.tween\n"
            "ddtrace.contrib.pyramid:trace_tween_factory\n"
            "pyramid.tweens.excview_tween_factory\n"
            "a.last.tween\n"
        )

    def test_insert_tween_if_needed_others(self):
        """The trace tween is appended when the list contains neither it nor the excview tween."""
        settings = {"pyramid.tweens": "a.random.tween\nand.another.one"}
        insert_tween_if_needed(settings)
        assert (
            settings["pyramid.tweens"] == "a.random.tween\nand.another.one\nddtrace.contrib.pyramid:trace_tween_factory"
        )

    def test_include_conflicts(self):
        """Adding a ``pyramid.includes`` entry still results in exactly one span per request."""
        # test that includes do not create conflicts
        self.override_settings({"pyramid.includes": "tests.contrib.pyramid.test_pyramid"})
        self.app.get("/404", status=404)
        spans = self.pop_spans()
        assert len(spans) == 1

    def test_200_ot(self):
        """OpenTracing version of test_200."""
        ot_tracer = init_tracer("pyramid_svc", self.tracer)

        with ot_tracer.start_active_span("pyramid_get"):
            res = self.app.get("/", status=200)
            assert b"idx" in res.body

        spans = self.pop_spans()
        assert len(spans) == 2

        ot_span, dd_span = spans

        # confirm the parenting
        assert ot_span.parent_id is None
        assert dd_span.parent_id == ot_span.span_id

        assert ot_span.name == "pyramid_get"
        assert ot_span.service == "pyramid_svc"

        self._assert_request_span(dd_span, "GET index", 200, "http://localhost/", route_name="index")
359