1from decimal import Decimal
2
3from twisted.trial.unittest import SynchronousTestCase
4
5from twisted.python.failure import Failure
6from twisted.web.client import ResponseDone
7from twisted.web.iweb import UNKNOWN_LENGTH
8from twisted.web.http_headers import Headers
9
10from treq.response import _Response
11
12
class FakeResponse(object):
    """
    An in-memory stand-in for a response object, used to drive the tests in
    this module without any network I/O.

    It exposes ``code``, ``headers``, ``previousResponse`` and ``length``
    attributes plus ``setPreviousResponse()`` and ``deliverBody()`` methods.
    """

    def __init__(self, code, headers, body=()):
        """
        Store the response attributes and compute the body length.

        ``code`` and ``headers`` are stored unmodified.  ``body`` is an
        iterable of byte-string chunks; ``length`` is set to the sum of the
        chunk lengths.
        """
        self.code = code
        self.headers = headers
        self.previousResponse = None
        # Snapshot the chunks first: the body is walked here to compute the
        # length and walked again by every deliverBody() call, so a one-shot
        # iterator must not be stored as-is or it would already be exhausted
        # by the time the body is delivered.
        self._body = tuple(body)
        self.length = sum(len(chunk) for chunk in self._body)

    def setPreviousResponse(self, response):
        """
        Record ``response`` as the one that preceded this response.
        """
        self.previousResponse = response

    def deliverBody(self, protocol):
        """
        Pass every body chunk, in order, to ``protocol.dataReceived`` and
        then call ``protocol.connectionLost`` with a ``Failure`` wrapping
        ``ResponseDone``.
        """
        for chunk in self._body:
            protocol.dataReceived(chunk)
        protocol.connectionLost(Failure(ResponseDone()))
28
29
class ResponseTests(SynchronousTestCase):
    """
    Tests for the ``_Response`` wrapper, exercised through ``FakeResponse``
    objects.
    """

    def test_repr_content_type(self):
        """
        The repr of a response that has a Content-Type header includes the
        value of that header.
        """
        fake = FakeResponse(
            200,
            Headers({'Content-Type': ['text/html']}),
            body=[b'<!DOCTYPE html>'],
        )
        wrapped = _Response(fake, None)
        self.assertEqual(
            "<treq.response._Response 200 'text/html' 15 bytes>",
            repr(wrapped),
        )

    def test_repr_content_type_missing(self):
        """
        The repr of a response without a Content-Type header shows an empty
        string in the content type position.
        """
        fake = FakeResponse(204, Headers(), body=[b''])
        wrapped = _Response(fake, None)
        self.assertEqual(
            "<treq.response._Response 204 '' 0 bytes>",
            repr(wrapped),
        )

    def test_repr_content_type_hostile(self):
        """
        Multiple Content-Type values, one of them non-ASCII, still give a
        usable repr.
        """
        garbage = Headers({'Content-Type': [u'\u2e18', ' x/y']})
        fake = FakeResponse(418, garbage, body=[b''])
        wrapped = _Response(fake, None)
        self.assertEqual(
            r"<treq.response._Response 418 '\xe2\xb8\x98,  x/y' 0 bytes>",
            repr(wrapped),
        )

    def test_repr_unknown_length(self):
        """
        When the wrapped response's length is ``UNKNOWN_LENGTH`` the repr
        reports an unknown size instead of a byte count.
        """
        fake = FakeResponse(
            200,
            Headers({'Content-Type': ['text/event-stream']}),
        )
        fake.length = UNKNOWN_LENGTH
        wrapped = _Response(fake, None)
        self.assertEqual(
            "<treq.response._Response 200 'text/event-stream' unknown size>",
            repr(wrapped),
        )

    def test_collect(self):
        """
        ``collect()`` hands each body chunk, in order, to the given callable.
        """
        fake = FakeResponse(200, Headers(), body=[b'foo', b'bar', b'baz'])
        received = []
        wrapped = _Response(fake, None)
        wrapped.collect(received.append)
        self.assertEqual([b'foo', b'bar', b'baz'], received)

    def test_content(self):
        """
        ``content()`` results in all of the body chunks joined together.
        """
        fake = FakeResponse(200, Headers(), body=[b'foo', b'bar', b'baz'])
        pending = _Response(fake, None).content()
        self.assertEqual(
            b'foobarbaz',
            self.successResultOf(pending),
        )

    def test_json(self):
        """
        ``json()`` results in the object decoded from a JSON body, even when
        that body arrives in more than one chunk.
        """
        fake = FakeResponse(200, Headers(), body=[b'{"foo": ', b'"bar"}'])
        pending = _Response(fake, None).json()
        self.assertEqual(
            {'foo': 'bar'},
            self.successResultOf(pending),
        )

    def test_json_customized(self):
        """
        With ``parse_float=Decimal`` given to ``json()``, a float in the body
        is decoded as a ``Decimal`` that keeps all of its digits.
        """
        fake = FakeResponse(
            200,
            Headers(),
            body=[b'{"foo": ', b'1.0000000000000001}'],
        )
        pending = _Response(fake, None).json(parse_float=Decimal)
        decoded = self.successResultOf(pending)
        self.assertEqual(
            decoded["foo"],
            Decimal("1.0000000000000001")
        )

    def test_text(self):
        """
        ``text()`` results in the decoded text of a body whose Content-Type
        declares UTF-8, with one character split across two chunks.
        """
        fake = FakeResponse(
            200,
            Headers({b'content-type': [b'text/plain;charset=utf-8']}),
            body=[b'\xe2\x98', b'\x83'],
        )
        pending = _Response(fake, None).text()
        self.assertEqual(
            u'\u2603',
            self.successResultOf(pending),
        )

    def test_history(self):
        """
        ``history()`` lists the chain of previous responses, oldest first,
        while the wrapper itself reflects the final response.
        """
        first_hop = FakeResponse(
            301,
            Headers({'location': ['http://example.com/']})
        )
        second_hop = FakeResponse(
            302,
            Headers({'location': ['https://example.com/']})
        )
        last = FakeResponse(200, Headers({}))

        # Link the chain: first_hop <- second_hop <- last.
        second_hop.setPreviousResponse(first_hop)
        last.setPreviousResponse(second_hop)

        wrapped = _Response(last, None)
        hops = wrapped.history()

        self.assertEqual(wrapped.code, 200)
        self.assertEqual(hops[0].code, 301)
        self.assertEqual(hops[1].code, 302)

    def test_no_history(self):
        """
        ``history()`` is an empty list when there is no previous response.
        """
        fake = FakeResponse(200, Headers({}))
        wrapped = _Response(fake, None)
        self.assertEqual(wrapped.history(), [])
140