1import httplib
2import itertools
3import array
4import StringIO
5import socket
6import errno
7import os
8import tempfile
9
10import unittest
11TestCase = unittest.TestCase
12
13from test import test_support
14
15here = os.path.dirname(__file__)
16# Self-signed cert file for 'localhost'
17CERT_localhost = os.path.join(here, 'keycert.pem')
18# Self-signed cert file for 'fakehostname'
19CERT_fakehostname = os.path.join(here, 'keycert2.pem')
20# Self-signed cert file for self-signed.pythontest.net
21CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
22
23HOST = test_support.HOST
24
25class FakeSocket:
26    def __init__(self, text, fileclass=StringIO.StringIO, host=None, port=None):
27        self.text = text
28        self.fileclass = fileclass
29        self.data = ''
30        self.file_closed = False
31        self.host = host
32        self.port = port
33
34    def sendall(self, data):
35        self.data += ''.join(data)
36
37    def makefile(self, mode, bufsize=None):
38        if mode != 'r' and mode != 'rb':
39            raise httplib.UnimplementedFileMode()
40        # keep the file around so we can check how much was read from it
41        self.file = self.fileclass(self.text)
42        self.file.close = self.file_close #nerf close ()
43        return self.file
44
45    def file_close(self):
46        self.file_closed = True
47
48    def close(self):
49        pass
50
51class EPipeSocket(FakeSocket):
52
53    def __init__(self, text, pipe_trigger):
54        # When sendall() is called with pipe_trigger, raise EPIPE.
55        FakeSocket.__init__(self, text)
56        self.pipe_trigger = pipe_trigger
57
58    def sendall(self, data):
59        if self.pipe_trigger in data:
60            raise socket.error(errno.EPIPE, "gotcha")
61        self.data += data
62
63    def close(self):
64        pass
65
66class NoEOFStringIO(StringIO.StringIO):
67    """Like StringIO, but raises AssertionError on EOF.
68
69    This is used below to test that httplib doesn't try to read
70    more from the underlying file than it should.
71    """
72    def read(self, n=-1):
73        data = StringIO.StringIO.read(self, n)
74        if data == '':
75            raise AssertionError('caller tried to read past EOF')
76        return data
77
78    def readline(self, length=None):
79        data = StringIO.StringIO.readline(self, length)
80        if data == '':
81            raise AssertionError('caller tried to read past EOF')
82        return data
83
84
85class HeaderTests(TestCase):
86    def test_auto_headers(self):
87        # Some headers are added automatically, but should not be added by
88        # .request() if they are explicitly set.
89
90        class HeaderCountingBuffer(list):
91            def __init__(self):
92                self.count = {}
93            def append(self, item):
94                kv = item.split(':')
95                if len(kv) > 1:
96                    # item is a 'Key: Value' header string
97                    lcKey = kv[0].lower()
98                    self.count.setdefault(lcKey, 0)
99                    self.count[lcKey] += 1
100                list.append(self, item)
101
102        for explicit_header in True, False:
103            for header in 'Content-length', 'Host', 'Accept-encoding':
104                conn = httplib.HTTPConnection('example.com')
105                conn.sock = FakeSocket('blahblahblah')
106                conn._buffer = HeaderCountingBuffer()
107
108                body = 'spamspamspam'
109                headers = {}
110                if explicit_header:
111                    headers[header] = str(len(body))
112                conn.request('POST', '/', body, headers)
113                self.assertEqual(conn._buffer.count[header.lower()], 1)
114
115    def test_content_length_0(self):
116
117        class ContentLengthChecker(list):
118            def __init__(self):
119                list.__init__(self)
120                self.content_length = None
121            def append(self, item):
122                kv = item.split(':', 1)
123                if len(kv) > 1 and kv[0].lower() == 'content-length':
124                    self.content_length = kv[1].strip()
125                list.append(self, item)
126
127        # Here, we're testing that methods expecting a body get a
128        # content-length set to zero if the body is empty (either None or '')
129        bodies = (None, '')
130        methods_with_body = ('PUT', 'POST', 'PATCH')
131        for method, body in itertools.product(methods_with_body, bodies):
132            conn = httplib.HTTPConnection('example.com')
133            conn.sock = FakeSocket(None)
134            conn._buffer = ContentLengthChecker()
135            conn.request(method, '/', body)
136            self.assertEqual(
137                conn._buffer.content_length, '0',
138                'Header Content-Length incorrect on {}'.format(method)
139            )
140
141        # For these methods, we make sure that content-length is not set when
142        # the body is None because it might cause unexpected behaviour on the
143        # server.
144        methods_without_body = (
145             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
146        )
147        for method in methods_without_body:
148            conn = httplib.HTTPConnection('example.com')
149            conn.sock = FakeSocket(None)
150            conn._buffer = ContentLengthChecker()
151            conn.request(method, '/', None)
152            self.assertEqual(
153                conn._buffer.content_length, None,
154                'Header Content-Length set for empty body on {}'.format(method)
155            )
156
157        # If the body is set to '', that's considered to be "present but
158        # empty" rather than "missing", so content length would be set, even
159        # for methods that don't expect a body.
160        for method in methods_without_body:
161            conn = httplib.HTTPConnection('example.com')
162            conn.sock = FakeSocket(None)
163            conn._buffer = ContentLengthChecker()
164            conn.request(method, '/', '')
165            self.assertEqual(
166                conn._buffer.content_length, '0',
167                'Header Content-Length incorrect on {}'.format(method)
168            )
169
170        # If the body is set, make sure Content-Length is set.
171        for method in itertools.chain(methods_without_body, methods_with_body):
172            conn = httplib.HTTPConnection('example.com')
173            conn.sock = FakeSocket(None)
174            conn._buffer = ContentLengthChecker()
175            conn.request(method, '/', ' ')
176            self.assertEqual(
177                conn._buffer.content_length, '1',
178                'Header Content-Length incorrect on {}'.format(method)
179            )
180
181    def test_putheader(self):
182        conn = httplib.HTTPConnection('example.com')
183        conn.sock = FakeSocket(None)
184        conn.putrequest('GET','/')
185        conn.putheader('Content-length',42)
186        self.assertIn('Content-length: 42', conn._buffer)
187
188        conn.putheader('Foo', ' bar ')
189        self.assertIn(b'Foo:  bar ', conn._buffer)
190        conn.putheader('Bar', '\tbaz\t')
191        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
192        conn.putheader('Authorization', 'Bearer mytoken')
193        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
194        conn.putheader('IterHeader', 'IterA', 'IterB')
195        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
196        conn.putheader('LatinHeader', b'\xFF')
197        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
198        conn.putheader('Utf8Header', b'\xc3\x80')
199        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
200        conn.putheader('C1-Control', b'next\x85line')
201        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
202        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
203        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
204        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
205        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
206        conn.putheader('Key Space', 'value')
207        self.assertIn(b'Key Space: value', conn._buffer)
208        conn.putheader('KeySpace ', 'value')
209        self.assertIn(b'KeySpace : value', conn._buffer)
210        conn.putheader(b'Nonbreak\xa0Space', 'value')
211        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
212        conn.putheader(b'\xa0NonbreakSpace', 'value')
213        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
214
215    def test_ipv6host_header(self):
216        # Default host header on IPv6 transaction should be wrapped by [] if
217        # it is an IPv6 address
218        expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
219                   'Accept-Encoding: identity\r\n\r\n'
220        conn = httplib.HTTPConnection('[2001::]:81')
221        sock = FakeSocket('')
222        conn.sock = sock
223        conn.request('GET', '/foo')
224        self.assertTrue(sock.data.startswith(expected))
225
226        expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
227                   'Accept-Encoding: identity\r\n\r\n'
228        conn = httplib.HTTPConnection('[2001:102A::]')
229        sock = FakeSocket('')
230        conn.sock = sock
231        conn.request('GET', '/foo')
232        self.assertTrue(sock.data.startswith(expected))
233
234    def test_malformed_headers_coped_with(self):
235        # Issue 19996
236        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
237        sock = FakeSocket(body)
238        resp = httplib.HTTPResponse(sock)
239        resp.begin()
240
241        self.assertEqual(resp.getheader('First'), 'val')
242        self.assertEqual(resp.getheader('Second'), 'val')
243
244    def test_malformed_truncation(self):
245        # Other malformed header lines, especially without colons, used to
246        # cause the rest of the header section to be truncated
247        resp = (
248            b'HTTP/1.1 200 OK\r\n'
249            b'Public-Key-Pins: \n'
250            b'pin-sha256="xxx=";\n'
251            b'report-uri="https://..."\r\n'
252            b'Transfer-Encoding: chunked\r\n'
253            b'\r\n'
254            b'4\r\nbody\r\n0\r\n\r\n'
255        )
256        resp = httplib.HTTPResponse(FakeSocket(resp))
257        resp.begin()
258        self.assertIsNotNone(resp.getheader('Public-Key-Pins'))
259        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
260        self.assertEqual(resp.read(), b'body')
261
262    def test_blank_line_forms(self):
263        # Test that both CRLF and LF blank lines can terminate the header
264        # section and start the body
265        for blank in (b'\r\n', b'\n'):
266            resp = b'HTTP/1.1 200 OK\r\n' b'Transfer-Encoding: chunked\r\n'
267            resp += blank
268            resp += b'4\r\nbody\r\n0\r\n\r\n'
269            resp = httplib.HTTPResponse(FakeSocket(resp))
270            resp.begin()
271            self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
272            self.assertEqual(resp.read(), b'body')
273
274            resp = b'HTTP/1.0 200 OK\r\n' + blank + b'body'
275            resp = httplib.HTTPResponse(FakeSocket(resp))
276            resp.begin()
277            self.assertEqual(resp.read(), b'body')
278
279        # A blank line ending in CR is not treated as the end of the HTTP
280        # header section, therefore header fields following it should be
281        # parsed if possible
282        resp = (
283            b'HTTP/1.1 200 OK\r\n'
284            b'\r'
285            b'Name: value\r\n'
286            b'Transfer-Encoding: chunked\r\n'
287            b'\r\n'
288            b'4\r\nbody\r\n0\r\n\r\n'
289        )
290        resp = httplib.HTTPResponse(FakeSocket(resp))
291        resp.begin()
292        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
293        self.assertEqual(resp.read(), b'body')
294
295        # No header fields nor blank line
296        resp = b'HTTP/1.0 200 OK\r\n'
297        resp = httplib.HTTPResponse(FakeSocket(resp))
298        resp.begin()
299        self.assertEqual(resp.read(), b'')
300
301    def test_from_line(self):
302        # The parser handles "From" lines specially, so test this does not
303        # affect parsing the rest of the header section
304        resp = (
305            b'HTTP/1.1 200 OK\r\n'
306            b'From start\r\n'
307            b' continued\r\n'
308            b'Name: value\r\n'
309            b'From middle\r\n'
310            b' continued\r\n'
311            b'Transfer-Encoding: chunked\r\n'
312            b'From end\r\n'
313            b'\r\n'
314            b'4\r\nbody\r\n0\r\n\r\n'
315        )
316        resp = httplib.HTTPResponse(FakeSocket(resp))
317        resp.begin()
318        self.assertIsNotNone(resp.getheader('Name'))
319        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
320        self.assertEqual(resp.read(), b'body')
321
322        resp = (
323            b'HTTP/1.0 200 OK\r\n'
324            b'From alone\r\n'
325            b'\r\n'
326            b'body'
327        )
328        resp = httplib.HTTPResponse(FakeSocket(resp))
329        resp.begin()
330        self.assertEqual(resp.read(), b'body')
331
332    def test_parse_all_octets(self):
333        # Ensure no valid header field octet breaks the parser
334        body = (
335            b'HTTP/1.1 200 OK\r\n'
336            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
337            b'VCHAR: ' + bytearray(range(0x21, 0x7E + 1)) + b'\r\n'
338            b'obs-text: ' + bytearray(range(0x80, 0xFF + 1)) + b'\r\n'
339            b'obs-fold: text\r\n'
340            b' folded with space\r\n'
341            b'\tfolded with tab\r\n'
342            b'Content-Length: 0\r\n'
343            b'\r\n'
344        )
345        sock = FakeSocket(body)
346        resp = httplib.HTTPResponse(sock)
347        resp.begin()
348        self.assertEqual(resp.getheader('Content-Length'), '0')
349        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
350        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
351        self.assertEqual(resp.getheader('VCHAR'), vchar)
352        self.assertIsNotNone(resp.getheader('obs-text'))
353        folded = resp.getheader('obs-fold')
354        self.assertTrue(folded.startswith('text'))
355        self.assertIn(' folded with space', folded)
356        self.assertTrue(folded.endswith('folded with tab'))
357
358    def test_invalid_headers(self):
359        conn = httplib.HTTPConnection('example.com')
360        conn.sock = FakeSocket('')
361        conn.putrequest('GET', '/')
362
363        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
364        # longer allowed in header names
365        cases = (
366            (b'Invalid\r\nName', b'ValidValue'),
367            (b'Invalid\rName', b'ValidValue'),
368            (b'Invalid\nName', b'ValidValue'),
369            (b'\r\nInvalidName', b'ValidValue'),
370            (b'\rInvalidName', b'ValidValue'),
371            (b'\nInvalidName', b'ValidValue'),
372            (b' InvalidName', b'ValidValue'),
373            (b'\tInvalidName', b'ValidValue'),
374            (b'Invalid:Name', b'ValidValue'),
375            (b':InvalidName', b'ValidValue'),
376            (b'ValidName', b'Invalid\r\nValue'),
377            (b'ValidName', b'Invalid\rValue'),
378            (b'ValidName', b'Invalid\nValue'),
379            (b'ValidName', b'InvalidValue\r\n'),
380            (b'ValidName', b'InvalidValue\r'),
381            (b'ValidName', b'InvalidValue\n'),
382        )
383        for name, value in cases:
384            with self.assertRaisesRegexp(ValueError, 'Invalid header'):
385                conn.putheader(name, value)
386
387
388class BasicTest(TestCase):
389    def test_status_lines(self):
390        # Test HTTP status lines
391
392        body = "HTTP/1.1 200 Ok\r\n\r\nText"
393        sock = FakeSocket(body)
394        resp = httplib.HTTPResponse(sock)
395        resp.begin()
396        self.assertEqual(resp.read(0), '')  # Issue #20007
397        self.assertFalse(resp.isclosed())
398        self.assertEqual(resp.read(), 'Text')
399        self.assertTrue(resp.isclosed())
400
401        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
402        sock = FakeSocket(body)
403        resp = httplib.HTTPResponse(sock)
404        self.assertRaises(httplib.BadStatusLine, resp.begin)
405
406    def test_bad_status_repr(self):
407        exc = httplib.BadStatusLine('')
408        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
409
410    def test_partial_reads(self):
411        # if we have a length, the system knows when to close itself
412        # same behaviour than when we read the whole thing with read()
413        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
414        sock = FakeSocket(body)
415        resp = httplib.HTTPResponse(sock)
416        resp.begin()
417        self.assertEqual(resp.read(2), 'Te')
418        self.assertFalse(resp.isclosed())
419        self.assertEqual(resp.read(2), 'xt')
420        self.assertTrue(resp.isclosed())
421
422    def test_partial_reads_no_content_length(self):
423        # when no length is present, the socket should be gracefully closed when
424        # all data was read
425        body = "HTTP/1.1 200 Ok\r\n\r\nText"
426        sock = FakeSocket(body)
427        resp = httplib.HTTPResponse(sock)
428        resp.begin()
429        self.assertEqual(resp.read(2), 'Te')
430        self.assertFalse(resp.isclosed())
431        self.assertEqual(resp.read(2), 'xt')
432        self.assertEqual(resp.read(1), '')
433        self.assertTrue(resp.isclosed())
434
435    def test_partial_reads_incomplete_body(self):
436        # if the server shuts down the connection before the whole
437        # content-length is delivered, the socket is gracefully closed
438        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
439        sock = FakeSocket(body)
440        resp = httplib.HTTPResponse(sock)
441        resp.begin()
442        self.assertEqual(resp.read(2), 'Te')
443        self.assertFalse(resp.isclosed())
444        self.assertEqual(resp.read(2), 'xt')
445        self.assertEqual(resp.read(1), '')
446        self.assertTrue(resp.isclosed())
447
448    def test_host_port(self):
449        # Check invalid host_port
450
451        # Note that httplib does not accept user:password@ in the host-port.
452        for hp in ("www.python.org:abc", "user:password@www.python.org"):
453            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)
454
455        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
456                          8000),
457                         ("www.python.org:80", "www.python.org", 80),
458                         ("www.python.org", "www.python.org", 80),
459                         ("www.python.org:", "www.python.org", 80),
460                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
461            http = httplib.HTTP(hp)
462            c = http._conn
463            if h != c.host:
464                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
465            if p != c.port:
466                self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
467
468    def test_response_headers(self):
469        # test response with multiple message headers with the same field name.
470        text = ('HTTP/1.1 200 OK\r\n'
471                'Set-Cookie: Customer="WILE_E_COYOTE";'
472                ' Version="1"; Path="/acme"\r\n'
473                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
474                ' Path="/acme"\r\n'
475                '\r\n'
476                'No body\r\n')
477        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
478               ', '
479               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
480        s = FakeSocket(text)
481        r = httplib.HTTPResponse(s)
482        r.begin()
483        cookies = r.getheader("Set-Cookie")
484        if cookies != hdr:
485            self.fail("multiple headers not combined properly")
486
487    def test_read_head(self):
488        # Test that the library doesn't attempt to read any data
489        # from a HEAD request.  (Tickles SF bug #622042.)
490        sock = FakeSocket(
491            'HTTP/1.1 200 OK\r\n'
492            'Content-Length: 14432\r\n'
493            '\r\n',
494            NoEOFStringIO)
495        resp = httplib.HTTPResponse(sock, method="HEAD")
496        resp.begin()
497        if resp.read() != "":
498            self.fail("Did not expect response from HEAD request")
499
500    def test_too_many_headers(self):
501        headers = '\r\n'.join('Header%d: foo' % i for i in xrange(200)) + '\r\n'
502        text = ('HTTP/1.1 200 OK\r\n' + headers)
503        s = FakeSocket(text)
504        r = httplib.HTTPResponse(s)
505        self.assertRaises(httplib.HTTPException, r.begin)
506
507    def test_send_file(self):
508        expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
509                   'Accept-Encoding: identity\r\nContent-Length:'
510
511        body = open(__file__, 'rb')
512        conn = httplib.HTTPConnection('example.com')
513        sock = FakeSocket(body)
514        conn.sock = sock
515        conn.request('GET', '/foo', body)
516        self.assertTrue(sock.data.startswith(expected))
517        self.assertIn('def test_send_file', sock.data)
518
519    def test_send_tempfile(self):
520        expected = ('GET /foo HTTP/1.1\r\nHost: example.com\r\n'
521                    'Accept-Encoding: identity\r\nContent-Length: 9\r\n\r\n'
522                    'fake\ndata')
523
524        with tempfile.TemporaryFile() as body:
525            body.write('fake\ndata')
526            body.seek(0)
527
528            conn = httplib.HTTPConnection('example.com')
529            sock = FakeSocket(body)
530            conn.sock = sock
531            conn.request('GET', '/foo', body)
532        self.assertEqual(sock.data, expected)
533
534    def test_send(self):
535        expected = 'this is a test this is only a test'
536        conn = httplib.HTTPConnection('example.com')
537        sock = FakeSocket(None)
538        conn.sock = sock
539        conn.send(expected)
540        self.assertEqual(expected, sock.data)
541        sock.data = ''
542        conn.send(array.array('c', expected))
543        self.assertEqual(expected, sock.data)
544        sock.data = ''
545        conn.send(StringIO.StringIO(expected))
546        self.assertEqual(expected, sock.data)
547
548    def test_chunked(self):
549        chunked_start = (
550            'HTTP/1.1 200 OK\r\n'
551            'Transfer-Encoding: chunked\r\n\r\n'
552            'a\r\n'
553            'hello worl\r\n'
554            '1\r\n'
555            'd\r\n'
556        )
557        sock = FakeSocket(chunked_start + '0\r\n')
558        resp = httplib.HTTPResponse(sock, method="GET")
559        resp.begin()
560        self.assertEqual(resp.read(), 'hello world')
561        resp.close()
562
563        for x in ('', 'foo\r\n'):
564            sock = FakeSocket(chunked_start + x)
565            resp = httplib.HTTPResponse(sock, method="GET")
566            resp.begin()
567            try:
568                resp.read()
569            except httplib.IncompleteRead, i:
570                self.assertEqual(i.partial, 'hello world')
571                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
572                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
573            else:
574                self.fail('IncompleteRead expected')
575            finally:
576                resp.close()
577
578    def test_chunked_head(self):
579        chunked_start = (
580            'HTTP/1.1 200 OK\r\n'
581            'Transfer-Encoding: chunked\r\n\r\n'
582            'a\r\n'
583            'hello world\r\n'
584            '1\r\n'
585            'd\r\n'
586        )
587        sock = FakeSocket(chunked_start + '0\r\n')
588        resp = httplib.HTTPResponse(sock, method="HEAD")
589        resp.begin()
590        self.assertEqual(resp.read(), '')
591        self.assertEqual(resp.status, 200)
592        self.assertEqual(resp.reason, 'OK')
593        self.assertTrue(resp.isclosed())
594
595    def test_negative_content_length(self):
596        sock = FakeSocket('HTTP/1.1 200 OK\r\n'
597                          'Content-Length: -1\r\n\r\nHello\r\n')
598        resp = httplib.HTTPResponse(sock, method="GET")
599        resp.begin()
600        self.assertEqual(resp.read(), 'Hello\r\n')
601        self.assertTrue(resp.isclosed())
602
603    def test_incomplete_read(self):
604        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
605        resp = httplib.HTTPResponse(sock, method="GET")
606        resp.begin()
607        try:
608            resp.read()
609        except httplib.IncompleteRead as i:
610            self.assertEqual(i.partial, 'Hello\r\n')
611            self.assertEqual(repr(i),
612                             "IncompleteRead(7 bytes read, 3 more expected)")
613            self.assertEqual(str(i),
614                             "IncompleteRead(7 bytes read, 3 more expected)")
615            self.assertTrue(resp.isclosed())
616        else:
617            self.fail('IncompleteRead expected')
618
619    def test_epipe(self):
620        sock = EPipeSocket(
621            "HTTP/1.0 401 Authorization Required\r\n"
622            "Content-type: text/html\r\n"
623            "WWW-Authenticate: Basic realm=\"example\"\r\n",
624            b"Content-Length")
625        conn = httplib.HTTPConnection("example.com")
626        conn.sock = sock
627        self.assertRaises(socket.error,
628                          lambda: conn.request("PUT", "/url", "body"))
629        resp = conn.getresponse()
630        self.assertEqual(401, resp.status)
631        self.assertEqual("Basic realm=\"example\"",
632                         resp.getheader("www-authenticate"))
633
634    def test_filenoattr(self):
635        # Just test the fileno attribute in the HTTPResponse Object.
636        body = "HTTP/1.1 200 Ok\r\n\r\nText"
637        sock = FakeSocket(body)
638        resp = httplib.HTTPResponse(sock)
639        self.assertTrue(hasattr(resp,'fileno'),
640                'HTTPResponse should expose a fileno attribute')
641
642    # Test lines overflowing the max line size (_MAXLINE in httplib)
643
644    def test_overflowing_status_line(self):
645        self.skipTest("disabled for HTTP 0.9 support")
646        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
647        resp = httplib.HTTPResponse(FakeSocket(body))
648        self.assertRaises((httplib.LineTooLong, httplib.BadStatusLine), resp.begin)
649
650    def test_overflowing_header_line(self):
651        body = (
652            'HTTP/1.1 200 OK\r\n'
653            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
654        )
655        resp = httplib.HTTPResponse(FakeSocket(body))
656        self.assertRaises(httplib.LineTooLong, resp.begin)
657
658    def test_overflowing_chunked_line(self):
659        body = (
660            'HTTP/1.1 200 OK\r\n'
661            'Transfer-Encoding: chunked\r\n\r\n'
662            + '0' * 65536 + 'a\r\n'
663            'hello world\r\n'
664            '0\r\n'
665        )
666        resp = httplib.HTTPResponse(FakeSocket(body))
667        resp.begin()
668        self.assertRaises(httplib.LineTooLong, resp.read)
669
670    def test_early_eof(self):
671        # Test httpresponse with no \r\n termination,
672        body = "HTTP/1.1 200 Ok"
673        sock = FakeSocket(body)
674        resp = httplib.HTTPResponse(sock)
675        resp.begin()
676        self.assertEqual(resp.read(), '')
677        self.assertTrue(resp.isclosed())
678
679    def test_error_leak(self):
680        # Test that the socket is not leaked if getresponse() fails
681        conn = httplib.HTTPConnection('example.com')
682        response = []
683        class Response(httplib.HTTPResponse):
684            def __init__(self, *pos, **kw):
685                response.append(self)  # Avoid garbage collector closing the socket
686                httplib.HTTPResponse.__init__(self, *pos, **kw)
687        conn.response_class = Response
688        conn.sock = FakeSocket('')  # Emulate server dropping connection
689        conn.request('GET', '/')
690        self.assertRaises(httplib.BadStatusLine, conn.getresponse)
691        self.assertTrue(response)
692        #self.assertTrue(response[0].closed)
693        self.assertTrue(conn.sock.file_closed)
694
695    def test_proxy_tunnel_without_status_line(self):
696        # Issue 17849: If a proxy tunnel is created that does not return
697        # a status code, fail.
698        body = 'hello world'
699        conn = httplib.HTTPConnection('example.com', strict=False)
700        conn.set_tunnel('foo')
701        conn.sock = FakeSocket(body)
702        with self.assertRaisesRegexp(socket.error, "Invalid response"):
703            conn._tunnel()
704
705    def test_putrequest_override_domain_validation(self):
706        """
707        It should be possible to override the default validation
708        behavior in putrequest (bpo-38216).
709        """
710        class UnsafeHTTPConnection(httplib.HTTPConnection):
711            def _validate_path(self, url):
712                pass
713
714        conn = UnsafeHTTPConnection('example.com')
715        conn.sock = FakeSocket('')
716        conn.putrequest('GET', '/\x00')
717
718    def test_putrequest_override_host_validation(self):
719        class UnsafeHTTPConnection(httplib.HTTPConnection):
720            def _validate_host(self, url):
721                pass
722
723        conn = UnsafeHTTPConnection('example.com\r\n')
724        conn.sock = FakeSocket('')
725        # set skip_host so a ValueError is not raised upon adding the
726        # invalid URL as the value of the "Host:" header
727        conn.putrequest('GET', '/', skip_host=1)
728
729
730class OfflineTest(TestCase):
731    def test_responses(self):
732        self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
733
734
735class TestServerMixin:
736    """A limited socket server mixin.
737
738    This is used by test cases for testing http connection end points.
739    """
740    def setUp(self):
741        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
742        self.port = test_support.bind_port(self.serv)
743        self.source_port = test_support.find_unused_port()
744        self.serv.listen(5)
745        self.conn = None
746
747    def tearDown(self):
748        if self.conn:
749            self.conn.close()
750            self.conn = None
751        self.serv.close()
752        self.serv = None
753
754class SourceAddressTest(TestServerMixin, TestCase):
755    def testHTTPConnectionSourceAddress(self):
756        self.conn = httplib.HTTPConnection(HOST, self.port,
757                source_address=('', self.source_port))
758        self.conn.connect()
759        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
760
761    @unittest.skipIf(not hasattr(httplib, 'HTTPSConnection'),
762                     'httplib.HTTPSConnection not defined')
763    def testHTTPSConnectionSourceAddress(self):
764        self.conn = httplib.HTTPSConnection(HOST, self.port,
765                source_address=('', self.source_port))
766        # We don't test anything here other than the constructor not barfing as
767        # this code doesn't deal with setting up an active running SSL server
768        # for an ssl_wrapped connect() to actually return from.
769
770
771class HTTPTest(TestServerMixin, TestCase):
772    def testHTTPConnection(self):
773        self.conn = httplib.HTTP(host=HOST, port=self.port, strict=None)
774        self.conn.connect()
775        self.assertEqual(self.conn._conn.host, HOST)
776        self.assertEqual(self.conn._conn.port, self.port)
777
778    def testHTTPWithConnectHostPort(self):
779        testhost = 'unreachable.test.domain'
780        testport = '80'
781        self.conn = httplib.HTTP(host=testhost, port=testport)
782        self.conn.connect(host=HOST, port=self.port)
783        self.assertNotEqual(self.conn._conn.host, testhost)
784        self.assertNotEqual(self.conn._conn.port, testport)
785        self.assertEqual(self.conn._conn.host, HOST)
786        self.assertEqual(self.conn._conn.port, self.port)
787
788
789class TimeoutTest(TestCase):
790    PORT = None
791
792    def setUp(self):
793        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
794        TimeoutTest.PORT = test_support.bind_port(self.serv)
795        self.serv.listen(5)
796
797    def tearDown(self):
798        self.serv.close()
799        self.serv = None
800
801    def testTimeoutAttribute(self):
802        '''This will prove that the timeout gets through
803        HTTPConnection and into the socket.
804        '''
805        # default -- use global socket timeout
806        self.assertIsNone(socket.getdefaulttimeout())
807        socket.setdefaulttimeout(30)
808        try:
809            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
810            httpConn.connect()
811        finally:
812            socket.setdefaulttimeout(None)
813        self.assertEqual(httpConn.sock.gettimeout(), 30)
814        httpConn.close()
815
816        # no timeout -- do not use global socket default
817        self.assertIsNone(socket.getdefaulttimeout())
818        socket.setdefaulttimeout(30)
819        try:
820            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
821                                              timeout=None)
822            httpConn.connect()
823        finally:
824            socket.setdefaulttimeout(None)
825        self.assertEqual(httpConn.sock.gettimeout(), None)
826        httpConn.close()
827
828        # a value
829        httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
830        httpConn.connect()
831        self.assertEqual(httpConn.sock.gettimeout(), 30)
832        httpConn.close()
833
834
835class HTTPSTest(TestCase):
836
837    def setUp(self):
838        if not hasattr(httplib, 'HTTPSConnection'):
839            self.skipTest('ssl support required')
840
841    def make_server(self, certfile):
842        from test.ssl_servers import make_https_server
843        return make_https_server(self, certfile=certfile)
844
845    def test_attributes(self):
846        # simple test to check it's storing the timeout
847        h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
848        self.assertEqual(h.timeout, 30)
849
850    def test_networked(self):
851        # Default settings: requires a valid cert from a trusted CA
852        import ssl
853        test_support.requires('network')
854        with test_support.transient_internet('self-signed.pythontest.net'):
855            h = httplib.HTTPSConnection('self-signed.pythontest.net', 443)
856            with self.assertRaises(ssl.SSLError) as exc_info:
857                h.request('GET', '/')
858            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
859
860    def test_networked_noverification(self):
861        # Switch off cert verification
862        import ssl
863        test_support.requires('network')
864        with test_support.transient_internet('self-signed.pythontest.net'):
865            context = ssl._create_stdlib_context()
866            h = httplib.HTTPSConnection('self-signed.pythontest.net', 443,
867                                        context=context)
868            h.request('GET', '/')
869            resp = h.getresponse()
870            self.assertIn('nginx', resp.getheader('server'))
871
872    @test_support.system_must_validate_cert
873    def test_networked_trusted_by_default_cert(self):
874        # Default settings: requires a valid cert from a trusted CA
875        test_support.requires('network')
876        with test_support.transient_internet('www.python.org'):
877            h = httplib.HTTPSConnection('www.python.org', 443)
878            h.request('GET', '/')
879            resp = h.getresponse()
880            content_type = resp.getheader('content-type')
881            self.assertIn('text/html', content_type)
882
883    def test_networked_good_cert(self):
884        # We feed the server's cert as a validating cert
885        import ssl
886        test_support.requires('network')
887        with test_support.transient_internet('self-signed.pythontest.net'):
888            context = ssl.SSLContext(ssl.PROTOCOL_TLS)
889            context.verify_mode = ssl.CERT_REQUIRED
890            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
891            h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
892            h.request('GET', '/')
893            resp = h.getresponse()
894            server_string = resp.getheader('server')
895            self.assertIn('nginx', server_string)
896
897    def test_networked_bad_cert(self):
898        # We feed a "CA" cert that is unrelated to the server's cert
899        import ssl
900        test_support.requires('network')
901        with test_support.transient_internet('self-signed.pythontest.net'):
902            context = ssl.SSLContext(ssl.PROTOCOL_TLS)
903            context.verify_mode = ssl.CERT_REQUIRED
904            context.load_verify_locations(CERT_localhost)
905            h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
906            with self.assertRaises(ssl.SSLError) as exc_info:
907                h.request('GET', '/')
908            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
909
910    def test_local_unknown_cert(self):
911        # The custom cert isn't known to the default trust bundle
912        import ssl
913        server = self.make_server(CERT_localhost)
914        h = httplib.HTTPSConnection('localhost', server.port)
915        with self.assertRaises(ssl.SSLError) as exc_info:
916            h.request('GET', '/')
917        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
918
919    def test_local_good_hostname(self):
920        # The (valid) cert validates the HTTP hostname
921        import ssl
922        server = self.make_server(CERT_localhost)
923        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
924        context.verify_mode = ssl.CERT_REQUIRED
925        context.load_verify_locations(CERT_localhost)
926        h = httplib.HTTPSConnection('localhost', server.port, context=context)
927        h.request('GET', '/nonexistent')
928        resp = h.getresponse()
929        self.assertEqual(resp.status, 404)
930
931    def test_local_bad_hostname(self):
932        # The (valid) cert doesn't validate the HTTP hostname
933        import ssl
934        server = self.make_server(CERT_fakehostname)
935        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
936        context.verify_mode = ssl.CERT_REQUIRED
937        context.check_hostname = True
938        context.load_verify_locations(CERT_fakehostname)
939        h = httplib.HTTPSConnection('localhost', server.port, context=context)
940        with self.assertRaises(ssl.CertificateError):
941            h.request('GET', '/')
942        h.close()
943        # With context.check_hostname=False, the mismatching is ignored
944        context.check_hostname = False
945        h = httplib.HTTPSConnection('localhost', server.port, context=context)
946        h.request('GET', '/nonexistent')
947        resp = h.getresponse()
948        self.assertEqual(resp.status, 404)
949
950    def test_host_port(self):
951        # Check invalid host_port
952
953        for hp in ("www.python.org:abc", "user:password@www.python.org"):
954            self.assertRaises(httplib.InvalidURL, httplib.HTTPSConnection, hp)
955
956        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
957                          "fe80::207:e9ff:fe9b", 8000),
958                         ("www.python.org:443", "www.python.org", 443),
959                         ("www.python.org:", "www.python.org", 443),
960                         ("www.python.org", "www.python.org", 443),
961                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
962                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
963                             443)):
964            c = httplib.HTTPSConnection(hp)
965            self.assertEqual(h, c.host)
966            self.assertEqual(p, c.port)
967
968
969class TunnelTests(TestCase):
970    def test_connect(self):
971        response_text = (
972            'HTTP/1.0 200 OK\r\n\r\n'   # Reply to CONNECT
973            'HTTP/1.1 200 OK\r\n'       # Reply to HEAD
974            'Content-Length: 42\r\n\r\n'
975        )
976
977        def create_connection(address, timeout=None, source_address=None):
978            return FakeSocket(response_text, host=address[0], port=address[1])
979
980        conn = httplib.HTTPConnection('proxy.com')
981        conn._create_connection = create_connection
982
983        # Once connected, we should not be able to tunnel anymore
984        conn.connect()
985        self.assertRaises(RuntimeError, conn.set_tunnel, 'destination.com')
986
987        # But if close the connection, we are good.
988        conn.close()
989        conn.set_tunnel('destination.com')
990        conn.request('HEAD', '/', '')
991
992        self.assertEqual(conn.sock.host, 'proxy.com')
993        self.assertEqual(conn.sock.port, 80)
994        self.assertIn('CONNECT destination.com', conn.sock.data)
995        # issue22095
996        self.assertNotIn('Host: destination.com:None', conn.sock.data)
997        self.assertIn('Host: destination.com', conn.sock.data)
998
999        self.assertNotIn('Host: proxy.com', conn.sock.data)
1000
1001        conn.close()
1002
1003        conn.request('PUT', '/', '')
1004        self.assertEqual(conn.sock.host, 'proxy.com')
1005        self.assertEqual(conn.sock.port, 80)
1006        self.assertTrue('CONNECT destination.com' in conn.sock.data)
1007        self.assertTrue('Host: destination.com' in conn.sock.data)
1008
1009
1010@test_support.reap_threads
1011def test_main(verbose=None):
1012    test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
1013                              HTTPTest, HTTPSTest, SourceAddressTest,
1014                              TunnelTests)
1015
1016if __name__ == '__main__':
1017    test_main()
1018