1# -*- coding: utf-8 -*-
2
3import io
4import json
5import logging
6import os
7import platform
8import socket
9import sys
10import time
11import warnings
12from test import LONG_TIMEOUT, SHORT_TIMEOUT, onlyPy2
13from threading import Event
14
15import mock
16import pytest
17import six
18
19from dummyserver.server import HAS_IPV6_AND_DNS, NoIPv6Warning
20from dummyserver.testcase import HTTPDummyServerTestCase, SocketDummyServerTestCase
21from urllib3 import HTTPConnectionPool, encode_multipart_formdata
22from urllib3._collections import HTTPHeaderDict
23from urllib3.connection import _get_default_user_agent
24from urllib3.exceptions import (
25    ConnectTimeoutError,
26    DecodeError,
27    EmptyPoolError,
28    MaxRetryError,
29    NewConnectionError,
30    ReadTimeoutError,
31    UnrewindableBodyError,
32)
33from urllib3.packages.six import b, u
34from urllib3.packages.six.moves.urllib.parse import urlencode
35from urllib3.util import SKIP_HEADER, SKIPPABLE_HEADERS
36from urllib3.util.retry import RequestHistory, Retry
37from urllib3.util.timeout import Timeout
38
39from .. import INVALID_SOURCE_ADDRESSES, TARPIT_HOST, VALID_SOURCE_ADDRESSES
40from ..port_helpers import find_unused_port
41
42pytestmark = pytest.mark.flaky
43
44log = logging.getLogger("urllib3.connectionpool")
45log.setLevel(logging.NOTSET)
46log.addHandler(logging.StreamHandler(sys.stdout))
47
48
49def wait_for_socket(ready_event):
50    ready_event.wait()
51    ready_event.clear()
52
53
54class TestConnectionPoolTimeouts(SocketDummyServerTestCase):
55    def test_timeout_float(self):
56        block_event = Event()
57        ready_event = self.start_basic_handler(block_send=block_event, num=2)
58
59        with HTTPConnectionPool(self.host, self.port, retries=False) as pool:
60            wait_for_socket(ready_event)
61            with pytest.raises(ReadTimeoutError):
62                pool.request("GET", "/", timeout=SHORT_TIMEOUT)
63            block_event.set()  # Release block
64
65            # Shouldn't raise this time
66            wait_for_socket(ready_event)
67            block_event.set()  # Pre-release block
68            pool.request("GET", "/", timeout=LONG_TIMEOUT)
69
70    def test_conn_closed(self):
71        block_event = Event()
72        self.start_basic_handler(block_send=block_event, num=1)
73
74        with HTTPConnectionPool(
75            self.host, self.port, timeout=SHORT_TIMEOUT, retries=False
76        ) as pool:
77            conn = pool._get_conn()
78            pool._put_conn(conn)
79            try:
80                with pytest.raises(ReadTimeoutError):
81                    pool.urlopen("GET", "/")
82                if conn.sock:
83                    with pytest.raises(socket.error):
84                        conn.sock.recv(1024)
85            finally:
86                pool._put_conn(conn)
87
88            block_event.set()
89
90    def test_timeout(self):
91        # Requests should time out when expected
92        block_event = Event()
93        ready_event = self.start_basic_handler(block_send=block_event, num=3)
94
95        # Pool-global timeout
96        short_timeout = Timeout(read=SHORT_TIMEOUT)
97        with HTTPConnectionPool(
98            self.host, self.port, timeout=short_timeout, retries=False
99        ) as pool:
100            wait_for_socket(ready_event)
101            block_event.clear()
102            with pytest.raises(ReadTimeoutError):
103                pool.request("GET", "/")
104            block_event.set()  # Release request
105
106        # Request-specific timeouts should raise errors
107        with HTTPConnectionPool(
108            self.host, self.port, timeout=short_timeout, retries=False
109        ) as pool:
110            wait_for_socket(ready_event)
111            now = time.time()
112            with pytest.raises(ReadTimeoutError):
113                pool.request("GET", "/", timeout=LONG_TIMEOUT)
114            delta = time.time() - now
115
116            message = "timeout was pool-level SHORT_TIMEOUT rather than request-level LONG_TIMEOUT"
117            assert delta >= LONG_TIMEOUT, message
118            block_event.set()  # Release request
119
120            # Timeout passed directly to request should raise a request timeout
121            wait_for_socket(ready_event)
122            with pytest.raises(ReadTimeoutError):
123                pool.request("GET", "/", timeout=SHORT_TIMEOUT)
124            block_event.set()  # Release request
125
126    def test_connect_timeout(self):
127        url = "/"
128        host, port = TARPIT_HOST, 80
129        timeout = Timeout(connect=SHORT_TIMEOUT)
130
131        # Pool-global timeout
132        with HTTPConnectionPool(host, port, timeout=timeout) as pool:
133            conn = pool._get_conn()
134            with pytest.raises(ConnectTimeoutError):
135                pool._make_request(conn, "GET", url)
136
137            # Retries
138            retries = Retry(connect=0)
139            with pytest.raises(MaxRetryError):
140                pool.request("GET", url, retries=retries)
141
142        # Request-specific connection timeouts
143        big_timeout = Timeout(read=LONG_TIMEOUT, connect=LONG_TIMEOUT)
144        with HTTPConnectionPool(host, port, timeout=big_timeout, retries=False) as pool:
145            conn = pool._get_conn()
146            with pytest.raises(ConnectTimeoutError):
147                pool._make_request(conn, "GET", url, timeout=timeout)
148
149            pool._put_conn(conn)
150            with pytest.raises(ConnectTimeoutError):
151                pool.request("GET", url, timeout=timeout)
152
153    def test_total_applies_connect(self):
154        host, port = TARPIT_HOST, 80
155
156        timeout = Timeout(total=None, connect=SHORT_TIMEOUT)
157        with HTTPConnectionPool(host, port, timeout=timeout) as pool:
158            conn = pool._get_conn()
159            try:
160                with pytest.raises(ConnectTimeoutError):
161                    pool._make_request(conn, "GET", "/")
162            finally:
163                conn.close()
164
165        timeout = Timeout(connect=3, read=5, total=SHORT_TIMEOUT)
166        with HTTPConnectionPool(host, port, timeout=timeout) as pool:
167            conn = pool._get_conn()
168            try:
169                with pytest.raises(ConnectTimeoutError):
170                    pool._make_request(conn, "GET", "/")
171            finally:
172                conn.close()
173
174    def test_total_timeout(self):
175        block_event = Event()
176        ready_event = self.start_basic_handler(block_send=block_event, num=2)
177
178        wait_for_socket(ready_event)
179        # This will get the socket to raise an EAGAIN on the read
180        timeout = Timeout(connect=3, read=SHORT_TIMEOUT)
181        with HTTPConnectionPool(
182            self.host, self.port, timeout=timeout, retries=False
183        ) as pool:
184            with pytest.raises(ReadTimeoutError):
185                pool.request("GET", "/")
186
187            block_event.set()
188            wait_for_socket(ready_event)
189            block_event.clear()
190
191        # The connect should succeed and this should hit the read timeout
192        timeout = Timeout(connect=3, read=5, total=SHORT_TIMEOUT)
193        with HTTPConnectionPool(
194            self.host, self.port, timeout=timeout, retries=False
195        ) as pool:
196            with pytest.raises(ReadTimeoutError):
197                pool.request("GET", "/")
198
199    def test_create_connection_timeout(self):
200        self.start_basic_handler(block_send=Event(), num=0)  # needed for self.port
201
202        timeout = Timeout(connect=SHORT_TIMEOUT, total=LONG_TIMEOUT)
203        with HTTPConnectionPool(
204            TARPIT_HOST, self.port, timeout=timeout, retries=False
205        ) as pool:
206            conn = pool._new_conn()
207            with pytest.raises(ConnectTimeoutError):
208                conn.connect()
209
210
211class TestConnectionPool(HTTPDummyServerTestCase):
212    def test_get(self):
213        with HTTPConnectionPool(self.host, self.port) as pool:
214            r = pool.request("GET", "/specific_method", fields={"method": "GET"})
215            assert r.status == 200, r.data
216
217    def test_post_url(self):
218        with HTTPConnectionPool(self.host, self.port) as pool:
219            r = pool.request("POST", "/specific_method", fields={"method": "POST"})
220            assert r.status == 200, r.data
221
222    def test_urlopen_put(self):
223        with HTTPConnectionPool(self.host, self.port) as pool:
224            r = pool.urlopen("PUT", "/specific_method?method=PUT")
225            assert r.status == 200, r.data
226
227    def test_wrong_specific_method(self):
228        # To make sure the dummy server is actually returning failed responses
229        with HTTPConnectionPool(self.host, self.port) as pool:
230            r = pool.request("GET", "/specific_method", fields={"method": "POST"})
231            assert r.status == 400, r.data
232
233        with HTTPConnectionPool(self.host, self.port) as pool:
234            r = pool.request("POST", "/specific_method", fields={"method": "GET"})
235            assert r.status == 400, r.data
236
237    def test_upload(self):
238        data = "I'm in ur multipart form-data, hazing a cheezburgr"
239        fields = {
240            "upload_param": "filefield",
241            "upload_filename": "lolcat.txt",
242            "upload_size": len(data),
243            "filefield": ("lolcat.txt", data),
244        }
245
246        with HTTPConnectionPool(self.host, self.port) as pool:
247            r = pool.request("POST", "/upload", fields=fields)
248            assert r.status == 200, r.data
249
250    def test_one_name_multiple_values(self):
251        fields = [("foo", "a"), ("foo", "b")]
252
253        with HTTPConnectionPool(self.host, self.port) as pool:
254            # urlencode
255            r = pool.request("GET", "/echo", fields=fields)
256            assert r.data == b"foo=a&foo=b"
257
258            # multipart
259            r = pool.request("POST", "/echo", fields=fields)
260            assert r.data.count(b'name="foo"') == 2
261
262    def test_request_method_body(self):
263        with HTTPConnectionPool(self.host, self.port) as pool:
264            body = b"hi"
265            r = pool.request("POST", "/echo", body=body)
266            assert r.data == body
267
268            fields = [("hi", "hello")]
269            with pytest.raises(TypeError):
270                pool.request("POST", "/echo", body=body, fields=fields)
271
272    def test_unicode_upload(self):
273        fieldname = u("myfile")
274        filename = u("\xe2\x99\xa5.txt")
275        data = u("\xe2\x99\xa5").encode("utf8")
276        size = len(data)
277
278        fields = {
279            u("upload_param"): fieldname,
280            u("upload_filename"): filename,
281            u("upload_size"): size,
282            fieldname: (filename, data),
283        }
284        with HTTPConnectionPool(self.host, self.port) as pool:
285            r = pool.request("POST", "/upload", fields=fields)
286            assert r.status == 200, r.data
287
288    def test_nagle(self):
289        """Test that connections have TCP_NODELAY turned on"""
290        # This test needs to be here in order to be run. socket.create_connection actually tries
291        # to connect to the host provided so we need a dummyserver to be running.
292        with HTTPConnectionPool(self.host, self.port) as pool:
293            conn = pool._get_conn()
294            try:
295                pool._make_request(conn, "GET", "/")
296                tcp_nodelay_setting = conn.sock.getsockopt(
297                    socket.IPPROTO_TCP, socket.TCP_NODELAY
298                )
299                assert tcp_nodelay_setting
300            finally:
301                conn.close()
302
303    def test_socket_options(self):
304        """Test that connections accept socket options."""
305        # This test needs to be here in order to be run. socket.create_connection actually tries to
306        # connect to the host provided so we need a dummyserver to be running.
307        with HTTPConnectionPool(
308            self.host,
309            self.port,
310            socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
311        ) as pool:
312            s = pool._new_conn()._new_conn()  # Get the socket
313            try:
314                using_keepalive = (
315                    s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
316                )
317                assert using_keepalive
318            finally:
319                s.close()
320
321    def test_disable_default_socket_options(self):
322        """Test that passing None disables all socket options."""
323        # This test needs to be here in order to be run. socket.create_connection actually tries
324        # to connect to the host provided so we need a dummyserver to be running.
325        with HTTPConnectionPool(self.host, self.port, socket_options=None) as pool:
326            s = pool._new_conn()._new_conn()
327            try:
328                using_nagle = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0
329                assert using_nagle
330            finally:
331                s.close()
332
333    def test_defaults_are_applied(self):
334        """Test that modifying the default socket options works."""
335        # This test needs to be here in order to be run. socket.create_connection actually tries
336        # to connect to the host provided so we need a dummyserver to be running.
337        with HTTPConnectionPool(self.host, self.port) as pool:
338            # Get the HTTPConnection instance
339            conn = pool._new_conn()
340            try:
341                # Update the default socket options
342                conn.default_socket_options += [
343                    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
344                ]
345                s = conn._new_conn()
346                nagle_disabled = (
347                    s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) > 0
348                )
349                using_keepalive = (
350                    s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
351                )
352                assert nagle_disabled
353                assert using_keepalive
354            finally:
355                conn.close()
356                s.close()
357
358    def test_connection_error_retries(self):
359        """ECONNREFUSED error should raise a connection error, with retries"""
360        port = find_unused_port()
361        with HTTPConnectionPool(self.host, port) as pool:
362            with pytest.raises(MaxRetryError) as e:
363                pool.request("GET", "/", retries=Retry(connect=3))
364            assert type(e.value.reason) == NewConnectionError
365
366    def test_timeout_success(self):
367        timeout = Timeout(connect=3, read=5, total=None)
368        with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool:
369            pool.request("GET", "/")
370            # This should not raise a "Timeout already started" error
371            pool.request("GET", "/")
372
373        with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool:
374            # This should also not raise a "Timeout already started" error
375            pool.request("GET", "/")
376
377        timeout = Timeout(total=None)
378        with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool:
379            pool.request("GET", "/")
380
381    def test_tunnel(self):
382        # note the actual httplib.py has no tests for this functionality
383        timeout = Timeout(total=None)
384        with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool:
385            conn = pool._get_conn()
386            try:
387                conn.set_tunnel(self.host, self.port)
388                conn._tunnel = mock.Mock(return_value=None)
389                pool._make_request(conn, "GET", "/")
390                conn._tunnel.assert_called_once_with()
391            finally:
392                conn.close()
393
394        # test that it's not called when tunnel is not set
395        timeout = Timeout(total=None)
396        with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool:
397            conn = pool._get_conn()
398            try:
399                conn._tunnel = mock.Mock(return_value=None)
400                pool._make_request(conn, "GET", "/")
401                assert not conn._tunnel.called
402            finally:
403                conn.close()
404
405    def test_redirect(self):
406        with HTTPConnectionPool(self.host, self.port) as pool:
407            r = pool.request("GET", "/redirect", fields={"target": "/"}, redirect=False)
408            assert r.status == 303
409
410            r = pool.request("GET", "/redirect", fields={"target": "/"})
411            assert r.status == 200
412            assert r.data == b"Dummy server!"
413
414    def test_bad_connect(self):
415        with HTTPConnectionPool("badhost.invalid", self.port) as pool:
416            with pytest.raises(MaxRetryError) as e:
417                pool.request("GET", "/", retries=5)
418            assert type(e.value.reason) == NewConnectionError
419
420    def test_keepalive(self):
421        with HTTPConnectionPool(self.host, self.port, block=True, maxsize=1) as pool:
422            r = pool.request("GET", "/keepalive?close=0")
423            r = pool.request("GET", "/keepalive?close=0")
424
425            assert r.status == 200
426            assert pool.num_connections == 1
427            assert pool.num_requests == 2
428
429    def test_keepalive_close(self):
430        with HTTPConnectionPool(
431            self.host, self.port, block=True, maxsize=1, timeout=2
432        ) as pool:
433            r = pool.request(
434                "GET", "/keepalive?close=1", retries=0, headers={"Connection": "close"}
435            )
436
437            assert pool.num_connections == 1
438
439            # The dummyserver will have responded with Connection:close,
440            # and httplib will properly cleanup the socket.
441
442            # We grab the HTTPConnection object straight from the Queue,
443            # because _get_conn() is where the check & reset occurs
444            # pylint: disable-msg=W0212
445            conn = pool.pool.get()
446            assert conn.sock is None
447            pool._put_conn(conn)
448
449            # Now with keep-alive
450            r = pool.request(
451                "GET",
452                "/keepalive?close=0",
453                retries=0,
454                headers={"Connection": "keep-alive"},
455            )
456
457            # The dummyserver responded with Connection:keep-alive, the connection
458            # persists.
459            conn = pool.pool.get()
460            assert conn.sock is not None
461            pool._put_conn(conn)
462
463            # Another request asking the server to close the connection. This one
464            # should get cleaned up for the next request.
465            r = pool.request(
466                "GET", "/keepalive?close=1", retries=0, headers={"Connection": "close"}
467            )
468
469            assert r.status == 200
470
471            conn = pool.pool.get()
472            assert conn.sock is None
473            pool._put_conn(conn)
474
475            # Next request
476            r = pool.request("GET", "/keepalive?close=0")
477
478    def test_post_with_urlencode(self):
479        with HTTPConnectionPool(self.host, self.port) as pool:
480            data = {"banana": "hammock", "lol": "cat"}
481            r = pool.request("POST", "/echo", fields=data, encode_multipart=False)
482            assert r.data.decode("utf-8") == urlencode(data)
483
484    def test_post_with_multipart(self):
485        with HTTPConnectionPool(self.host, self.port) as pool:
486            data = {"banana": "hammock", "lol": "cat"}
487            r = pool.request("POST", "/echo", fields=data, encode_multipart=True)
488            body = r.data.split(b"\r\n")
489
490            encoded_data = encode_multipart_formdata(data)[0]
491            expected_body = encoded_data.split(b"\r\n")
492
493            # TODO: Get rid of extra parsing stuff when you can specify
494            # a custom boundary to encode_multipart_formdata
495            """
496            We need to loop the return lines because a timestamp is attached
497            from within encode_multipart_formdata. When the server echos back
498            the data, it has the timestamp from when the data was encoded, which
499            is not equivalent to when we run encode_multipart_formdata on
500            the data again.
501            """
502            for i, line in enumerate(body):
503                if line.startswith(b"--"):
504                    continue
505
506                assert body[i] == expected_body[i]
507
508    def test_post_with_multipart__iter__(self):
509        with HTTPConnectionPool(self.host, self.port) as pool:
510            data = {"hello": "world"}
511            r = pool.request(
512                "POST",
513                "/echo",
514                fields=data,
515                preload_content=False,
516                multipart_boundary="boundary",
517                encode_multipart=True,
518            )
519
520            chunks = [chunk for chunk in r]
521            assert chunks == [
522                b"--boundary\r\n",
523                b'Content-Disposition: form-data; name="hello"\r\n',
524                b"\r\n",
525                b"world\r\n",
526                b"--boundary--\r\n",
527            ]
528
529    def test_check_gzip(self):
530        with HTTPConnectionPool(self.host, self.port) as pool:
531            r = pool.request(
532                "GET", "/encodingrequest", headers={"accept-encoding": "gzip"}
533            )
534            assert r.headers.get("content-encoding") == "gzip"
535            assert r.data == b"hello, world!"
536
537    def test_check_deflate(self):
538        with HTTPConnectionPool(self.host, self.port) as pool:
539            r = pool.request(
540                "GET", "/encodingrequest", headers={"accept-encoding": "deflate"}
541            )
542            assert r.headers.get("content-encoding") == "deflate"
543            assert r.data == b"hello, world!"
544
545    def test_bad_decode(self):
546        with HTTPConnectionPool(self.host, self.port) as pool:
547            with pytest.raises(DecodeError):
548                pool.request(
549                    "GET",
550                    "/encodingrequest",
551                    headers={"accept-encoding": "garbage-deflate"},
552                )
553
554            with pytest.raises(DecodeError):
555                pool.request(
556                    "GET",
557                    "/encodingrequest",
558                    headers={"accept-encoding": "garbage-gzip"},
559                )
560
561    def test_connection_count(self):
562        with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool:
563            pool.request("GET", "/")
564            pool.request("GET", "/")
565            pool.request("GET", "/")
566
567            assert pool.num_connections == 1
568            assert pool.num_requests == 3
569
570    def test_connection_count_bigpool(self):
571        with HTTPConnectionPool(self.host, self.port, maxsize=16) as http_pool:
572            http_pool.request("GET", "/")
573            http_pool.request("GET", "/")
574            http_pool.request("GET", "/")
575
576            assert http_pool.num_connections == 1
577            assert http_pool.num_requests == 3
578
579    def test_partial_response(self):
580        with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool:
581            req_data = {"lol": "cat"}
582            resp_data = urlencode(req_data).encode("utf-8")
583
584            r = pool.request("GET", "/echo", fields=req_data, preload_content=False)
585
586            assert r.read(5) == resp_data[:5]
587            assert r.read() == resp_data[5:]
588
589    def test_lazy_load_twice(self):
590        # This test is sad and confusing. Need to figure out what's
591        # going on with partial reads and socket reuse.
592
593        with HTTPConnectionPool(
594            self.host, self.port, block=True, maxsize=1, timeout=2
595        ) as pool:
596            payload_size = 1024 * 2
597            first_chunk = 512
598
599            boundary = "foo"
600
601            req_data = {"count": "a" * payload_size}
602            resp_data = encode_multipart_formdata(req_data, boundary=boundary)[0]
603
604            req2_data = {"count": "b" * payload_size}
605            resp2_data = encode_multipart_formdata(req2_data, boundary=boundary)[0]
606
607            r1 = pool.request(
608                "POST",
609                "/echo",
610                fields=req_data,
611                multipart_boundary=boundary,
612                preload_content=False,
613            )
614
615            assert r1.read(first_chunk) == resp_data[:first_chunk]
616
617            try:
618                r2 = pool.request(
619                    "POST",
620                    "/echo",
621                    fields=req2_data,
622                    multipart_boundary=boundary,
623                    preload_content=False,
624                    pool_timeout=0.001,
625                )
626
627                # This branch should generally bail here, but maybe someday it will
628                # work? Perhaps by some sort of magic. Consider it a TODO.
629
630                assert r2.read(first_chunk) == resp2_data[:first_chunk]
631
632                assert r1.read() == resp_data[first_chunk:]
633                assert r2.read() == resp2_data[first_chunk:]
634                assert pool.num_requests == 2
635
636            except EmptyPoolError:
637                assert r1.read() == resp_data[first_chunk:]
638                assert pool.num_requests == 1
639
640            assert pool.num_connections == 1
641
642    def test_for_double_release(self):
643        MAXSIZE = 5
644
645        # Check default state
646        with HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE) as pool:
647            assert pool.num_connections == 0
648            assert pool.pool.qsize() == MAXSIZE
649
650            # Make an empty slot for testing
651            pool.pool.get()
652            assert pool.pool.qsize() == MAXSIZE - 1
653
654            # Check state after simple request
655            pool.urlopen("GET", "/")
656            assert pool.pool.qsize() == MAXSIZE - 1
657
658            # Check state without release
659            pool.urlopen("GET", "/", preload_content=False)
660            assert pool.pool.qsize() == MAXSIZE - 2
661
662            pool.urlopen("GET", "/")
663            assert pool.pool.qsize() == MAXSIZE - 2
664
665            # Check state after read
666            pool.urlopen("GET", "/").data
667            assert pool.pool.qsize() == MAXSIZE - 2
668
669            pool.urlopen("GET", "/")
670            assert pool.pool.qsize() == MAXSIZE - 2
671
672    def test_release_conn_parameter(self):
673        MAXSIZE = 5
674        with HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE) as pool:
675            assert pool.pool.qsize() == MAXSIZE
676
677            # Make request without releasing connection
678            pool.request("GET", "/", release_conn=False, preload_content=False)
679            assert pool.pool.qsize() == MAXSIZE - 1
680
681    def test_dns_error(self):
682        with HTTPConnectionPool(
683            "thishostdoesnotexist.invalid", self.port, timeout=0.001
684        ) as pool:
685            with pytest.raises(MaxRetryError):
686                pool.request("GET", "/test", retries=2)
687
688    @pytest.mark.parametrize("char", [" ", "\r", "\n", "\x00"])
689    def test_invalid_method_not_allowed(self, char):
690        with pytest.raises(ValueError):
691            with HTTPConnectionPool(self.host, self.port) as pool:
692                pool.request("GET" + char, "/")
693
694    def test_percent_encode_invalid_target_chars(self):
695        with HTTPConnectionPool(self.host, self.port) as pool:
696            r = pool.request("GET", "/echo_params?q=\r&k=\n \n")
697            assert r.data == b"[('k', '\\n \\n'), ('q', '\\r')]"
698
699    @pytest.mark.skipif(
700        six.PY2
701        and platform.system() == "Darwin"
702        and os.environ.get("GITHUB_ACTIONS") == "true",
703        reason="fails on macOS 2.7 in GitHub Actions for an unknown reason",
704    )
705    def test_source_address(self):
706        for addr, is_ipv6 in VALID_SOURCE_ADDRESSES:
707            if is_ipv6 and not HAS_IPV6_AND_DNS:
708                warnings.warn("No IPv6 support: skipping.", NoIPv6Warning)
709                continue
710            with HTTPConnectionPool(
711                self.host, self.port, source_address=addr, retries=False
712            ) as pool:
713                r = pool.request("GET", "/source_address")
714                assert r.data == b(addr[0])
715
716    @pytest.mark.skipif(
717        six.PY2
718        and platform.system() == "Darwin"
719        and os.environ.get("GITHUB_ACTIONS") == "true",
720        reason="fails on macOS 2.7 in GitHub Actions for an unknown reason",
721    )
722    def test_source_address_error(self):
723        for addr in INVALID_SOURCE_ADDRESSES:
724            with HTTPConnectionPool(
725                self.host, self.port, source_address=addr, retries=False
726            ) as pool:
727                with pytest.raises(NewConnectionError):
728                    pool.request("GET", "/source_address?{0}".format(addr))
729
730    def test_stream_keepalive(self):
731        x = 2
732
733        with HTTPConnectionPool(self.host, self.port) as pool:
734            for _ in range(x):
735                response = pool.request(
736                    "GET",
737                    "/chunked",
738                    headers={"Connection": "keep-alive"},
739                    preload_content=False,
740                    retries=False,
741                )
742                for chunk in response.stream():
743                    assert chunk == b"123"
744
745            assert pool.num_connections == 1
746            assert pool.num_requests == x
747
748    def test_read_chunked_short_circuit(self):
749        with HTTPConnectionPool(self.host, self.port) as pool:
750            response = pool.request("GET", "/chunked", preload_content=False)
751            response.read()
752            with pytest.raises(StopIteration):
753                next(response.read_chunked())
754
755    def test_read_chunked_on_closed_response(self):
756        with HTTPConnectionPool(self.host, self.port) as pool:
757            response = pool.request("GET", "/chunked", preload_content=False)
758            response.close()
759            with pytest.raises(StopIteration):
760                next(response.read_chunked())
761
762    def test_chunked_gzip(self):
763        with HTTPConnectionPool(self.host, self.port) as pool:
764            response = pool.request(
765                "GET", "/chunked_gzip", preload_content=False, decode_content=True
766            )
767
768            assert b"123" * 4 == response.read()
769
770    def test_cleanup_on_connection_error(self):
771        """
772        Test that connections are recycled to the pool on
773        connection errors where no http response is received.
774        """
775        poolsize = 3
776        with HTTPConnectionPool(
777            self.host, self.port, maxsize=poolsize, block=True
778        ) as http:
779            assert http.pool.qsize() == poolsize
780
781            # force a connection error by supplying a non-existent
782            # url. We won't get a response for this  and so the
783            # conn won't be implicitly returned to the pool.
784            with pytest.raises(MaxRetryError):
785                http.request(
786                    "GET",
787                    "/redirect",
788                    fields={"target": "/"},
789                    release_conn=False,
790                    retries=0,
791                )
792
793            r = http.request(
794                "GET",
795                "/redirect",
796                fields={"target": "/"},
797                release_conn=False,
798                retries=1,
799            )
800            r.release_conn()
801
802            # the pool should still contain poolsize elements
803            assert http.pool.qsize() == http.pool.maxsize
804
805    def test_mixed_case_hostname(self):
806        with HTTPConnectionPool("LoCaLhOsT", self.port) as pool:
807            response = pool.request("GET", "http://LoCaLhOsT:%d/" % self.port)
808            assert response.status == 200
809
810    def test_preserves_path_dot_segments(self):
811        """ConnectionPool preserves dot segments in the URI"""
812        with HTTPConnectionPool(self.host, self.port) as pool:
813            response = pool.request("GET", "/echo_uri/seg0/../seg2")
814            assert response.data == b"/echo_uri/seg0/../seg2"
815
816    def test_default_user_agent_header(self):
817        """ConnectionPool has a default user agent"""
818        default_ua = _get_default_user_agent()
819        custom_ua = "I'm not a web scraper, what are you talking about?"
820        custom_ua2 = "Yet Another User Agent"
821        with HTTPConnectionPool(self.host, self.port) as pool:
822            # Use default user agent if no user agent was specified.
823            r = pool.request("GET", "/headers")
824            request_headers = json.loads(r.data.decode("utf8"))
825            assert request_headers.get("User-Agent") == _get_default_user_agent()
826
827            # Prefer the request user agent over the default.
828            headers = {"UsEr-AGENt": custom_ua}
829            r = pool.request("GET", "/headers", headers=headers)
830            request_headers = json.loads(r.data.decode("utf8"))
831            assert request_headers.get("User-Agent") == custom_ua
832
833            # Do not modify pool headers when using the default user agent.
834            pool_headers = {"foo": "bar"}
835            pool.headers = pool_headers
836            r = pool.request("GET", "/headers")
837            request_headers = json.loads(r.data.decode("utf8"))
838            assert request_headers.get("User-Agent") == default_ua
839            assert "User-Agent" not in pool_headers
840
841            pool.headers.update({"User-Agent": custom_ua2})
842            r = pool.request("GET", "/headers")
843            request_headers = json.loads(r.data.decode("utf8"))
844            assert request_headers.get("User-Agent") == custom_ua2
845
846    @pytest.mark.parametrize(
847        "headers",
848        [
849            None,
850            {},
851            {"User-Agent": "key"},
852            {"user-agent": "key"},
853            {b"uSeR-AgEnT": b"key"},
854            {b"user-agent": "key"},
855        ],
856    )
857    @pytest.mark.parametrize("chunked", [True, False])
858    def test_user_agent_header_not_sent_twice(self, headers, chunked):
859        with HTTPConnectionPool(self.host, self.port) as pool:
860            r = pool.request("GET", "/headers", headers=headers, chunked=chunked)
861            request_headers = json.loads(r.data.decode("utf8"))
862
863            if not headers:
864                assert request_headers["User-Agent"].startswith("python-urllib3/")
865                assert "key" not in request_headers["User-Agent"]
866            else:
867                assert request_headers["User-Agent"] == "key"
868
869    def test_no_user_agent_header(self):
870        """ConnectionPool can suppress sending a user agent header"""
871        custom_ua = "I'm not a web scraper, what are you talking about?"
872        with HTTPConnectionPool(self.host, self.port) as pool:
873            # Suppress user agent in the request headers.
874            no_ua_headers = {"User-Agent": SKIP_HEADER}
875            r = pool.request("GET", "/headers", headers=no_ua_headers)
876            request_headers = json.loads(r.data.decode("utf8"))
877            assert "User-Agent" not in request_headers
878            assert no_ua_headers["User-Agent"] == SKIP_HEADER
879
880            # Suppress user agent in the pool headers.
881            pool.headers = no_ua_headers
882            r = pool.request("GET", "/headers")
883            request_headers = json.loads(r.data.decode("utf8"))
884            assert "User-Agent" not in request_headers
885            assert no_ua_headers["User-Agent"] == SKIP_HEADER
886
887            # Request headers override pool headers.
888            pool_headers = {"User-Agent": custom_ua}
889            pool.headers = pool_headers
890            r = pool.request("GET", "/headers", headers=no_ua_headers)
891            request_headers = json.loads(r.data.decode("utf8"))
892            assert "User-Agent" not in request_headers
893            assert no_ua_headers["User-Agent"] == SKIP_HEADER
894            assert pool_headers.get("User-Agent") == custom_ua
895
896    @pytest.mark.parametrize(
897        "accept_encoding",
898        [
899            "Accept-Encoding",
900            "accept-encoding",
901            b"Accept-Encoding",
902            b"accept-encoding",
903            None,
904        ],
905    )
906    @pytest.mark.parametrize("host", ["Host", "host", b"Host", b"host", None])
907    @pytest.mark.parametrize(
908        "user_agent", ["User-Agent", "user-agent", b"User-Agent", b"user-agent", None]
909    )
910    @pytest.mark.parametrize("chunked", [True, False])
911    def test_skip_header(self, accept_encoding, host, user_agent, chunked):
912        headers = {}
913
914        if accept_encoding is not None:
915            headers[accept_encoding] = SKIP_HEADER
916        if host is not None:
917            headers[host] = SKIP_HEADER
918        if user_agent is not None:
919            headers[user_agent] = SKIP_HEADER
920
921        with HTTPConnectionPool(self.host, self.port) as pool:
922            r = pool.request("GET", "/headers", headers=headers, chunked=chunked)
923        request_headers = json.loads(r.data.decode("utf8"))
924
925        if accept_encoding is None:
926            assert "Accept-Encoding" in request_headers
927        else:
928            assert accept_encoding not in request_headers
929        if host is None:
930            assert "Host" in request_headers
931        else:
932            assert host not in request_headers
933        if user_agent is None:
934            assert "User-Agent" in request_headers
935        else:
936            assert user_agent not in request_headers
937
938    @pytest.mark.parametrize("header", ["Content-Length", "content-length"])
939    @pytest.mark.parametrize("chunked", [True, False])
940    def test_skip_header_non_supported(self, header, chunked):
941        with HTTPConnectionPool(self.host, self.port) as pool:
942            with pytest.raises(ValueError) as e:
943                pool.request(
944                    "GET", "/headers", headers={header: SKIP_HEADER}, chunked=chunked
945                )
946            assert (
947                str(e.value)
948                == "urllib3.util.SKIP_HEADER only supports 'Accept-Encoding', 'Host', 'User-Agent'"
949            )
950
951            # Ensure that the error message stays up to date with 'SKIP_HEADER_SUPPORTED_HEADERS'
952            assert all(
953                ("'" + header.title() + "'") in str(e.value)
954                for header in SKIPPABLE_HEADERS
955            )
956
957    @pytest.mark.parametrize("chunked", [True, False])
958    @pytest.mark.parametrize("pool_request", [True, False])
959    @pytest.mark.parametrize("header_type", [dict, HTTPHeaderDict])
960    def test_headers_not_modified_by_request(self, chunked, pool_request, header_type):
961        # Test that the .request*() methods of ConnectionPool and HTTPConnection
962        # don't modify the given 'headers' structure, instead they should
963        # make their own internal copies at request time.
964        headers = header_type()
965        headers["key"] = "val"
966
967        with HTTPConnectionPool(self.host, self.port) as pool:
968            pool.headers = headers
969            if pool_request:
970                pool.request("GET", "/headers", chunked=chunked)
971            else:
972                conn = pool._get_conn()
973                if chunked:
974                    conn.request_chunked("GET", "/headers")
975                else:
976                    conn.request("GET", "/headers")
977
978            assert pool.headers == {"key": "val"}
979            assert isinstance(pool.headers, header_type)
980
981        with HTTPConnectionPool(self.host, self.port) as pool:
982            if pool_request:
983                pool.request("GET", "/headers", headers=headers, chunked=chunked)
984            else:
985                conn = pool._get_conn()
986                if chunked:
987                    conn.request_chunked("GET", "/headers", headers=headers)
988                else:
989                    conn.request("GET", "/headers", headers=headers)
990
991            assert headers == {"key": "val"}
992
993    def test_bytes_header(self):
994        with HTTPConnectionPool(self.host, self.port) as pool:
995            headers = {"User-Agent": b"test header"}
996            r = pool.request("GET", "/headers", headers=headers)
997            request_headers = json.loads(r.data.decode("utf8"))
998            assert "User-Agent" in request_headers
999            assert request_headers["User-Agent"] == "test header"
1000
1001    @pytest.mark.parametrize(
1002        "user_agent", [u"Schönefeld/1.18.0", u"Schönefeld/1.18.0".encode("iso-8859-1")]
1003    )
1004    def test_user_agent_non_ascii_user_agent(self, user_agent):
1005        if six.PY2 and not isinstance(user_agent, str):
1006            pytest.skip(
1007                "Python 2 raises UnicodeEncodeError when passed a unicode header"
1008            )
1009
1010        with HTTPConnectionPool(self.host, self.port, retries=False) as pool:
1011            r = pool.urlopen(
1012                "GET",
1013                "/headers",
1014                headers={"User-Agent": user_agent},
1015            )
1016            request_headers = json.loads(r.data.decode("utf8"))
1017            assert "User-Agent" in request_headers
1018            assert request_headers["User-Agent"] == u"Schönefeld/1.18.0"
1019
1020    @onlyPy2
1021    def test_user_agent_non_ascii_fails_on_python_2(self):
1022        with HTTPConnectionPool(self.host, self.port, retries=False) as pool:
1023            with pytest.raises(UnicodeEncodeError) as e:
1024                pool.urlopen(
1025                    "GET",
1026                    "/headers",
1027                    headers={"User-Agent": u"Schönefeld/1.18.0"},
1028                )
1029            assert str(e.value) == (
1030                "'ascii' codec can't encode character u'\\xf6' in "
1031                "position 3: ordinal not in range(128)"
1032            )
1033
1034
1035class TestRetry(HTTPDummyServerTestCase):
1036    def test_max_retry(self):
1037        with HTTPConnectionPool(self.host, self.port) as pool:
1038            with pytest.raises(MaxRetryError):
1039                pool.request("GET", "/redirect", fields={"target": "/"}, retries=0)
1040
1041    def test_disabled_retry(self):
1042        """Disabled retries should disable redirect handling."""
1043        with HTTPConnectionPool(self.host, self.port) as pool:
1044            r = pool.request("GET", "/redirect", fields={"target": "/"}, retries=False)
1045            assert r.status == 303
1046
1047            r = pool.request(
1048                "GET",
1049                "/redirect",
1050                fields={"target": "/"},
1051                retries=Retry(redirect=False),
1052            )
1053            assert r.status == 303
1054
1055        with HTTPConnectionPool(
1056            "thishostdoesnotexist.invalid", self.port, timeout=0.001
1057        ) as pool:
1058            with pytest.raises(NewConnectionError):
1059                pool.request("GET", "/test", retries=False)
1060
1061    def test_read_retries(self):
1062        """Should retry for status codes in the whitelist"""
1063        with HTTPConnectionPool(self.host, self.port) as pool:
1064            retry = Retry(read=1, status_forcelist=[418])
1065            resp = pool.request(
1066                "GET",
1067                "/successful_retry",
1068                headers={"test-name": "test_read_retries"},
1069                retries=retry,
1070            )
1071            assert resp.status == 200
1072
1073    def test_read_total_retries(self):
1074        """HTTP response w/ status code in the whitelist should be retried"""
1075        with HTTPConnectionPool(self.host, self.port) as pool:
1076            headers = {"test-name": "test_read_total_retries"}
1077            retry = Retry(total=1, status_forcelist=[418])
1078            resp = pool.request(
1079                "GET", "/successful_retry", headers=headers, retries=retry
1080            )
1081            assert resp.status == 200
1082
1083    def test_retries_wrong_whitelist(self):
1084        """HTTP response w/ status code not in whitelist shouldn't be retried"""
1085        with HTTPConnectionPool(self.host, self.port) as pool:
1086            retry = Retry(total=1, status_forcelist=[202])
1087            resp = pool.request(
1088                "GET",
1089                "/successful_retry",
1090                headers={"test-name": "test_wrong_whitelist"},
1091                retries=retry,
1092            )
1093            assert resp.status == 418
1094
1095    def test_default_method_whitelist_retried(self):
1096        """urllib3 should retry methods in the default method whitelist"""
1097        with HTTPConnectionPool(self.host, self.port) as pool:
1098            retry = Retry(total=1, status_forcelist=[418])
1099            resp = pool.request(
1100                "OPTIONS",
1101                "/successful_retry",
1102                headers={"test-name": "test_default_whitelist"},
1103                retries=retry,
1104            )
1105            assert resp.status == 200
1106
1107    def test_retries_wrong_method_list(self):
1108        """Method not in our whitelist should not be retried, even if code matches"""
1109        with HTTPConnectionPool(self.host, self.port) as pool:
1110            headers = {"test-name": "test_wrong_method_whitelist"}
1111            retry = Retry(total=1, status_forcelist=[418], method_whitelist=["POST"])
1112            resp = pool.request(
1113                "GET", "/successful_retry", headers=headers, retries=retry
1114            )
1115            assert resp.status == 418
1116
1117    def test_read_retries_unsuccessful(self):
1118        with HTTPConnectionPool(self.host, self.port) as pool:
1119            headers = {"test-name": "test_read_retries_unsuccessful"}
1120            resp = pool.request("GET", "/successful_retry", headers=headers, retries=1)
1121            assert resp.status == 418
1122
1123    def test_retry_reuse_safe(self):
1124        """It should be possible to reuse a Retry object across requests"""
1125        with HTTPConnectionPool(self.host, self.port) as pool:
1126            headers = {"test-name": "test_retry_safe"}
1127            retry = Retry(total=1, status_forcelist=[418])
1128            resp = pool.request(
1129                "GET", "/successful_retry", headers=headers, retries=retry
1130            )
1131            assert resp.status == 200
1132
1133        with HTTPConnectionPool(self.host, self.port) as pool:
1134            resp = pool.request(
1135                "GET", "/successful_retry", headers=headers, retries=retry
1136            )
1137            assert resp.status == 200
1138
1139    def test_retry_return_in_response(self):
1140        with HTTPConnectionPool(self.host, self.port) as pool:
1141            headers = {"test-name": "test_retry_return_in_response"}
1142            retry = Retry(total=2, status_forcelist=[418])
1143            resp = pool.request(
1144                "GET", "/successful_retry", headers=headers, retries=retry
1145            )
1146            assert resp.status == 200
1147            assert resp.retries.total == 1
1148            assert resp.retries.history == (
1149                RequestHistory("GET", "/successful_retry", None, 418, None),
1150            )
1151
1152    def test_retry_redirect_history(self):
1153        with HTTPConnectionPool(self.host, self.port) as pool:
1154            resp = pool.request("GET", "/redirect", fields={"target": "/"})
1155            assert resp.status == 200
1156            assert resp.retries.history == (
1157                RequestHistory("GET", "/redirect?target=%2F", None, 303, "/"),
1158            )
1159
1160    def test_multi_redirect_history(self):
1161        with HTTPConnectionPool(self.host, self.port) as pool:
1162            r = pool.request(
1163                "GET",
1164                "/multi_redirect",
1165                fields={"redirect_codes": "303,302,200"},
1166                redirect=False,
1167            )
1168            assert r.status == 303
1169            assert r.retries.history == tuple()
1170
1171        with HTTPConnectionPool(self.host, self.port) as pool:
1172            r = pool.request(
1173                "GET",
1174                "/multi_redirect",
1175                retries=10,
1176                fields={"redirect_codes": "303,302,301,307,302,200"},
1177            )
1178            assert r.status == 200
1179            assert r.data == b"Done redirecting"
1180
1181            expected = [
1182                (303, "/multi_redirect?redirect_codes=302,301,307,302,200"),
1183                (302, "/multi_redirect?redirect_codes=301,307,302,200"),
1184                (301, "/multi_redirect?redirect_codes=307,302,200"),
1185                (307, "/multi_redirect?redirect_codes=302,200"),
1186                (302, "/multi_redirect?redirect_codes=200"),
1187            ]
1188            actual = [
1189                (history.status, history.redirect_location)
1190                for history in r.retries.history
1191            ]
1192            assert actual == expected
1193
1194
1195class TestRetryAfter(HTTPDummyServerTestCase):
1196    def test_retry_after(self):
1197        # Request twice in a second to get a 429 response.
1198        with HTTPConnectionPool(self.host, self.port) as pool:
1199            r = pool.request(
1200                "GET",
1201                "/retry_after",
1202                fields={"status": "429 Too Many Requests"},
1203                retries=False,
1204            )
1205            r = pool.request(
1206                "GET",
1207                "/retry_after",
1208                fields={"status": "429 Too Many Requests"},
1209                retries=False,
1210            )
1211            assert r.status == 429
1212
1213            r = pool.request(
1214                "GET",
1215                "/retry_after",
1216                fields={"status": "429 Too Many Requests"},
1217                retries=True,
1218            )
1219            assert r.status == 200
1220
1221            # Request twice in a second to get a 503 response.
1222            r = pool.request(
1223                "GET",
1224                "/retry_after",
1225                fields={"status": "503 Service Unavailable"},
1226                retries=False,
1227            )
1228            r = pool.request(
1229                "GET",
1230                "/retry_after",
1231                fields={"status": "503 Service Unavailable"},
1232                retries=False,
1233            )
1234            assert r.status == 503
1235
1236            r = pool.request(
1237                "GET",
1238                "/retry_after",
1239                fields={"status": "503 Service Unavailable"},
1240                retries=True,
1241            )
1242            assert r.status == 200
1243
1244            # Ignore Retry-After header on status which is not defined in
1245            # Retry.RETRY_AFTER_STATUS_CODES.
1246            r = pool.request(
1247                "GET",
1248                "/retry_after",
1249                fields={"status": "418 I'm a teapot"},
1250                retries=True,
1251            )
1252            assert r.status == 418
1253
1254    def test_redirect_after(self):
1255        with HTTPConnectionPool(self.host, self.port) as pool:
1256            r = pool.request("GET", "/redirect_after", retries=False)
1257            assert r.status == 303
1258
1259            t = time.time()
1260            r = pool.request("GET", "/redirect_after")
1261            assert r.status == 200
1262            delta = time.time() - t
1263            assert delta >= 1
1264
1265            t = time.time()
1266            timestamp = t + 2
1267            r = pool.request("GET", "/redirect_after?date=" + str(timestamp))
1268            assert r.status == 200
1269            delta = time.time() - t
1270            assert delta >= 1
1271
1272            # Retry-After is past
1273            t = time.time()
1274            timestamp = t - 1
1275            r = pool.request("GET", "/redirect_after?date=" + str(timestamp))
1276            delta = time.time() - t
1277            assert r.status == 200
1278            assert delta < 1
1279
1280
1281class TestFileBodiesOnRetryOrRedirect(HTTPDummyServerTestCase):
1282    def test_retries_put_filehandle(self):
1283        """HTTP PUT retry with a file-like object should not timeout"""
1284        with HTTPConnectionPool(self.host, self.port, timeout=0.1) as pool:
1285            retry = Retry(total=3, status_forcelist=[418])
1286            # httplib reads in 8k chunks; use a larger content length
1287            content_length = 65535
1288            data = b"A" * content_length
1289            uploaded_file = io.BytesIO(data)
1290            headers = {
1291                "test-name": "test_retries_put_filehandle",
1292                "Content-Length": str(content_length),
1293            }
1294            resp = pool.urlopen(
1295                "PUT",
1296                "/successful_retry",
1297                headers=headers,
1298                retries=retry,
1299                body=uploaded_file,
1300                assert_same_host=False,
1301                redirect=False,
1302            )
1303            assert resp.status == 200
1304
1305    def test_redirect_put_file(self):
1306        """PUT with file object should work with a redirection response"""
1307        with HTTPConnectionPool(self.host, self.port, timeout=0.1) as pool:
1308            retry = Retry(total=3, status_forcelist=[418])
1309            # httplib reads in 8k chunks; use a larger content length
1310            content_length = 65535
1311            data = b"A" * content_length
1312            uploaded_file = io.BytesIO(data)
1313            headers = {
1314                "test-name": "test_redirect_put_file",
1315                "Content-Length": str(content_length),
1316            }
1317            url = "/redirect?target=/echo&status=307"
1318            resp = pool.urlopen(
1319                "PUT",
1320                url,
1321                headers=headers,
1322                retries=retry,
1323                body=uploaded_file,
1324                assert_same_host=False,
1325                redirect=True,
1326            )
1327            assert resp.status == 200
1328            assert resp.data == data
1329
1330    def test_redirect_with_failed_tell(self):
1331        """Abort request if failed to get a position from tell()"""
1332
1333        class BadTellObject(io.BytesIO):
1334            def tell(self):
1335                raise IOError
1336
1337        body = BadTellObject(b"the data")
1338        url = "/redirect?target=/successful_retry"
1339        # httplib uses fileno if Content-Length isn't supplied,
1340        # which is unsupported by BytesIO.
1341        headers = {"Content-Length": "8"}
1342        with HTTPConnectionPool(self.host, self.port, timeout=0.1) as pool:
1343            with pytest.raises(UnrewindableBodyError) as e:
1344                pool.urlopen("PUT", url, headers=headers, body=body)
1345            assert "Unable to record file position for" in str(e.value)
1346
1347
1348class TestRetryPoolSize(HTTPDummyServerTestCase):
1349    def test_pool_size_retry(self):
1350        retries = Retry(total=1, raise_on_status=False, status_forcelist=[404])
1351        with HTTPConnectionPool(
1352            self.host, self.port, maxsize=10, retries=retries, block=True
1353        ) as pool:
1354            pool.urlopen("GET", "/not_found", preload_content=False)
1355            assert pool.num_connections == 1
1356
1357
1358class TestRedirectPoolSize(HTTPDummyServerTestCase):
1359    def test_pool_size_redirect(self):
1360        retries = Retry(
1361            total=1, raise_on_status=False, status_forcelist=[404], redirect=True
1362        )
1363        with HTTPConnectionPool(
1364            self.host, self.port, maxsize=10, retries=retries, block=True
1365        ) as pool:
1366            pool.urlopen("GET", "/redirect", preload_content=False)
1367            assert pool.num_connections == 1
1368