1from urllib3._collections import HTTPHeaderDict, RecentlyUsedContainer as Container
2import pytest
3
4from urllib3.exceptions import InvalidHeader
5from urllib3.packages import six
6
7xrange = six.moves.xrange
8
9
10class TestLRUContainer(object):
11    def test_maxsize(self):
12        d = Container(5)
13
14        for i in xrange(5):
15            d[i] = str(i)
16
17        assert len(d) == 5
18
19        for i in xrange(5):
20            assert d[i] == str(i)
21
22        d[i + 1] = str(i + 1)
23
24        assert len(d) == 5
25        assert 0 not in d
26        assert (i + 1) in d
27
28    def test_expire(self):
29        d = Container(5)
30
31        for i in xrange(5):
32            d[i] = str(i)
33
34        for i in xrange(5):
35            d.get(0)
36
37        # Add one more entry
38        d[5] = "5"
39
40        # Check state
41        assert list(d.keys()) == [2, 3, 4, 0, 5]
42
43    def test_same_key(self):
44        d = Container(5)
45
46        for i in xrange(10):
47            d["foo"] = i
48
49        assert list(d.keys()) == ["foo"]
50        assert len(d) == 1
51
52    def test_access_ordering(self):
53        d = Container(5)
54
55        for i in xrange(10):
56            d[i] = True
57
58        # Keys should be ordered by access time
59        assert list(d.keys()) == [5, 6, 7, 8, 9]
60
61        new_order = [7, 8, 6, 9, 5]
62        for k in new_order:
63            d[k]
64
65        assert list(d.keys()) == new_order
66
67    def test_delete(self):
68        d = Container(5)
69
70        for i in xrange(5):
71            d[i] = True
72
73        del d[0]
74        assert 0 not in d
75
76        d.pop(1)
77        assert 1 not in d
78
79        d.pop(1, None)
80
81    def test_get(self):
82        d = Container(5)
83
84        for i in xrange(5):
85            d[i] = True
86
87        r = d.get(4)
88        assert r is True
89
90        r = d.get(5)
91        assert r is None
92
93        r = d.get(5, 42)
94        assert r == 42
95
96        with pytest.raises(KeyError):
97            d[5]
98
99    def test_disposal(self):
100        evicted_items = []
101
102        def dispose_func(arg):
103            # Save the evicted datum for inspection
104            evicted_items.append(arg)
105
106        d = Container(5, dispose_func=dispose_func)
107        for i in xrange(5):
108            d[i] = i
109        assert list(d.keys()) == list(xrange(5))
110        assert evicted_items == []  # Nothing disposed
111
112        d[5] = 5
113        assert list(d.keys()) == list(xrange(1, 6))
114        assert evicted_items == [0]
115
116        del d[1]
117        assert evicted_items == [0, 1]
118
119        d.clear()
120        assert evicted_items == [0, 1, 2, 3, 4, 5]
121
122    def test_iter(self):
123        d = Container()
124
125        with pytest.raises(NotImplementedError):
126            d.__iter__()
127
128
129class NonMappingHeaderContainer(object):
130    def __init__(self, **kwargs):
131        self._data = {}
132        self._data.update(kwargs)
133
134    def keys(self):
135        return self._data.keys()
136
137    def __getitem__(self, key):
138        return self._data[key]
139
140
141@pytest.fixture()
142def d():
143    header_dict = HTTPHeaderDict(Cookie="foo")
144    header_dict.add("cookie", "bar")
145    return header_dict
146
147
148class TestHTTPHeaderDict(object):
149    def test_create_from_kwargs(self):
150        h = HTTPHeaderDict(ab=1, cd=2, ef=3, gh=4)
151        assert len(h) == 4
152        assert "ab" in h
153
154    def test_create_from_dict(self):
155        h = HTTPHeaderDict(dict(ab=1, cd=2, ef=3, gh=4))
156        assert len(h) == 4
157        assert "ab" in h
158
159    def test_create_from_iterator(self):
160        teststr = "urllib3ontherocks"
161        h = HTTPHeaderDict((c, c * 5) for c in teststr)
162        assert len(h) == len(set(teststr))
163
164    def test_create_from_list(self):
165        headers = [
166            ("ab", "A"),
167            ("cd", "B"),
168            ("cookie", "C"),
169            ("cookie", "D"),
170            ("cookie", "E"),
171        ]
172        h = HTTPHeaderDict(headers)
173        assert len(h) == 3
174        assert "ab" in h
175        clist = h.getlist("cookie")
176        assert len(clist) == 3
177        assert clist[0] == "C"
178        assert clist[-1] == "E"
179
180    def test_create_from_headerdict(self):
181        headers = [
182            ("ab", "A"),
183            ("cd", "B"),
184            ("cookie", "C"),
185            ("cookie", "D"),
186            ("cookie", "E"),
187        ]
188        org = HTTPHeaderDict(headers)
189        h = HTTPHeaderDict(org)
190        assert len(h) == 3
191        assert "ab" in h
192        clist = h.getlist("cookie")
193        assert len(clist) == 3
194        assert clist[0] == "C"
195        assert clist[-1] == "E"
196        assert h is not org
197        assert h == org
198
199    def test_setitem(self, d):
200        d["Cookie"] = "foo"
201        assert d["cookie"] == "foo"
202        d["cookie"] = "with, comma"
203        assert d.getlist("cookie") == ["with, comma"]
204
205    def test_update(self, d):
206        d.update(dict(Cookie="foo"))
207        assert d["cookie"] == "foo"
208        d.update(dict(cookie="with, comma"))
209        assert d.getlist("cookie") == ["with, comma"]
210
211    def test_delitem(self, d):
212        del d["cookie"]
213        assert "cookie" not in d
214        assert "COOKIE" not in d
215
216    def test_add_well_known_multiheader(self, d):
217        d.add("COOKIE", "asdf")
218        assert d.getlist("cookie") == ["foo", "bar", "asdf"]
219        assert d["cookie"] == "foo, bar, asdf"
220
221    def test_add_comma_separated_multiheader(self, d):
222        d.add("bar", "foo")
223        d.add("BAR", "bar")
224        d.add("Bar", "asdf")
225        assert d.getlist("bar") == ["foo", "bar", "asdf"]
226        assert d["bar"] == "foo, bar, asdf"
227
228    def test_extend_from_list(self, d):
229        d.extend([("set-cookie", "100"), ("set-cookie", "200"), ("set-cookie", "300")])
230        assert d["set-cookie"] == "100, 200, 300"
231
232    def test_extend_from_dict(self, d):
233        d.extend(dict(cookie="asdf"), b="100")
234        assert d["cookie"] == "foo, bar, asdf"
235        assert d["b"] == "100"
236        d.add("cookie", "with, comma")
237        assert d.getlist("cookie") == ["foo", "bar", "asdf", "with, comma"]
238
239    def test_extend_from_container(self, d):
240        h = NonMappingHeaderContainer(Cookie="foo", e="foofoo")
241        d.extend(h)
242        assert d["cookie"] == "foo, bar, foo"
243        assert d["e"] == "foofoo"
244        assert len(d) == 2
245
246    def test_extend_from_headerdict(self, d):
247        h = HTTPHeaderDict(Cookie="foo", e="foofoo")
248        d.extend(h)
249        assert d["cookie"] == "foo, bar, foo"
250        assert d["e"] == "foofoo"
251        assert len(d) == 2
252
253    @pytest.mark.parametrize("args", [(1, 2), (1, 2, 3, 4, 5)])
254    def test_extend_with_wrong_number_of_args_is_typeerror(self, d, args):
255        with pytest.raises(TypeError) as err:
256            d.extend(*args)
257        assert "extend() takes at most 1 positional arguments" in err.value.args[0]
258
259    def test_copy(self, d):
260        h = d.copy()
261        assert d is not h
262        assert d == h
263
264    def test_getlist(self, d):
265        assert d.getlist("cookie") == ["foo", "bar"]
266        assert d.getlist("Cookie") == ["foo", "bar"]
267        assert d.getlist("b") == []
268        d.add("b", "asdf")
269        assert d.getlist("b") == ["asdf"]
270
271    def test_getlist_after_copy(self, d):
272        assert d.getlist("cookie") == HTTPHeaderDict(d).getlist("cookie")
273
274    def test_equal(self, d):
275        b = HTTPHeaderDict(cookie="foo, bar")
276        c = NonMappingHeaderContainer(cookie="foo, bar")
277        assert d == b
278        assert d == c
279        assert d != 2
280
281    def test_not_equal(self, d):
282        b = HTTPHeaderDict(cookie="foo, bar")
283        c = NonMappingHeaderContainer(cookie="foo, bar")
284        assert not (d != b)
285        assert not (d != c)
286        assert d != 2
287
288    def test_pop(self, d):
289        key = "Cookie"
290        a = d[key]
291        b = d.pop(key)
292        assert a == b
293        assert key not in d
294        with pytest.raises(KeyError):
295            d.pop(key)
296        dummy = object()
297        assert dummy is d.pop(key, dummy)
298
299    def test_discard(self, d):
300        d.discard("cookie")
301        assert "cookie" not in d
302        d.discard("cookie")
303
304    def test_len(self, d):
305        assert len(d) == 1
306        d.add("cookie", "bla")
307        d.add("asdf", "foo")
308        # len determined by unique fieldnames
309        assert len(d) == 2
310
311    def test_repr(self, d):
312        rep = "HTTPHeaderDict({'Cookie': 'foo, bar'})"
313        assert repr(d) == rep
314
315    def test_items(self, d):
316        items = d.items()
317        assert len(items) == 2
318        assert items[0][0] == "Cookie"
319        assert items[0][1] == "foo"
320        assert items[1][0] == "Cookie"
321        assert items[1][1] == "bar"
322
323    def test_dict_conversion(self, d):
324        # Also tested in connectionpool, needs to preserve case
325        hdict = {
326            "Content-Length": "0",
327            "Content-type": "text/plain",
328            "Server": "TornadoServer/1.2.3",
329        }
330        h = dict(HTTPHeaderDict(hdict).items())
331        assert hdict == h
332        assert hdict == dict(HTTPHeaderDict(hdict))
333
334    def test_string_enforcement(self, d):
335        # This currently throws AttributeError on key.lower(), should
336        # probably be something nicer
337        with pytest.raises(Exception):
338            d[3] = 5
339        with pytest.raises(Exception):
340            d.add(3, 4)
341        with pytest.raises(Exception):
342            del d[3]
343        with pytest.raises(Exception):
344            HTTPHeaderDict({3: 3})
345
346    @pytest.mark.skipif(
347        not six.PY2, reason="python3 has a different internal header implementation"
348    )
349    def test_from_httplib_py2(self):
350        msg = """
351Server: nginx
352Content-Type: text/html; charset=windows-1251
353Connection: keep-alive
354X-Some-Multiline: asdf
355 asdf\t
356\t asdf
357Set-Cookie: bb_lastvisit=1348253375; expires=Sat, 21-Sep-2013 18:49:35 GMT; path=/
358Set-Cookie: bb_lastactivity=0; expires=Sat, 21-Sep-2013 18:49:35 GMT; path=/
359www-authenticate: asdf
360www-authenticate: bla
361
362"""
363        buffer = six.moves.StringIO(msg.lstrip().replace("\n", "\r\n"))
364        msg = six.moves.http_client.HTTPMessage(buffer)
365        d = HTTPHeaderDict.from_httplib(msg)
366        assert d["server"] == "nginx"
367        cookies = d.getlist("set-cookie")
368        assert len(cookies) == 2
369        assert cookies[0].startswith("bb_lastvisit")
370        assert cookies[1].startswith("bb_lastactivity")
371        assert d["x-some-multiline"] == "asdf asdf asdf"
372        assert d["www-authenticate"] == "asdf, bla"
373        assert d.getlist("www-authenticate") == ["asdf", "bla"]
374        with_invalid_multiline = """\tthis-is-not-a-header: but it has a pretend value
375Authorization: Bearer 123
376
377"""
378        buffer = six.moves.StringIO(with_invalid_multiline.replace("\n", "\r\n"))
379        msg = six.moves.http_client.HTTPMessage(buffer)
380        with pytest.raises(InvalidHeader):
381            HTTPHeaderDict.from_httplib(msg)
382