1import errno
2from http import client, HTTPStatus
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13TestCase = unittest.TestCase
14
15from test import support
16
# Directory containing this test module; the certificate files below are
# located relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
24
# constants for testing chunked encoding
# chunked_start is a response head followed by four chunks.  Each chunk is a
# hexadecimal size line ('a' == 10, '22' == 34) followed by its payload.  No
# terminating zero-size chunk is included here -- presumably tests append
# one built from the pieces defined below.
chunked_start = (
    'HTTP/1.1 200 OK\r\n'
    'Transfer-Encoding: chunked\r\n\r\n'
    'a\r\n'
    'hello worl\r\n'
    '3\r\n'
    'd! \r\n'
    '8\r\n'
    'and now \r\n'
    '22\r\n'
    'for something completely different\r\n'
)
# The payloads of the four chunks in chunked_start, joined together.
chunked_expected = b'hello world! and now for something completely different'
# Building blocks for the end of a chunked message: an optional chunk
# extension, the zero-size last chunk (plain and with the extension),
# trailer header lines and the final blank line.
chunk_extension = ";foo=bar"
last_chunk = "0\r\n"
last_chunk_extended = "0" + chunk_extension + "\r\n"
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
chunked_end = "\r\n"
44
45HOST = support.HOST
46
class FakeSocket:
    """In-memory stand-in for a socket.

    Everything handed to sendall() is accumulated in ``data`` (and the
    calls are counted in ``sendall_calls``); makefile() returns a
    ``fileclass`` object reading the canned ``text``.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # str input is stored as ASCII bytes; anything else is kept as given.
        self.text = text.encode("ascii") if isinstance(text, str) else text
        self.fileclass = fileclass
        self.data = b''
        self.sendall_calls = 0
        self.file_closed = False
        self.host = host
        self.port = port

    def sendall(self, data):
        """Count the call and append *data* to what was sent so far."""
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a file over the canned text; only read modes are allowed."""
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        # The file is remembered so tests can check how much was read from it.
        self.file = reader = self.fileclass(self.text)
        # close() is neutralised: it only records that it was requested.
        reader.close = self.file_close
        return reader

    def file_close(self):
        """Replacement for the file's close(); just sets ``file_closed``."""
        self.file_closed = True

    def close(self):
        """Do nothing."""
        pass

    def setsockopt(self, level, optname, value):
        """Accept and ignore socket options."""
        pass
79
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE for a trigger payload."""

    def __init__(self, text, pipe_trigger):
        # Any sendall() whose data contains pipe_trigger raises EPIPE.
        super().__init__(text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        if self.pipe_trigger not in data:
            self.data += data
            return
        raise OSError(errno.EPIPE, "gotcha")

    def close(self):
        pass
94
class NoEOFBytesIO(io.BytesIO):
    """BytesIO variant that raises AssertionError when a read hits EOF.

    Used by the tests below to check that http.client never tries to read
    more from the underlying file than it should.
    """
    def read(self, n=-1):
        chunk = super().read(n)
        if not chunk:
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def readline(self, length=None):
        line = super().readline(length)
        if not line:
            raise AssertionError('caller tried to read past EOF')
        return line
112
class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection to 'example.com' backed by FakeSocket.

    The positional arguments are forwarded to every FakeSocket that gets
    created, and ``connections`` counts the connect() calls.
    """

    def __init__(self, *args):
        self.connections = 0
        super().__init__('example.com')
        self.fake_socket_args = args
        # Installed after the base initialiser so it replaces the
        # connection factory the base class set up.
        self._create_connection = self.create_connection

    def connect(self):
        """Bump the connect() counter, then connect as usual."""
        self.connections += 1
        return super().connect()

    def create_connection(self, *address_args, **options):
        """Ignore the requested address and return a fresh FakeSocket."""
        return FakeSocket(*self.fake_socket_args)
129
130class HeaderTests(TestCase):
131    def test_auto_headers(self):
132        # Some headers are added automatically, but should not be added by
133        # .request() if they are explicitly set.
134
135        class HeaderCountingBuffer(list):
136            def __init__(self):
137                self.count = {}
138            def append(self, item):
139                kv = item.split(b':')
140                if len(kv) > 1:
141                    # item is a 'Key: Value' header string
142                    lcKey = kv[0].decode('ascii').lower()
143                    self.count.setdefault(lcKey, 0)
144                    self.count[lcKey] += 1
145                list.append(self, item)
146
147        for explicit_header in True, False:
148            for header in 'Content-length', 'Host', 'Accept-encoding':
149                conn = client.HTTPConnection('example.com')
150                conn.sock = FakeSocket('blahblahblah')
151                conn._buffer = HeaderCountingBuffer()
152
153                body = 'spamspamspam'
154                headers = {}
155                if explicit_header:
156                    headers[header] = str(len(body))
157                conn.request('POST', '/', body, headers)
158                self.assertEqual(conn._buffer.count[header.lower()], 1)
159
160    def test_content_length_0(self):
161
162        class ContentLengthChecker(list):
163            def __init__(self):
164                list.__init__(self)
165                self.content_length = None
166            def append(self, item):
167                kv = item.split(b':', 1)
168                if len(kv) > 1 and kv[0].lower() == b'content-length':
169                    self.content_length = kv[1].strip()
170                list.append(self, item)
171
172        # Here, we're testing that methods expecting a body get a
173        # content-length set to zero if the body is empty (either None or '')
174        bodies = (None, '')
175        methods_with_body = ('PUT', 'POST', 'PATCH')
176        for method, body in itertools.product(methods_with_body, bodies):
177            conn = client.HTTPConnection('example.com')
178            conn.sock = FakeSocket(None)
179            conn._buffer = ContentLengthChecker()
180            conn.request(method, '/', body)
181            self.assertEqual(
182                conn._buffer.content_length, b'0',
183                'Header Content-Length incorrect on {}'.format(method)
184            )
185
186        # For these methods, we make sure that content-length is not set when
187        # the body is None because it might cause unexpected behaviour on the
188        # server.
189        methods_without_body = (
190             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
191        )
192        for method in methods_without_body:
193            conn = client.HTTPConnection('example.com')
194            conn.sock = FakeSocket(None)
195            conn._buffer = ContentLengthChecker()
196            conn.request(method, '/', None)
197            self.assertEqual(
198                conn._buffer.content_length, None,
199                'Header Content-Length set for empty body on {}'.format(method)
200            )
201
202        # If the body is set to '', that's considered to be "present but
203        # empty" rather than "missing", so content length would be set, even
204        # for methods that don't expect a body.
205        for method in methods_without_body:
206            conn = client.HTTPConnection('example.com')
207            conn.sock = FakeSocket(None)
208            conn._buffer = ContentLengthChecker()
209            conn.request(method, '/', '')
210            self.assertEqual(
211                conn._buffer.content_length, b'0',
212                'Header Content-Length incorrect on {}'.format(method)
213            )
214
215        # If the body is set, make sure Content-Length is set.
216        for method in itertools.chain(methods_without_body, methods_with_body):
217            conn = client.HTTPConnection('example.com')
218            conn.sock = FakeSocket(None)
219            conn._buffer = ContentLengthChecker()
220            conn.request(method, '/', ' ')
221            self.assertEqual(
222                conn._buffer.content_length, b'1',
223                'Header Content-Length incorrect on {}'.format(method)
224            )
225
226    def test_putheader(self):
227        conn = client.HTTPConnection('example.com')
228        conn.sock = FakeSocket(None)
229        conn.putrequest('GET','/')
230        conn.putheader('Content-length', 42)
231        self.assertIn(b'Content-length: 42', conn._buffer)
232
233        conn.putheader('Foo', ' bar ')
234        self.assertIn(b'Foo:  bar ', conn._buffer)
235        conn.putheader('Bar', '\tbaz\t')
236        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
237        conn.putheader('Authorization', 'Bearer mytoken')
238        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
239        conn.putheader('IterHeader', 'IterA', 'IterB')
240        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
241        conn.putheader('LatinHeader', b'\xFF')
242        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
243        conn.putheader('Utf8Header', b'\xc3\x80')
244        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
245        conn.putheader('C1-Control', b'next\x85line')
246        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
247        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
248        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
249        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
250        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
251        conn.putheader('Key Space', 'value')
252        self.assertIn(b'Key Space: value', conn._buffer)
253        conn.putheader('KeySpace ', 'value')
254        self.assertIn(b'KeySpace : value', conn._buffer)
255        conn.putheader(b'Nonbreak\xa0Space', 'value')
256        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
257        conn.putheader(b'\xa0NonbreakSpace', 'value')
258        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
259
260    def test_ipv6host_header(self):
261        # Default host header on IPv6 transaction should be wrapped by [] if
262        # it is an IPv6 address
263        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
264                   b'Accept-Encoding: identity\r\n\r\n'
265        conn = client.HTTPConnection('[2001::]:81')
266        sock = FakeSocket('')
267        conn.sock = sock
268        conn.request('GET', '/foo')
269        self.assertTrue(sock.data.startswith(expected))
270
271        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
272                   b'Accept-Encoding: identity\r\n\r\n'
273        conn = client.HTTPConnection('[2001:102A::]')
274        sock = FakeSocket('')
275        conn.sock = sock
276        conn.request('GET', '/foo')
277        self.assertTrue(sock.data.startswith(expected))
278
279    def test_malformed_headers_coped_with(self):
280        # Issue 19996
281        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
282        sock = FakeSocket(body)
283        resp = client.HTTPResponse(sock)
284        resp.begin()
285
286        self.assertEqual(resp.getheader('First'), 'val')
287        self.assertEqual(resp.getheader('Second'), 'val')
288
289    def test_parse_all_octets(self):
290        # Ensure no valid header field octet breaks the parser
291        body = (
292            b'HTTP/1.1 200 OK\r\n'
293            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
294            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
295            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
296            b'obs-fold: text\r\n'
297            b' folded with space\r\n'
298            b'\tfolded with tab\r\n'
299            b'Content-Length: 0\r\n'
300            b'\r\n'
301        )
302        sock = FakeSocket(body)
303        resp = client.HTTPResponse(sock)
304        resp.begin()
305        self.assertEqual(resp.getheader('Content-Length'), '0')
306        self.assertEqual(resp.msg['Content-Length'], '0')
307        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
308        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
309        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
310        self.assertEqual(resp.getheader('VCHAR'), vchar)
311        self.assertEqual(resp.msg['VCHAR'], vchar)
312        self.assertIsNotNone(resp.getheader('obs-text'))
313        self.assertIn('obs-text', resp.msg)
314        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
315            self.assertTrue(folded.startswith('text'))
316            self.assertIn(' folded with space', folded)
317            self.assertTrue(folded.endswith('folded with tab'))
318
319    def test_invalid_headers(self):
320        conn = client.HTTPConnection('example.com')
321        conn.sock = FakeSocket('')
322        conn.putrequest('GET', '/')
323
324        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
325        # longer allowed in header names
326        cases = (
327            (b'Invalid\r\nName', b'ValidValue'),
328            (b'Invalid\rName', b'ValidValue'),
329            (b'Invalid\nName', b'ValidValue'),
330            (b'\r\nInvalidName', b'ValidValue'),
331            (b'\rInvalidName', b'ValidValue'),
332            (b'\nInvalidName', b'ValidValue'),
333            (b' InvalidName', b'ValidValue'),
334            (b'\tInvalidName', b'ValidValue'),
335            (b'Invalid:Name', b'ValidValue'),
336            (b':InvalidName', b'ValidValue'),
337            (b'ValidName', b'Invalid\r\nValue'),
338            (b'ValidName', b'Invalid\rValue'),
339            (b'ValidName', b'Invalid\nValue'),
340            (b'ValidName', b'InvalidValue\r\n'),
341            (b'ValidName', b'InvalidValue\r'),
342            (b'ValidName', b'InvalidValue\n'),
343        )
344        for name, value in cases:
345            with self.subTest((name, value)):
346                with self.assertRaisesRegex(ValueError, 'Invalid header'):
347                    conn.putheader(name, value)
348
349    def test_headers_debuglevel(self):
350        body = (
351            b'HTTP/1.1 200 OK\r\n'
352            b'First: val\r\n'
353            b'Second: val1\r\n'
354            b'Second: val2\r\n'
355        )
356        sock = FakeSocket(body)
357        resp = client.HTTPResponse(sock, debuglevel=1)
358        with support.captured_stdout() as output:
359            resp.begin()
360        lines = output.getvalue().splitlines()
361        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
362        self.assertEqual(lines[1], "header: First: val")
363        self.assertEqual(lines[2], "header: Second: val1")
364        self.assertEqual(lines[3], "header: Second: val2")
365
366
class HttpMethodTests(TestCase):
    """Validation of the request method by HTTPConnection.request()."""

    def test_invalid_method_names(self):
        # A method containing CR or LF must be rejected with ValueError
        # when the request is issued.
        methods = (
            'GET\r',
            'POST\n',
            'PUT\n\r',
            'POST\nValue',
            'POST\nHOST:abc',
            'GET\nrHost:abc\n',
            'POST\rRemainder:\r',
            'GET\rHOST:\n',
            '\nPUT'
        )

        for method in methods:
            # One sub-test per method, so a failure names the offending
            # value and the remaining methods are still checked.
            with self.subTest(method=method):
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket(None)
                # Only request() itself is expected to raise.
                with self.assertRaisesRegex(
                        ValueError, "method can't contain control characters"):
                    conn.request(method=method, url="/")
387
388
389class TransferEncodingTest(TestCase):
390    expected_body = b"It's just a flesh wound"
391
392    def test_endheaders_chunked(self):
393        conn = client.HTTPConnection('example.com')
394        conn.sock = FakeSocket(b'')
395        conn.putrequest('POST', '/')
396        conn.endheaders(self._make_body(), encode_chunked=True)
397
398        _, _, body = self._parse_request(conn.sock.data)
399        body = self._parse_chunked(body)
400        self.assertEqual(body, self.expected_body)
401
402    def test_explicit_headers(self):
403        # explicit chunked
404        conn = client.HTTPConnection('example.com')
405        conn.sock = FakeSocket(b'')
406        # this shouldn't actually be automatically chunk-encoded because the
407        # calling code has explicitly stated that it's taking care of it
408        conn.request(
409            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
410
411        _, headers, body = self._parse_request(conn.sock.data)
412        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
413        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
414        self.assertEqual(body, self.expected_body)
415
416        # explicit chunked, string body
417        conn = client.HTTPConnection('example.com')
418        conn.sock = FakeSocket(b'')
419        conn.request(
420            'POST', '/', self.expected_body.decode('latin-1'),
421            {'Transfer-Encoding': 'chunked'})
422
423        _, headers, body = self._parse_request(conn.sock.data)
424        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
425        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
426        self.assertEqual(body, self.expected_body)
427
428        # User-specified TE, but request() does the chunk encoding
429        conn = client.HTTPConnection('example.com')
430        conn.sock = FakeSocket(b'')
431        conn.request('POST', '/',
432            headers={'Transfer-Encoding': 'gzip, chunked'},
433            encode_chunked=True,
434            body=self._make_body())
435        _, headers, body = self._parse_request(conn.sock.data)
436        self.assertNotIn('content-length', [k.lower() for k in headers])
437        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
438        self.assertEqual(self._parse_chunked(body), self.expected_body)
439
440    def test_request(self):
441        for empty_lines in (False, True,):
442            conn = client.HTTPConnection('example.com')
443            conn.sock = FakeSocket(b'')
444            conn.request(
445                'POST', '/', self._make_body(empty_lines=empty_lines))
446
447            _, headers, body = self._parse_request(conn.sock.data)
448            body = self._parse_chunked(body)
449            self.assertEqual(body, self.expected_body)
450            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
451
452            # Content-Length and Transfer-Encoding SHOULD not be sent in the
453            # same request
454            self.assertNotIn('content-length', [k.lower() for k in headers])
455
456    def test_empty_body(self):
457        # Zero-length iterable should be treated like any other iterable
458        conn = client.HTTPConnection('example.com')
459        conn.sock = FakeSocket(b'')
460        conn.request('POST', '/', ())
461        _, headers, body = self._parse_request(conn.sock.data)
462        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
463        self.assertNotIn('content-length', [k.lower() for k in headers])
464        self.assertEqual(body, b"0\r\n\r\n")
465
466    def _make_body(self, empty_lines=False):
467        lines = self.expected_body.split(b' ')
468        for idx, line in enumerate(lines):
469            # for testing handling empty lines
470            if empty_lines and idx % 2:
471                yield b''
472            if idx < len(lines) - 1:
473                yield line + b' '
474            else:
475                yield line
476
477    def _parse_request(self, data):
478        lines = data.split(b'\r\n')
479        request = lines[0]
480        headers = {}
481        n = 1
482        while n < len(lines) and len(lines[n]) > 0:
483            key, val = lines[n].split(b':')
484            key = key.decode('latin-1').strip()
485            headers[key] = val.decode('latin-1').strip()
486            n += 1
487
488        return request, headers, b'\r\n'.join(lines[n + 1:])
489
490    def _parse_chunked(self, data):
491        body = []
492        trailers = {}
493        n = 0
494        lines = data.split(b'\r\n')
495        # parse body
496        while True:
497            size, chunk = lines[n:n+2]
498            size = int(size, 16)
499
500            if size == 0:
501                n += 1
502                break
503
504            self.assertEqual(size, len(chunk))
505            body.append(chunk)
506
507            n += 2
508            # we /should/ hit the end chunk, but check against the size of
509            # lines so we're not stuck in an infinite loop should we get
510            # malformed data
511            if n > len(lines):
512                break
513
514        return b''.join(body)
515
516
517class BasicTest(TestCase):
518    def test_dir_with_added_behavior_on_status(self):
519        # see issue40084
520        self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
521
522    def test_status_lines(self):
523        # Test HTTP status lines
524
525        body = "HTTP/1.1 200 Ok\r\n\r\nText"
526        sock = FakeSocket(body)
527        resp = client.HTTPResponse(sock)
528        resp.begin()
529        self.assertEqual(resp.read(0), b'')  # Issue #20007
530        self.assertFalse(resp.isclosed())
531        self.assertFalse(resp.closed)
532        self.assertEqual(resp.read(), b"Text")
533        self.assertTrue(resp.isclosed())
534        self.assertFalse(resp.closed)
535        resp.close()
536        self.assertTrue(resp.closed)
537
538        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
539        sock = FakeSocket(body)
540        resp = client.HTTPResponse(sock)
541        self.assertRaises(client.BadStatusLine, resp.begin)
542
543    def test_bad_status_repr(self):
544        exc = client.BadStatusLine('')
545        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
546
547    def test_partial_reads(self):
548        # if we have Content-Length, HTTPResponse knows when to close itself,
549        # the same behaviour as when we read the whole thing with read()
550        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
551        sock = FakeSocket(body)
552        resp = client.HTTPResponse(sock)
553        resp.begin()
554        self.assertEqual(resp.read(2), b'Te')
555        self.assertFalse(resp.isclosed())
556        self.assertEqual(resp.read(2), b'xt')
557        self.assertTrue(resp.isclosed())
558        self.assertFalse(resp.closed)
559        resp.close()
560        self.assertTrue(resp.closed)
561
562    def test_mixed_reads(self):
563        # readline() should update the remaining length, so that read() knows
564        # how much data is left and does not raise IncompleteRead
565        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
566        sock = FakeSocket(body)
567        resp = client.HTTPResponse(sock)
568        resp.begin()
569        self.assertEqual(resp.readline(), b'Text\r\n')
570        self.assertFalse(resp.isclosed())
571        self.assertEqual(resp.read(), b'Another')
572        self.assertTrue(resp.isclosed())
573        self.assertFalse(resp.closed)
574        resp.close()
575        self.assertTrue(resp.closed)
576
577    def test_partial_readintos(self):
578        # if we have Content-Length, HTTPResponse knows when to close itself,
579        # the same behaviour as when we read the whole thing with read()
580        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
581        sock = FakeSocket(body)
582        resp = client.HTTPResponse(sock)
583        resp.begin()
584        b = bytearray(2)
585        n = resp.readinto(b)
586        self.assertEqual(n, 2)
587        self.assertEqual(bytes(b), b'Te')
588        self.assertFalse(resp.isclosed())
589        n = resp.readinto(b)
590        self.assertEqual(n, 2)
591        self.assertEqual(bytes(b), b'xt')
592        self.assertTrue(resp.isclosed())
593        self.assertFalse(resp.closed)
594        resp.close()
595        self.assertTrue(resp.closed)
596
597    def test_partial_reads_no_content_length(self):
598        # when no length is present, the socket should be gracefully closed when
599        # all data was read
600        body = "HTTP/1.1 200 Ok\r\n\r\nText"
601        sock = FakeSocket(body)
602        resp = client.HTTPResponse(sock)
603        resp.begin()
604        self.assertEqual(resp.read(2), b'Te')
605        self.assertFalse(resp.isclosed())
606        self.assertEqual(resp.read(2), b'xt')
607        self.assertEqual(resp.read(1), b'')
608        self.assertTrue(resp.isclosed())
609        self.assertFalse(resp.closed)
610        resp.close()
611        self.assertTrue(resp.closed)
612
613    def test_partial_readintos_no_content_length(self):
614        # when no length is present, the socket should be gracefully closed when
615        # all data was read
616        body = "HTTP/1.1 200 Ok\r\n\r\nText"
617        sock = FakeSocket(body)
618        resp = client.HTTPResponse(sock)
619        resp.begin()
620        b = bytearray(2)
621        n = resp.readinto(b)
622        self.assertEqual(n, 2)
623        self.assertEqual(bytes(b), b'Te')
624        self.assertFalse(resp.isclosed())
625        n = resp.readinto(b)
626        self.assertEqual(n, 2)
627        self.assertEqual(bytes(b), b'xt')
628        n = resp.readinto(b)
629        self.assertEqual(n, 0)
630        self.assertTrue(resp.isclosed())
631
632    def test_partial_reads_incomplete_body(self):
633        # if the server shuts down the connection before the whole
634        # content-length is delivered, the socket is gracefully closed
635        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
636        sock = FakeSocket(body)
637        resp = client.HTTPResponse(sock)
638        resp.begin()
639        self.assertEqual(resp.read(2), b'Te')
640        self.assertFalse(resp.isclosed())
641        self.assertEqual(resp.read(2), b'xt')
642        self.assertEqual(resp.read(1), b'')
643        self.assertTrue(resp.isclosed())
644
645    def test_partial_readintos_incomplete_body(self):
646        # if the server shuts down the connection before the whole
647        # content-length is delivered, the socket is gracefully closed
648        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
649        sock = FakeSocket(body)
650        resp = client.HTTPResponse(sock)
651        resp.begin()
652        b = bytearray(2)
653        n = resp.readinto(b)
654        self.assertEqual(n, 2)
655        self.assertEqual(bytes(b), b'Te')
656        self.assertFalse(resp.isclosed())
657        n = resp.readinto(b)
658        self.assertEqual(n, 2)
659        self.assertEqual(bytes(b), b'xt')
660        n = resp.readinto(b)
661        self.assertEqual(n, 0)
662        self.assertTrue(resp.isclosed())
663        self.assertFalse(resp.closed)
664        resp.close()
665        self.assertTrue(resp.closed)
666
667    def test_host_port(self):
668        # Check invalid host_port
669
670        for hp in ("www.python.org:abc", "user:password@www.python.org"):
671            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
672
673        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
674                          "fe80::207:e9ff:fe9b", 8000),
675                         ("www.python.org:80", "www.python.org", 80),
676                         ("www.python.org:", "www.python.org", 80),
677                         ("www.python.org", "www.python.org", 80),
678                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
679                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
680            c = client.HTTPConnection(hp)
681            self.assertEqual(h, c.host)
682            self.assertEqual(p, c.port)
683
684    def test_response_headers(self):
685        # test response with multiple message headers with the same field name.
686        text = ('HTTP/1.1 200 OK\r\n'
687                'Set-Cookie: Customer="WILE_E_COYOTE"; '
688                'Version="1"; Path="/acme"\r\n'
689                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
690                ' Path="/acme"\r\n'
691                '\r\n'
692                'No body\r\n')
693        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
694               ', '
695               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
696        s = FakeSocket(text)
697        r = client.HTTPResponse(s)
698        r.begin()
699        cookies = r.getheader("Set-Cookie")
700        self.assertEqual(cookies, hdr)
701
702    def test_read_head(self):
703        # Test that the library doesn't attempt to read any data
704        # from a HEAD request.  (Tickles SF bug #622042.)
705        sock = FakeSocket(
706            'HTTP/1.1 200 OK\r\n'
707            'Content-Length: 14432\r\n'
708            '\r\n',
709            NoEOFBytesIO)
710        resp = client.HTTPResponse(sock, method="HEAD")
711        resp.begin()
712        if resp.read():
713            self.fail("Did not expect response from HEAD request")
714
715    def test_readinto_head(self):
716        # Test that the library doesn't attempt to read any data
717        # from a HEAD request.  (Tickles SF bug #622042.)
718        sock = FakeSocket(
719            'HTTP/1.1 200 OK\r\n'
720            'Content-Length: 14432\r\n'
721            '\r\n',
722            NoEOFBytesIO)
723        resp = client.HTTPResponse(sock, method="HEAD")
724        resp.begin()
725        b = bytearray(5)
726        if resp.readinto(b) != 0:
727            self.fail("Did not expect response from HEAD request")
728        self.assertEqual(bytes(b), b'\x00'*5)
729
730    def test_too_many_headers(self):
731        headers = '\r\n'.join('Header%d: foo' % i
732                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
733        text = ('HTTP/1.1 200 OK\r\n' + headers)
734        s = FakeSocket(text)
735        r = client.HTTPResponse(s)
736        self.assertRaisesRegex(client.HTTPException,
737                               r"got more than \d+ headers", r.begin)
738
739    def test_send_file(self):
740        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
741                    b'Accept-Encoding: identity\r\n'
742                    b'Transfer-Encoding: chunked\r\n'
743                    b'\r\n')
744
745        with open(__file__, 'rb') as body:
746            conn = client.HTTPConnection('example.com')
747            sock = FakeSocket(body)
748            conn.sock = sock
749            conn.request('GET', '/foo', body)
750            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
751                    (sock.data[:len(expected)], expected))
752
753    def test_send(self):
754        expected = b'this is a test this is only a test'
755        conn = client.HTTPConnection('example.com')
756        sock = FakeSocket(None)
757        conn.sock = sock
758        conn.send(expected)
759        self.assertEqual(expected, sock.data)
760        sock.data = b''
761        conn.send(array.array('b', expected))
762        self.assertEqual(expected, sock.data)
763        sock.data = b''
764        conn.send(io.BytesIO(expected))
765        self.assertEqual(expected, sock.data)
766
767    def test_send_updating_file(self):
768        def data():
769            yield 'data'
770            yield None
771            yield 'data_two'
772
773        class UpdatingFile(io.TextIOBase):
774            mode = 'r'
775            d = data()
776            def read(self, blocksize=-1):
777                return next(self.d)
778
779        expected = b'data'
780
781        conn = client.HTTPConnection('example.com')
782        sock = FakeSocket("")
783        conn.sock = sock
784        conn.send(UpdatingFile())
785        self.assertEqual(sock.data, expected)
786
787
788    def test_send_iter(self):
789        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
790                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
791                   b'\r\nonetwothree'
792
793        def body():
794            yield b"one"
795            yield b"two"
796            yield b"three"
797
798        conn = client.HTTPConnection('example.com')
799        sock = FakeSocket("")
800        conn.sock = sock
801        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
802        self.assertEqual(sock.data, expected)
803
804    def test_blocksize_request(self):
805        """Check that request() respects the configured block size."""
806        blocksize = 8  # For easy debugging.
807        conn = client.HTTPConnection('example.com', blocksize=blocksize)
808        sock = FakeSocket(None)
809        conn.sock = sock
810        expected = b"a" * blocksize + b"b"
811        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
812        self.assertEqual(sock.sendall_calls, 3)
813        body = sock.data.split(b"\r\n\r\n", 1)[1]
814        self.assertEqual(body, expected)
815
816    def test_blocksize_send(self):
817        """Check that send() respects the configured block size."""
818        blocksize = 8  # For easy debugging.
819        conn = client.HTTPConnection('example.com', blocksize=blocksize)
820        sock = FakeSocket(None)
821        conn.sock = sock
822        expected = b"a" * blocksize + b"b"
823        conn.send(io.BytesIO(expected))
824        self.assertEqual(sock.sendall_calls, 2)
825        self.assertEqual(sock.data, expected)
826
827    def test_send_type_error(self):
828        # See: Issue #12676
829        conn = client.HTTPConnection('example.com')
830        conn.sock = FakeSocket('')
831        with self.assertRaises(TypeError):
832            conn.request('POST', 'test', conn)
833
834    def test_chunked(self):
835        expected = chunked_expected
836        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
837        resp = client.HTTPResponse(sock, method="GET")
838        resp.begin()
839        self.assertEqual(resp.read(), expected)
840        resp.close()
841
842        # Various read sizes
843        for n in range(1, 12):
844            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
845            resp = client.HTTPResponse(sock, method="GET")
846            resp.begin()
847            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
848            resp.close()
849
850        for x in ('', 'foo\r\n'):
851            sock = FakeSocket(chunked_start + x)
852            resp = client.HTTPResponse(sock, method="GET")
853            resp.begin()
854            try:
855                resp.read()
856            except client.IncompleteRead as i:
857                self.assertEqual(i.partial, expected)
858                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
859                self.assertEqual(repr(i), expected_message)
860                self.assertEqual(str(i), expected_message)
861            else:
862                self.fail('IncompleteRead expected')
863            finally:
864                resp.close()
865
866    def test_readinto_chunked(self):
867
868        expected = chunked_expected
869        nexpected = len(expected)
870        b = bytearray(128)
871
872        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
873        resp = client.HTTPResponse(sock, method="GET")
874        resp.begin()
875        n = resp.readinto(b)
876        self.assertEqual(b[:nexpected], expected)
877        self.assertEqual(n, nexpected)
878        resp.close()
879
880        # Various read sizes
881        for n in range(1, 12):
882            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
883            resp = client.HTTPResponse(sock, method="GET")
884            resp.begin()
885            m = memoryview(b)
886            i = resp.readinto(m[0:n])
887            i += resp.readinto(m[i:n + i])
888            i += resp.readinto(m[i:])
889            self.assertEqual(b[:nexpected], expected)
890            self.assertEqual(i, nexpected)
891            resp.close()
892
893        for x in ('', 'foo\r\n'):
894            sock = FakeSocket(chunked_start + x)
895            resp = client.HTTPResponse(sock, method="GET")
896            resp.begin()
897            try:
898                n = resp.readinto(b)
899            except client.IncompleteRead as i:
900                self.assertEqual(i.partial, expected)
901                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
902                self.assertEqual(repr(i), expected_message)
903                self.assertEqual(str(i), expected_message)
904            else:
905                self.fail('IncompleteRead expected')
906            finally:
907                resp.close()
908
909    def test_chunked_head(self):
910        chunked_start = (
911            'HTTP/1.1 200 OK\r\n'
912            'Transfer-Encoding: chunked\r\n\r\n'
913            'a\r\n'
914            'hello world\r\n'
915            '1\r\n'
916            'd\r\n'
917        )
918        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
919        resp = client.HTTPResponse(sock, method="HEAD")
920        resp.begin()
921        self.assertEqual(resp.read(), b'')
922        self.assertEqual(resp.status, 200)
923        self.assertEqual(resp.reason, 'OK')
924        self.assertTrue(resp.isclosed())
925        self.assertFalse(resp.closed)
926        resp.close()
927        self.assertTrue(resp.closed)
928
929    def test_readinto_chunked_head(self):
930        chunked_start = (
931            'HTTP/1.1 200 OK\r\n'
932            'Transfer-Encoding: chunked\r\n\r\n'
933            'a\r\n'
934            'hello world\r\n'
935            '1\r\n'
936            'd\r\n'
937        )
938        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
939        resp = client.HTTPResponse(sock, method="HEAD")
940        resp.begin()
941        b = bytearray(5)
942        n = resp.readinto(b)
943        self.assertEqual(n, 0)
944        self.assertEqual(bytes(b), b'\x00'*5)
945        self.assertEqual(resp.status, 200)
946        self.assertEqual(resp.reason, 'OK')
947        self.assertTrue(resp.isclosed())
948        self.assertFalse(resp.closed)
949        resp.close()
950        self.assertTrue(resp.closed)
951
952    def test_negative_content_length(self):
953        sock = FakeSocket(
954            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
955        resp = client.HTTPResponse(sock, method="GET")
956        resp.begin()
957        self.assertEqual(resp.read(), b'Hello\r\n')
958        self.assertTrue(resp.isclosed())
959
960    def test_incomplete_read(self):
961        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
962        resp = client.HTTPResponse(sock, method="GET")
963        resp.begin()
964        try:
965            resp.read()
966        except client.IncompleteRead as i:
967            self.assertEqual(i.partial, b'Hello\r\n')
968            self.assertEqual(repr(i),
969                             "IncompleteRead(7 bytes read, 3 more expected)")
970            self.assertEqual(str(i),
971                             "IncompleteRead(7 bytes read, 3 more expected)")
972            self.assertTrue(resp.isclosed())
973        else:
974            self.fail('IncompleteRead expected')
975
976    def test_epipe(self):
977        sock = EPipeSocket(
978            "HTTP/1.0 401 Authorization Required\r\n"
979            "Content-type: text/html\r\n"
980            "WWW-Authenticate: Basic realm=\"example\"\r\n",
981            b"Content-Length")
982        conn = client.HTTPConnection("example.com")
983        conn.sock = sock
984        self.assertRaises(OSError,
985                          lambda: conn.request("PUT", "/url", "body"))
986        resp = conn.getresponse()
987        self.assertEqual(401, resp.status)
988        self.assertEqual("Basic realm=\"example\"",
989                         resp.getheader("www-authenticate"))
990
991    # Test lines overflowing the max line size (_MAXLINE in http.client)
992
993    def test_overflowing_status_line(self):
994        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
995        resp = client.HTTPResponse(FakeSocket(body))
996        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
997
998    def test_overflowing_header_line(self):
999        body = (
1000            'HTTP/1.1 200 OK\r\n'
1001            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
1002        )
1003        resp = client.HTTPResponse(FakeSocket(body))
1004        self.assertRaises(client.LineTooLong, resp.begin)
1005
1006    def test_overflowing_header_limit_after_100(self):
1007        body = (
1008            'HTTP/1.1 100 OK\r\n'
1009            'r\n' * 32768
1010        )
1011        resp = client.HTTPResponse(FakeSocket(body))
1012        with self.assertRaises(client.HTTPException) as cm:
1013            resp.begin()
1014        # We must assert more because other reasonable errors that we
1015        # do not want can also be HTTPException derived.
1016        self.assertIn('got more than ', str(cm.exception))
1017        self.assertIn('headers', str(cm.exception))
1018
1019    def test_overflowing_chunked_line(self):
1020        body = (
1021            'HTTP/1.1 200 OK\r\n'
1022            'Transfer-Encoding: chunked\r\n\r\n'
1023            + '0' * 65536 + 'a\r\n'
1024            'hello world\r\n'
1025            '0\r\n'
1026            '\r\n'
1027        )
1028        resp = client.HTTPResponse(FakeSocket(body))
1029        resp.begin()
1030        self.assertRaises(client.LineTooLong, resp.read)
1031
1032    def test_early_eof(self):
1033        # Test httpresponse with no \r\n termination,
1034        body = "HTTP/1.1 200 Ok"
1035        sock = FakeSocket(body)
1036        resp = client.HTTPResponse(sock)
1037        resp.begin()
1038        self.assertEqual(resp.read(), b'')
1039        self.assertTrue(resp.isclosed())
1040        self.assertFalse(resp.closed)
1041        resp.close()
1042        self.assertTrue(resp.closed)
1043
1044    def test_error_leak(self):
1045        # Test that the socket is not leaked if getresponse() fails
1046        conn = client.HTTPConnection('example.com')
1047        response = None
1048        class Response(client.HTTPResponse):
1049            def __init__(self, *pos, **kw):
1050                nonlocal response
1051                response = self  # Avoid garbage collector closing the socket
1052                client.HTTPResponse.__init__(self, *pos, **kw)
1053        conn.response_class = Response
1054        conn.sock = FakeSocket('Invalid status line')
1055        conn.request('GET', '/')
1056        self.assertRaises(client.BadStatusLine, conn.getresponse)
1057        self.assertTrue(response.closed)
1058        self.assertTrue(conn.sock.file_closed)
1059
1060    def test_chunked_extension(self):
1061        extra = '3;foo=bar\r\n' + 'abc\r\n'
1062        expected = chunked_expected + b'abc'
1063
1064        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1065        resp = client.HTTPResponse(sock, method="GET")
1066        resp.begin()
1067        self.assertEqual(resp.read(), expected)
1068        resp.close()
1069
1070    def test_chunked_missing_end(self):
1071        """some servers may serve up a short chunked encoding stream"""
1072        expected = chunked_expected
1073        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1074        resp = client.HTTPResponse(sock, method="GET")
1075        resp.begin()
1076        self.assertEqual(resp.read(), expected)
1077        resp.close()
1078
1079    def test_chunked_trailers(self):
1080        """See that trailers are read and ignored"""
1081        expected = chunked_expected
1082        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1083        resp = client.HTTPResponse(sock, method="GET")
1084        resp.begin()
1085        self.assertEqual(resp.read(), expected)
1086        # we should have reached the end of the file
1087        self.assertEqual(sock.file.read(), b"") #we read to the end
1088        resp.close()
1089
1090    def test_chunked_sync(self):
1091        """Check that we don't read past the end of the chunked-encoding stream"""
1092        expected = chunked_expected
1093        extradata = "extradata"
1094        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1095        resp = client.HTTPResponse(sock, method="GET")
1096        resp.begin()
1097        self.assertEqual(resp.read(), expected)
1098        # the file should now have our extradata ready to be read
1099        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1100        resp.close()
1101
1102    def test_content_length_sync(self):
1103        """Check that we don't read past the end of the Content-Length stream"""
1104        extradata = b"extradata"
1105        expected = b"Hello123\r\n"
1106        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1107        resp = client.HTTPResponse(sock, method="GET")
1108        resp.begin()
1109        self.assertEqual(resp.read(), expected)
1110        # the file should now have our extradata ready to be read
1111        self.assertEqual(sock.file.read(), extradata) #we read to the end
1112        resp.close()
1113
1114    def test_readlines_content_length(self):
1115        extradata = b"extradata"
1116        expected = b"Hello123\r\n"
1117        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1118        resp = client.HTTPResponse(sock, method="GET")
1119        resp.begin()
1120        self.assertEqual(resp.readlines(2000), [expected])
1121        # the file should now have our extradata ready to be read
1122        self.assertEqual(sock.file.read(), extradata) #we read to the end
1123        resp.close()
1124
1125    def test_read1_content_length(self):
1126        extradata = b"extradata"
1127        expected = b"Hello123\r\n"
1128        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1129        resp = client.HTTPResponse(sock, method="GET")
1130        resp.begin()
1131        self.assertEqual(resp.read1(2000), expected)
1132        # the file should now have our extradata ready to be read
1133        self.assertEqual(sock.file.read(), extradata) #we read to the end
1134        resp.close()
1135
1136    def test_readline_bound_content_length(self):
1137        extradata = b"extradata"
1138        expected = b"Hello123\r\n"
1139        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1140        resp = client.HTTPResponse(sock, method="GET")
1141        resp.begin()
1142        self.assertEqual(resp.readline(10), expected)
1143        self.assertEqual(resp.readline(10), b"")
1144        # the file should now have our extradata ready to be read
1145        self.assertEqual(sock.file.read(), extradata) #we read to the end
1146        resp.close()
1147
1148    def test_read1_bound_content_length(self):
1149        extradata = b"extradata"
1150        expected = b"Hello123\r\n"
1151        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1152        resp = client.HTTPResponse(sock, method="GET")
1153        resp.begin()
1154        self.assertEqual(resp.read1(20), expected*2)
1155        self.assertEqual(resp.read(), expected)
1156        # the file should now have our extradata ready to be read
1157        self.assertEqual(sock.file.read(), extradata) #we read to the end
1158        resp.close()
1159
    def test_response_fileno(self):
        """Send data through the descriptor returned by response.fileno().

        A server thread answers a CONNECT request and then records everything
        else it receives; the test writes to a socket object built from the
        response's file descriptor and checks that the server saw the bytes.
        """
        # Make sure fd returned by fileno is valid.
        serv = socket.create_server((HOST, 0))
        self.addCleanup(serv.close)

        # Filled in by the server thread with the bytes received after the
        # request header.
        result = None
        def run_server():
            [conn, address] = serv.accept()
            with conn, conn.makefile("rb") as reader:
                # Read the request header until a blank line
                while True:
                    line = reader.readline()
                    if not line.rstrip(b"\r\n"):
                        break
                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
                nonlocal result
                # Reads until EOF, so this returns only once the client side
                # has closed the connection.
                result = reader.read()

        thread = threading.Thread(target=run_server)
        thread.start()
        # Bounded join as a safety net in case the test exits early.
        self.addCleanup(thread.join, float(1))
        conn = client.HTTPConnection(*serv.getsockname())
        conn.request("CONNECT", "dummy:1234")
        response = conn.getresponse()
        try:
            self.assertEqual(response.status, client.OK)
            # Wrap the response's descriptor in a second socket object.
            s = socket.socket(fileno=response.fileno())
            try:
                s.sendall(b"proxied data\n")
            finally:
                # detach() instead of close(): the descriptor is still owned
                # by the response/connection, which close it below.
                s.detach()
        finally:
            response.close()
            conn.close()
        # Wait for the server thread so that `result` is set before checking.
        thread.join()
        self.assertEqual(result, b"proxied data\n")
1196
1197    def test_putrequest_override_domain_validation(self):
1198        """
1199        It should be possible to override the default validation
1200        behavior in putrequest (bpo-38216).
1201        """
1202        class UnsafeHTTPConnection(client.HTTPConnection):
1203            def _validate_path(self, url):
1204                pass
1205
1206        conn = UnsafeHTTPConnection('example.com')
1207        conn.sock = FakeSocket('')
1208        conn.putrequest('GET', '/\x00')
1209
1210    def test_putrequest_override_host_validation(self):
1211        class UnsafeHTTPConnection(client.HTTPConnection):
1212            def _validate_host(self, url):
1213                pass
1214
1215        conn = UnsafeHTTPConnection('example.com\r\n')
1216        conn.sock = FakeSocket('')
1217        # set skip_host so a ValueError is not raised upon adding the
1218        # invalid URL as the value of the "Host:" header
1219        conn.putrequest('GET', '/', skip_host=1)
1220
1221    def test_putrequest_override_encoding(self):
1222        """
1223        It should be possible to override the default encoding
1224        to transmit bytes in another encoding even if invalid
1225        (bpo-36274).
1226        """
1227        class UnsafeHTTPConnection(client.HTTPConnection):
1228            def _encode_request(self, str_url):
1229                return str_url.encode('utf-8')
1230
1231        conn = UnsafeHTTPConnection('example.com')
1232        conn.sock = FakeSocket('')
1233        conn.putrequest('GET', '/☃')
1234
1235
class ExtendedReadTest(TestCase):
    """
    Exercise peek(), read1() and readline() on a response whose body is
    served through a BufferedReader.
    """
    # Raw response fed to the fake socket.
    lines = (
        'HTTP/1.1 200 OK\r\n'
        '\r\n'
        'hello world!\n'
        'and now \n'
        'for something completely different\n'
        'foo'
        )
    # Body expected back from the response: everything from 'hello' onwards.
    lines_expected = lines[lines.index('hello'):].encode("ascii")
    # The same body in chunked transfer encoding.
    lines_chunked = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello worl\r\n'
        '3\r\n'
        'd!\n\r\n'
        '9\r\n'
        'and now \n\r\n'
        '23\r\n'
        'for something completely different\n\r\n'
        '3\r\n'
        'foo\r\n'
        '0\r\n' # terminating chunk
        '\r\n'  # end of trailers
    )

    def setUp(self):
        response = client.HTTPResponse(FakeSocket(self.lines), method="GET")
        response.begin()
        # Put a BufferedReader in front of the fake socket's file object.
        response.fp = io.BufferedReader(response.fp)
        self.resp = response

    def test_peek(self):
        resp = self.resp
        # patch up the buffered peek so that it returns not too much stuff
        original_peek = resp.fp.peek

        def limited_peek(n=-1):
            peeked = original_peek(n)
            cap = n if n >= 0 else 10
            return peeked[:cap]

        resp.fp.peek = limited_peek

        consumed = []
        while True:
            # try a short peek
            short = resp.peek(3)
            if short:
                self.assertGreater(len(short), 0)
                # then unbounded peek
                unbounded = resp.peek()
                self.assertGreaterEqual(len(unbounded), len(short))
                self.assertTrue(unbounded.startswith(short))
                # reading must hand back exactly what was peeked
                piece = resp.read(len(unbounded))
                self.assertEqual(piece, unbounded)
            else:
                piece = resp.read()
                self.assertFalse(piece)
            consumed.append(piece)
            if not piece:
                break
        self.assertEqual(b"".join(consumed), self.lines_expected)

    def test_readline(self):
        self._verify_readline(self.resp.readline, self.lines_expected)

    def _verify_readline(self, readline, expected):
        """Drain *readline* five bytes at a time and compare with *expected*."""
        collected = []
        # short readlines
        line = readline(5)
        while line:
            # A result shorter than the limit has to end a line, except for
            # the final, unterminated b"foo".
            if line != b"foo" and len(line) < 5:
                self.assertTrue(line.endswith(b"\n"))
            collected.append(line)
            line = readline(5)
        self.assertEqual(b"".join(collected), expected)

    def test_read1(self):
        resp = self.resp

        def bounded_read():
            chunk = resp.read1(4)
            self.assertLessEqual(len(chunk), 4)
            return chunk

        self._verify_readline(Readliner(bounded_read).readline,
                              self.lines_expected)

    def test_read1_unbounded(self):
        resp = self.resp
        received = []
        data = resp.read1()
        while data:
            received.append(data)
            data = resp.read1()
        self.assertEqual(b"".join(received), self.lines_expected)

    def test_read1_bounded(self):
        resp = self.resp
        received = []
        data = resp.read1(10)
        while data:
            self.assertLessEqual(len(data), 10)
            received.append(data)
            data = resp.read1(10)
        self.assertEqual(b"".join(received), self.lines_expected)

    def test_read1_0(self):
        self.assertEqual(self.resp.read1(0), b"")

    def test_peek_0(self):
        peeked = self.resp.peek(0)
        self.assertLessEqual(0, len(peeked))
1359
1360
class ExtendedReadTestChunked(ExtendedReadTest):
    """
    Run the peek(), read1() and readline() tests against a chunked response.
    """
    # The base class already carries the chunked form of its response text
    # (terminating chunk and end of trailers included).
    lines = ExtendedReadTest.lines_chunked
1381
1382
class Readliner:
    """
    a simple readline class that uses an arbitrary read function and buffering

    *readfunc* is called without arguments and must return bytes; an empty
    result is treated as end of input.  Bytes obtained beyond the end of the
    returned line are kept in ``remainder`` for the next call.
    """
    def __init__(self, readfunc):
        self.readfunc = readfunc
        self.remainder = b""

    def readline(self, limit):
        """Return the next line, never longer than *limit* bytes.

        The result ends with the first newline found, or is cut after
        *limit* bytes when no newline shows up before that, or holds
        whatever is left once *readfunc* reports end of input.  A *limit*
        of zero or less yields b"" without reading anything.

        If *readfunc* raises, the bytes collected so far are put back into
        ``remainder`` and the exception propagates.
        """
        pieces = []
        collected = 0
        pending = self.remainder
        try:
            while True:
                # Number of bytes that may still be handed out.
                room = max(limit - collected, 0)
                # Only a newline that fits within the limit ends the line;
                # `cut` is 0 when none is found.
                cut = pending.find(b"\n", 0, room) + 1
                if cut:
                    break
                if len(pending) >= room:
                    # Limit reached without a newline: split right here.
                    cut = room
                    break
                # read more data
                pieces.append(pending)
                collected += len(pending)
                pending = self.readfunc()
                if not pending:
                    cut = 0  # eof condition
                    break
            pieces.append(pending[:cut])
            self.remainder = pending[cut:]
            return b"".join(pieces)
        except BaseException:
            # Do not lose what has already been consumed from readfunc.
            self.remainder = b"".join(pieces)
            raise
1415
1416
class OfflineTest(TestCase):
    """Checks on the http.client module namespace that need no connection."""

    def test_all(self):
        """Public names defined in http.client must be listed in __all__."""
        # HTTPMessage, parse_headers(), and the HTTP status code constants are
        # intentionally omitted for simplicity
        excluded = {"HTTPMessage", "parse_headers"}
        # Documented objects defined in the module should be in __all__;
        # the documented `responses` dict is allowed explicitly.
        expected = {"responses"}
        expected.update(
            name for name in dir(client)
            if not name.startswith("_")
            and name not in excluded
            and getattr(getattr(client, name), "__module__", None) == "http.client"
        )
        self.assertCountEqual(client.__all__, expected)

    def test_responses(self):
        """`responses` maps a status code to its reason phrase."""
        reason = client.responses[client.NOT_FOUND]
        self.assertEqual(reason, "Not Found")

    def test_client_constants(self):
        """Every status constant listed here must be a module attribute."""
        # Make sure we don't break backward compatibility with 3.4
        constant_names = (
            'CONTINUE',
            'SWITCHING_PROTOCOLS',
            'PROCESSING',
            'OK',
            'CREATED',
            'ACCEPTED',
            'NON_AUTHORITATIVE_INFORMATION',
            'NO_CONTENT',
            'RESET_CONTENT',
            'PARTIAL_CONTENT',
            'MULTI_STATUS',
            'IM_USED',
            'MULTIPLE_CHOICES',
            'MOVED_PERMANENTLY',
            'FOUND',
            'SEE_OTHER',
            'NOT_MODIFIED',
            'USE_PROXY',
            'TEMPORARY_REDIRECT',
            'BAD_REQUEST',
            'UNAUTHORIZED',
            'PAYMENT_REQUIRED',
            'FORBIDDEN',
            'NOT_FOUND',
            'METHOD_NOT_ALLOWED',
            'NOT_ACCEPTABLE',
            'PROXY_AUTHENTICATION_REQUIRED',
            'REQUEST_TIMEOUT',
            'CONFLICT',
            'GONE',
            'LENGTH_REQUIRED',
            'PRECONDITION_FAILED',
            'REQUEST_ENTITY_TOO_LARGE',
            'REQUEST_URI_TOO_LONG',
            'UNSUPPORTED_MEDIA_TYPE',
            'REQUESTED_RANGE_NOT_SATISFIABLE',
            'EXPECTATION_FAILED',
            'MISDIRECTED_REQUEST',
            'UNPROCESSABLE_ENTITY',
            'LOCKED',
            'FAILED_DEPENDENCY',
            'UPGRADE_REQUIRED',
            'PRECONDITION_REQUIRED',
            'TOO_MANY_REQUESTS',
            'REQUEST_HEADER_FIELDS_TOO_LARGE',
            'UNAVAILABLE_FOR_LEGAL_REASONS',
            'INTERNAL_SERVER_ERROR',
            'NOT_IMPLEMENTED',
            'BAD_GATEWAY',
            'SERVICE_UNAVAILABLE',
            'GATEWAY_TIMEOUT',
            'HTTP_VERSION_NOT_SUPPORTED',
            'INSUFFICIENT_STORAGE',
            'NOT_EXTENDED',
            'NETWORK_AUTHENTICATION_REQUIRED',
        )
        for constant_name in constant_names:
            with self.subTest(constant=constant_name):
                present = hasattr(client, constant_name)
                self.assertTrue(present)
1497
1498
class SourceAddressTest(TestCase):
    """Check the ``source_address`` argument of the connection classes."""

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = support.bind_port(listener)
        self.source_port = support.find_unused_port()
        listener.listen()
        # Set by the individual tests; closed again in tearDown().
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        # The local end of the socket must use the requested port.
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # We don't test anything here other than the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
1528
1529
class TimeoutTest(TestCase):
    """Check that the timeout given to HTTPConnection reaches its socket."""

    # Port the listening socket of the most recent setUp() is bound to.
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.
        #
        # Every connection is registered for cleanup as soon as it exists,
        # so that a failing assertion cannot leave its socket open; the
        # explicit close() calls keep the sockets short-lived on success.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
            self.addCleanup(httpConn.close)
            httpConn.connect()
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            self.addCleanup(httpConn.close)
            httpConn.connect()
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertIsNone(httpConn.sock.gettimeout())
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.addCleanup(httpConn.close)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
1574
1575
class PersistenceTest(TestCase):
    """Check when a connection keeps its socket and when it reconnects."""

    def test_reuse_reconnect(self):
        # The socket must be kept or dropped according to the HTTP version
        # and the Connection header sent back by the server.
        # Each scenario is (HTTP version, extra header line, socket reused?).
        scenarios = (
            ('1.0', '', False),
            ('1.0', 'Connection: keep-alive\r\n', True),
            ('1.1', '', True),
            ('1.1', 'Connection: close\r\n', False),
            ('1.0', 'Connection: keep-ALIVE\r\n', True),
            ('1.1', 'Connection: cloSE\r\n', False),
        )
        for version, header, reuse in scenarios:
            with self.subTest(version=version, header=header):
                reply = (
                    f'HTTP/{version} 200 OK\r\n'
                    f'{header}'
                    'Content-Length: 12\r\n'
                    '\r\n'
                    'Dummy body\r\n'
                )
                socket_dropped = not reuse
                connections_after_second_request = 1 if reuse else 2

                conn = FakeSocketHTTPConnection(reply)
                # Nothing is connected until the first request is issued.
                self.assertIsNone(conn.sock)
                conn.request('GET', '/open-connection')
                with conn.getresponse() as response:
                    # The decision is already visible before the body is read.
                    self.assertEqual(conn.sock is None, socket_dropped)
                    response.read()
                self.assertEqual(conn.sock is None, socket_dropped)
                self.assertEqual(conn.connections, 1)
                conn.request('GET', '/subsequent-request')
                self.assertEqual(conn.connections,
                                 connections_after_second_request)

    def test_disconnected(self):
        # A peer that goes away before sending any reply must surface as an
        # exception, leave the connection without a socket, and let the next
        # request reconnect.

        def reset_at_eof_reader(text):
            """Return BufferedReader that raises ECONNRESET at EOF"""
            raw = io.BytesIO(text)

            def readinto(buffer):
                count = io.BytesIO.readinto(raw, buffer)
                if count:
                    return count
                # Zero bytes read means EOF: simulate a reset instead.
                raise ConnectionResetError()

            raw.readinto = readinto
            return io.BufferedReader(raw)

        # Each case is (stream factory handed to the fake connection,
        # exception expected from getresponse()).
        cases = (
            (io.BytesIO, client.RemoteDisconnected),
            (reset_at_eof_reader, ConnectionResetError),
        )
        for stream_factory, exception in cases:
            with self.subTest(exception=exception):
                conn = FakeSocketHTTPConnection(b'', stream_factory)
                conn.request('GET', '/eof-response')
                with self.assertRaises(exception):
                    conn.getresponse()
                self.assertIsNone(conn.sock)
                # HTTPConnection.connect() should be automatically invoked
                conn.request('GET', '/reconnect')
                self.assertEqual(conn.connections, 2)

    def test_100_close(self):
        # Only an interim "100 Continue" arrives; the final response is
        # deliberately missing.
        interim_only = (
            b'HTTP/1.1 100 Continue\r\n'
            b'\r\n'
        )
        conn = FakeSocketHTTPConnection(interim_only)
        conn.request('GET', '/', headers={'Expect': '100-continue'})
        with self.assertRaises(client.RemoteDisconnected):
            conn.getresponse()
        self.assertIsNone(conn.sock)
        # The following request has to open a second connection.
        conn.request('GET', '/reconnect')
        self.assertEqual(conn.connections, 2)
1646
1647
class HTTPSTest(TestCase):
    """Certificate, hostname and option handling of client.HTTPSConnection."""

    def setUp(self):
        # Every test in this class is skipped when http.client does not
        # provide HTTPSConnection.
        if hasattr(client, 'HTTPSConnection'):
            return
        self.skipTest('ssl support required')

    def make_server(self, certfile):
        # Imported on demand rather than at module level.
        from test import ssl_servers
        return ssl_servers.make_https_server(self, certfile=certfile)

    def test_attributes(self):
        # simple test to check it's storing the timeout
        conn = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(conn.timeout, 30)

    def test_networked(self):
        # Default settings: requires a valid cert from a trusted CA
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        # Switch off cert verification
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            unverified = ssl._create_unverified_context()
            conn = client.HTTPSConnection(hostname, 443, context=unverified)
            conn.request('GET', '/')
            resp = conn.getresponse()
            conn.close()
            # Headers are still available after the connection was closed.
            self.assertIn('nginx', resp.getheader('server'))
            resp.close()

    @support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        # Default settings: requires a valid cert from a trusted CA
        support.requires('network')
        hostname = 'www.python.org'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            conn.request('GET', '/')
            resp = conn.getresponse()
            content_type = resp.getheader('content-type')
            resp.close()
            conn.close()
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        # We feed the server's cert as a validating cert
        import ssl
        support.requires('network')
        selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
        with support.transient_internet(selfsigned_pythontestdotnet):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # PROTOCOL_TLS_CLIENT is expected to verify by default.
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.check_hostname, True)
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            try:
                conn = client.HTTPSConnection(selfsigned_pythontestdotnet,
                                              443, context=context)
                conn.request('GET', '/')
                resp = conn.getresponse()
            except ssl.SSLError as ssl_err:
                ssl_err_str = str(ssl_err)
                # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
                # modern Linux distros (Debian Buster, etc) default OpenSSL
                # configurations it'll fail saying "key too weak" until we
                # address https://bugs.python.org/issue36816 to use a proper
                # key size on self-signed.pythontest.net.
                if not re.search(r'(?i)key.too.weak', ssl_err_str):
                    # Any other SSL failure is a genuine test failure.
                    raise
                raise unittest.SkipTest(
                    f'Got {ssl_err_str} trying to connect '
                    f'to {selfsigned_pythontestdotnet}. '
                    'See https://bugs.python.org/issue36816.')
            server_string = resp.getheader('server')
            resp.close()
            conn.close()
            self.assertIn('nginx', server_string)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(CERT_localhost)
            conn = client.HTTPSConnection(hostname, 443, context=context)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        # The custom cert isn't known to the default trust bundle
        import ssl
        server = self.make_server(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as caught:
            conn.request('GET', '/')
        self.assertEqual(caught.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        self.addCleanup(conn.close)
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        self.addCleanup(resp.close)
        self.assertEqual(resp.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_fakehostname)

        def connection_with_flag(flag):
            # Passing check_hostname explicitly must emit a
            # DeprecationWarning; check_warnings() verifies that it does.
            with support.check_warnings(('', DeprecationWarning)):
                return client.HTTPSConnection('localhost', server.port,
                                              context=context,
                                              check_hostname=flag)

        def assert_hostname_rejected(conn):
            with self.assertRaises(ssl.CertificateError):
                conn.request('GET', '/')

        assert_hostname_rejected(
            client.HTTPSConnection('localhost', server.port, context=context))
        # Same with explicit check_hostname=True
        assert_hostname_rejected(connection_with_flag(True))
        # With check_hostname=False, the mismatching is ignored
        context.check_hostname = False
        conn = connection_with_flag(False)
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        resp.close()
        conn.close()
        self.assertEqual(resp.status, 404)
        # The context's check_hostname setting is used if one isn't passed to
        # HTTPSConnection.
        context.check_hostname = False
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        self.assertEqual(resp.status, 404)
        resp.close()
        conn.close()
        # Passing check_hostname to HTTPSConnection should override the
        # context's setting.
        assert_hostname_rejected(connection_with_flag(True))

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        # Check invalid host_port
        for bad_hostport in ("www.python.org:abc",
                             "user:password@www.python.org"):
            with self.assertRaises(client.InvalidURL):
                client.HTTPSConnection(bad_hostport)

        # Each entry is (argument, expected host, expected port); a missing
        # or empty port is expected to fall back to 443.
        expectations = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:443", "www.python.org", 443),
            ("www.python.org:", "www.python.org", 443),
            ("www.python.org", "www.python.org", 443),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
            ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 443),
        )
        for hostport, host, port in expectations:
            conn = client.HTTPSConnection(hostport)
            self.assertEqual(host, conn.host)
            self.assertEqual(port, conn.port)

    def test_tls13_pha(self):
        import ssl
        if not ssl.HAS_TLSv1_3:
            self.skipTest('TLS 1.3 support required')
        # just check status of PHA flag
        default_conn = client.HTTPSConnection('localhost', 443)
        self.assertTrue(default_conn._context.post_handshake_auth)

        # A caller-supplied context is used as is, flag untouched.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertFalse(context.post_handshake_auth)
        custom_conn = client.HTTPSConnection('localhost', 443,
                                             context=context)
        self.assertIs(custom_conn._context, context)
        self.assertFalse(custom_conn._context.post_handshake_auth)

        # Supplying cert_file next to the context turns the flag on; the
        # deprecation warning for cert_file is silenced here.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                'key_file, cert_file and check_hostname are deprecated',
                DeprecationWarning)
            cert_conn = client.HTTPSConnection('localhost', 443,
                                               context=context,
                                               cert_file=CERT_localhost)
        self.assertTrue(cert_conn._context.post_handshake_auth)
1849
1850
class RequestBodyTest(TestCase):
    """Tests for requests that carry a message body."""

    def setUp(self):
        # Requests are written into a FakeSocket so that the bytes sent can
        # be inspected afterwards through self.sock.data.
        self.sock = FakeSocket("")
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock

    def get_headers_and_fp(self):
        # Return (parsed request headers, stream positioned at the body).
        sent = io.BytesIO(self.sock.data)
        sent.readline()  # read the request line
        return client.parse_headers(sent), sent

    def test_list_body(self):
        # Note that no content-length is automatically calculated for
        # an iterable.  The request will fall back to send chunked
        # transfer encoding.
        chunked = b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'
        for body in ([b'foo', b'bar'], (b'foo', b'bar')):
            with self.subTest(body):
                # Use a fresh connection and socket for every body type.
                self.sock = FakeSocket('')
                self.conn = client.HTTPConnection('example.com')
                self.conn.sock = self.sock

                self.conn.request('PUT', '/url', body)
                headers, sent = self.get_headers_and_fp()
                self.assertNotIn('Content-Type', headers)
                self.assertNotIn('Content-Length', headers)
                self.assertEqual(headers.get('Transfer-Encoding'), 'chunked')
                self.assertEqual(chunked, sent.read())

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        explicit_headers = {"Content-Length": "42"}
        self.conn.request("PUT", "/url", "body", explicit_headers)
        headers, sent = self.get_headers_and_fp()
        self.assertEqual("42", headers.get("content-length"))
        self.assertEqual(4, len(sent.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        headers, sent = self.get_headers_and_fp()
        self.assertEqual("text/plain", headers.get_content_type())
        self.assertIsNone(headers.get_charset())
        self.assertEqual("4", headers.get("content-length"))
        self.assertEqual(b'body', sent.read())

    def test_latin1_body(self):
        # A str body with a non-ASCII character goes out as one byte each.
        self.conn.request("PUT", "/url", "body\xc1")
        headers, sent = self.get_headers_and_fp()
        self.assertEqual("text/plain", headers.get_content_type())
        self.assertIsNone(headers.get_charset())
        self.assertEqual("5", headers.get("content-length"))
        self.assertEqual(b'body\xc1', sent.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        headers, sent = self.get_headers_and_fp()
        self.assertEqual("text/plain", headers.get_content_type())
        self.assertIsNone(headers.get_charset())
        self.assertEqual("5", headers.get("content-length"))
        self.assertEqual(b'body\xc1', sent.read())

    def test_text_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as out:
            out.write("body")
        with open(support.TESTFN) as body_file:
            self.conn.request("PUT", "/url", body_file)
            headers, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", headers.get_content_type())
            self.assertIsNone(headers.get_charset())
            # No content-length will be determined for files; the body
            # will be sent using chunked transfer encoding instead.
            self.assertIsNone(headers.get("content-length"))
            self.assertEqual("chunked", headers.get("transfer-encoding"))
            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', sent.read())

    def test_binary_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as out:
            out.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body_file:
            self.conn.request("PUT", "/url", body_file)
            headers, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", headers.get_content_type())
            self.assertIsNone(headers.get_charset())
            # As for text files, the body is sent chunked without a length.
            self.assertEqual("chunked", headers.get("Transfer-Encoding"))
            self.assertNotIn("Content-Length", headers)
            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', sent.read())
1945
1946
class HTTPResponseTest(TestCase):
    """Tests for HTTPResponse.getheader() on a canned response."""

    def setUp(self):
        # The response carries "My-Header" twice; the second value is
        # preceded by a run of extra spaces.
        raw = (
            "HTTP/1.1 200 Ok\r\n"
            "My-Header: first-value\r\n"
            "My-Header: "
            "                second-value\r\n"
            "\r\n"
            "Text"
        )
        self.resp = client.HTTPResponse(FakeSocket(raw))
        self.resp.begin()

    def test_getting_header(self):
        # Repeated headers are joined; a default is ignored when the header
        # exists.
        joined = 'first-value, second-value'
        self.assertEqual(self.resp.getheader('My-Header'), joined)
        self.assertEqual(self.resp.getheader('My-Header', 'some default'),
                         joined)

    def test_getting_nonexistent_header_with_string_default(self):
        value = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(value, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        # List and tuple defaults are both joined with ", ".
        for default in (['default', 'values'], ('default', 'values')):
            value = self.resp.getheader('No-Such-Header', default)
            self.assertEqual(value, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        self.assertIsNone(self.resp.getheader('No-Such-Header'))

    def test_getting_header_defaultint(self):
        # A non-string, non-iterable default comes back unchanged.
        value = self.resp.getheader('No-Such-Header', default=42)
        self.assertEqual(value, 42)
1981
class TunnelTests(TestCase):
    """Tests for HTTPConnection.set_tunnel() against a fake proxy socket."""

    def setUp(self):
        canned_replies = (
            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )
        self.host = 'proxy.com'
        self.conn = client.HTTPConnection(self.host)
        # Replace the socket factory so that no real connection is made.
        self.conn._create_connection = self._create_connection(canned_replies)

    def tearDown(self):
        self.conn.close()

    def _create_connection(self, response_text):
        # Build a stand-in for the connection factory: it returns a
        # FakeSocket that replays response_text and records the address
        # it was asked to connect to.
        def fake_create_connection(address, timeout=None,
                                   source_address=None):
            host, port = address[0], address[1]
            return FakeSocket(response_text, host=host, port=port)
        return fake_create_connection

    def test_set_tunnel_host_port_headers(self):
        destination = 'destination.com'
        destination_port = 8888
        extra_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
        self.conn.set_tunnel(destination, port=destination_port,
                             headers=extra_headers)
        self.conn.request('HEAD', '/', '')
        # The socket goes to the proxy; the tunnel target is only stored.
        self.assertEqual(self.conn.sock.host, self.host)
        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
        self.assertEqual(self.conn._tunnel_host, destination)
        self.assertEqual(self.conn._tunnel_port, destination_port)
        self.assertEqual(self.conn._tunnel_headers, extra_headers)

    def test_disallow_set_tunnel_after_connect(self):
        # Once connected, we shouldn't be able to tunnel anymore
        self.conn.connect()
        with self.assertRaises(RuntimeError):
            self.conn.set_tunnel('destination.com')

    def test_connect_with_tunnel(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('HEAD', '/', '')
        self.assertEqual(self.conn.sock.host, self.host)
        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
        sent = self.conn.sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        # issue22095
        self.assertNotIn(b'Host: destination.com:None', sent)
        self.assertIn(b'Host: destination.com', sent)

        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
        self.assertNotIn(b'Host: proxy.com', sent)

    def test_connect_put_request(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('PUT', '/', '')
        self.assertEqual(self.conn.sock.host, self.host)
        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
        sent = self.conn.sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        self.assertIn(b'Host: destination.com', sent)

    def test_tunnel_debuglog(self):
        # With debugging on, headers of the CONNECT reply are printed.
        expected_header = 'X-Dummy: 1'
        response_text = f'HTTP/1.0 200 OK\r\n{expected_header}\r\n\r\n'

        self.conn.set_debuglevel(1)
        self.conn._create_connection = self._create_connection(response_text)
        self.conn.set_tunnel('destination.com')

        with support.captured_stdout() as output:
            self.conn.request('PUT', '/', '')
        printed_lines = output.getvalue().splitlines()
        self.assertIn(f'header: {expected_header}', printed_lines)
2053
2054
# When this file is executed directly, run its tests through unittest.main
# with verbosity=2.
if __name__ == '__main__':
    unittest.main(verbosity=2)
2057