1import errno 2from http import client, HTTPStatus 3import io 4import itertools 5import os 6import array 7import re 8import socket 9import threading 10import warnings 11 12import unittest 13TestCase = unittest.TestCase 14 15from test import support 16 17here = os.path.dirname(__file__) 18# Self-signed cert file for 'localhost' 19CERT_localhost = os.path.join(here, 'keycert.pem') 20# Self-signed cert file for 'fakehostname' 21CERT_fakehostname = os.path.join(here, 'keycert2.pem') 22# Self-signed cert file for self-signed.pythontest.net 23CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 24 25# constants for testing chunked encoding 26chunked_start = ( 27 'HTTP/1.1 200 OK\r\n' 28 'Transfer-Encoding: chunked\r\n\r\n' 29 'a\r\n' 30 'hello worl\r\n' 31 '3\r\n' 32 'd! \r\n' 33 '8\r\n' 34 'and now \r\n' 35 '22\r\n' 36 'for something completely different\r\n' 37) 38chunked_expected = b'hello world! and now for something completely different' 39chunk_extension = ";foo=bar" 40last_chunk = "0\r\n" 41last_chunk_extended = "0" + chunk_extension + "\r\n" 42trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 43chunked_end = "\r\n" 44 45HOST = support.HOST 46 47class FakeSocket: 48 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 49 if isinstance(text, str): 50 text = text.encode("ascii") 51 self.text = text 52 self.fileclass = fileclass 53 self.data = b'' 54 self.sendall_calls = 0 55 self.file_closed = False 56 self.host = host 57 self.port = port 58 59 def sendall(self, data): 60 self.sendall_calls += 1 61 self.data += data 62 63 def makefile(self, mode, bufsize=None): 64 if mode != 'r' and mode != 'rb': 65 raise client.UnimplementedFileMode() 66 # keep the file around so we can check how much was read from it 67 self.file = self.fileclass(self.text) 68 self.file.close = self.file_close #nerf close () 69 return self.file 70 71 def file_close(self): 72 self.file_closed = True 73 74 def close(self): 75 pass 76 77 def setsockopt(self, level, optname, value): 78 pass 79 80class EPipeSocket(FakeSocket): 81 82 def __init__(self, text, pipe_trigger): 83 # When sendall() is called with pipe_trigger, raise EPIPE. 84 FakeSocket.__init__(self, text) 85 self.pipe_trigger = pipe_trigger 86 87 def sendall(self, data): 88 if self.pipe_trigger in data: 89 raise OSError(errno.EPIPE, "gotcha") 90 self.data += data 91 92 def close(self): 93 pass 94 95class NoEOFBytesIO(io.BytesIO): 96 """Like BytesIO, but raises AssertionError on EOF. 97 98 This is used below to test that http.client doesn't try to read 99 more from the underlying file than it should. 100 """ 101 def read(self, n=-1): 102 data = io.BytesIO.read(self, n) 103 if data == b'': 104 raise AssertionError('caller tried to read past EOF') 105 return data 106 107 def readline(self, length=None): 108 data = io.BytesIO.readline(self, length) 109 if data == b'': 110 raise AssertionError('caller tried to read past EOF') 111 return data 112 113class FakeSocketHTTPConnection(client.HTTPConnection): 114 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 115 116 def __init__(self, *args): 117 self.connections = 0 118 super().__init__('example.com') 119 self.fake_socket_args = args 120 self._create_connection = self.create_connection 121 122 def connect(self): 123 """Count the number of times connect() is invoked""" 124 self.connections += 1 125 return super().connect() 126 127 def create_connection(self, *pos, **kw): 128 return FakeSocket(*self.fake_socket_args) 129 130class HeaderTests(TestCase): 131 def test_auto_headers(self): 132 # Some headers are added automatically, but should not be added by 133 # .request() if they are explicitly set. 134 135 class HeaderCountingBuffer(list): 136 def __init__(self): 137 self.count = {} 138 def append(self, item): 139 kv = item.split(b':') 140 if len(kv) > 1: 141 # item is a 'Key: Value' header string 142 lcKey = kv[0].decode('ascii').lower() 143 self.count.setdefault(lcKey, 0) 144 self.count[lcKey] += 1 145 list.append(self, item) 146 147 for explicit_header in True, False: 148 for header in 'Content-length', 'Host', 'Accept-encoding': 149 conn = client.HTTPConnection('example.com') 150 conn.sock = FakeSocket('blahblahblah') 151 conn._buffer = HeaderCountingBuffer() 152 153 body = 'spamspamspam' 154 headers = {} 155 if explicit_header: 156 headers[header] = str(len(body)) 157 conn.request('POST', '/', body, headers) 158 self.assertEqual(conn._buffer.count[header.lower()], 1) 159 160 def test_content_length_0(self): 161 162 class ContentLengthChecker(list): 163 def __init__(self): 164 list.__init__(self) 165 self.content_length = None 166 def append(self, item): 167 kv = item.split(b':', 1) 168 if len(kv) > 1 and kv[0].lower() == b'content-length': 169 self.content_length = kv[1].strip() 170 list.append(self, item) 171 172 # Here, we're testing that methods expecting a body get a 173 # content-length set to zero if the body is empty (either None or '') 174 bodies = (None, '') 175 methods_with_body = ('PUT', 'POST', 'PATCH') 176 for method, body in itertools.product(methods_with_body, bodies): 177 conn = client.HTTPConnection('example.com') 178 conn.sock = FakeSocket(None) 179 conn._buffer = ContentLengthChecker() 180 conn.request(method, '/', body) 181 self.assertEqual( 182 conn._buffer.content_length, b'0', 183 'Header Content-Length incorrect on {}'.format(method) 184 ) 185 186 # For these methods, we make sure that content-length is not set when 187 # the body is None because it might cause unexpected behaviour on the 188 # server. 189 methods_without_body = ( 190 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 191 ) 192 for method in methods_without_body: 193 conn = client.HTTPConnection('example.com') 194 conn.sock = FakeSocket(None) 195 conn._buffer = ContentLengthChecker() 196 conn.request(method, '/', None) 197 self.assertEqual( 198 conn._buffer.content_length, None, 199 'Header Content-Length set for empty body on {}'.format(method) 200 ) 201 202 # If the body is set to '', that's considered to be "present but 203 # empty" rather than "missing", so content length would be set, even 204 # for methods that don't expect a body. 205 for method in methods_without_body: 206 conn = client.HTTPConnection('example.com') 207 conn.sock = FakeSocket(None) 208 conn._buffer = ContentLengthChecker() 209 conn.request(method, '/', '') 210 self.assertEqual( 211 conn._buffer.content_length, b'0', 212 'Header Content-Length incorrect on {}'.format(method) 213 ) 214 215 # If the body is set, make sure Content-Length is set. 216 for method in itertools.chain(methods_without_body, methods_with_body): 217 conn = client.HTTPConnection('example.com') 218 conn.sock = FakeSocket(None) 219 conn._buffer = ContentLengthChecker() 220 conn.request(method, '/', ' ') 221 self.assertEqual( 222 conn._buffer.content_length, b'1', 223 'Header Content-Length incorrect on {}'.format(method) 224 ) 225 226 def test_putheader(self): 227 conn = client.HTTPConnection('example.com') 228 conn.sock = FakeSocket(None) 229 conn.putrequest('GET','/') 230 conn.putheader('Content-length', 42) 231 self.assertIn(b'Content-length: 42', conn._buffer) 232 233 conn.putheader('Foo', ' bar ') 234 self.assertIn(b'Foo: bar ', conn._buffer) 235 conn.putheader('Bar', '\tbaz\t') 236 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 237 conn.putheader('Authorization', 'Bearer mytoken') 238 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 239 conn.putheader('IterHeader', 'IterA', 'IterB') 240 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 241 conn.putheader('LatinHeader', b'\xFF') 242 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 243 conn.putheader('Utf8Header', b'\xc3\x80') 244 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 245 conn.putheader('C1-Control', b'next\x85line') 246 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 247 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 248 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 249 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 250 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 251 conn.putheader('Key Space', 'value') 252 self.assertIn(b'Key Space: value', conn._buffer) 253 conn.putheader('KeySpace ', 'value') 254 self.assertIn(b'KeySpace : value', conn._buffer) 255 conn.putheader(b'Nonbreak\xa0Space', 'value') 256 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 257 conn.putheader(b'\xa0NonbreakSpace', 'value') 258 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 259 260 def test_ipv6host_header(self): 261 # Default host header on IPv6 transaction should be wrapped by [] if 262 # it is an IPv6 address 263 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 264 b'Accept-Encoding: identity\r\n\r\n' 265 conn = client.HTTPConnection('[2001::]:81') 266 sock = FakeSocket('') 267 conn.sock = sock 268 conn.request('GET', '/foo') 269 self.assertTrue(sock.data.startswith(expected)) 270 271 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 272 b'Accept-Encoding: identity\r\n\r\n' 273 conn = client.HTTPConnection('[2001:102A::]') 274 sock = FakeSocket('') 275 conn.sock = sock 276 conn.request('GET', '/foo') 277 self.assertTrue(sock.data.startswith(expected)) 278 279 def test_malformed_headers_coped_with(self): 280 # Issue 19996 281 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 282 sock = FakeSocket(body) 283 resp = client.HTTPResponse(sock) 284 resp.begin() 285 286 self.assertEqual(resp.getheader('First'), 'val') 287 self.assertEqual(resp.getheader('Second'), 'val') 288 289 def test_parse_all_octets(self): 290 # Ensure no valid header field octet breaks the parser 291 body = ( 292 b'HTTP/1.1 200 OK\r\n' 293 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 294 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 295 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 296 b'obs-fold: text\r\n' 297 b' folded with space\r\n' 298 b'\tfolded with tab\r\n' 299 b'Content-Length: 0\r\n' 300 b'\r\n' 301 ) 302 sock = FakeSocket(body) 303 resp = client.HTTPResponse(sock) 304 resp.begin() 305 self.assertEqual(resp.getheader('Content-Length'), '0') 306 self.assertEqual(resp.msg['Content-Length'], '0') 307 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 308 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 309 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 310 self.assertEqual(resp.getheader('VCHAR'), vchar) 311 self.assertEqual(resp.msg['VCHAR'], vchar) 312 self.assertIsNotNone(resp.getheader('obs-text')) 313 self.assertIn('obs-text', resp.msg) 314 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 315 self.assertTrue(folded.startswith('text')) 316 self.assertIn(' folded with space', folded) 317 self.assertTrue(folded.endswith('folded with tab')) 318 319 def test_invalid_headers(self): 320 conn = client.HTTPConnection('example.com') 321 conn.sock = FakeSocket('') 322 conn.putrequest('GET', '/') 323 324 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 325 # longer allowed in header names 326 cases = ( 327 (b'Invalid\r\nName', b'ValidValue'), 328 (b'Invalid\rName', b'ValidValue'), 329 (b'Invalid\nName', b'ValidValue'), 330 (b'\r\nInvalidName', b'ValidValue'), 331 (b'\rInvalidName', b'ValidValue'), 332 (b'\nInvalidName', b'ValidValue'), 333 (b' InvalidName', b'ValidValue'), 334 (b'\tInvalidName', b'ValidValue'), 335 (b'Invalid:Name', b'ValidValue'), 336 (b':InvalidName', b'ValidValue'), 337 (b'ValidName', b'Invalid\r\nValue'), 338 (b'ValidName', b'Invalid\rValue'), 339 (b'ValidName', b'Invalid\nValue'), 340 (b'ValidName', b'InvalidValue\r\n'), 341 (b'ValidName', b'InvalidValue\r'), 342 (b'ValidName', b'InvalidValue\n'), 343 ) 344 for name, value in cases: 345 with self.subTest((name, value)): 346 with self.assertRaisesRegex(ValueError, 'Invalid header'): 347 conn.putheader(name, value) 348 349 def test_headers_debuglevel(self): 350 body = ( 351 b'HTTP/1.1 200 OK\r\n' 352 b'First: val\r\n' 353 b'Second: val1\r\n' 354 b'Second: val2\r\n' 355 ) 356 sock = FakeSocket(body) 357 resp = client.HTTPResponse(sock, debuglevel=1) 358 with support.captured_stdout() as output: 359 resp.begin() 360 lines = output.getvalue().splitlines() 361 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 362 self.assertEqual(lines[1], "header: First: val") 363 self.assertEqual(lines[2], "header: Second: val1") 364 self.assertEqual(lines[3], "header: Second: val2") 365 366 367class HttpMethodTests(TestCase): 368 def test_invalid_method_names(self): 369 methods = ( 370 'GET\r', 371 'POST\n', 372 'PUT\n\r', 373 'POST\nValue', 374 'POST\nHOST:abc', 375 'GET\nrHost:abc\n', 376 'POST\rRemainder:\r', 377 'GET\rHOST:\n', 378 '\nPUT' 379 ) 380 381 for method in methods: 382 with self.assertRaisesRegex( 383 ValueError, "method can't contain control characters"): 384 conn = client.HTTPConnection('example.com') 385 conn.sock = FakeSocket(None) 386 conn.request(method=method, url="/") 387 388 389class TransferEncodingTest(TestCase): 390 expected_body = b"It's just a flesh wound" 391 392 def test_endheaders_chunked(self): 393 conn = client.HTTPConnection('example.com') 394 conn.sock = FakeSocket(b'') 395 conn.putrequest('POST', '/') 396 conn.endheaders(self._make_body(), encode_chunked=True) 397 398 _, _, body = self._parse_request(conn.sock.data) 399 body = self._parse_chunked(body) 400 self.assertEqual(body, self.expected_body) 401 402 def test_explicit_headers(self): 403 # explicit chunked 404 conn = client.HTTPConnection('example.com') 405 conn.sock = FakeSocket(b'') 406 # this shouldn't actually be automatically chunk-encoded because the 407 # calling code has explicitly stated that it's taking care of it 408 conn.request( 409 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 410 411 _, headers, body = self._parse_request(conn.sock.data) 412 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 413 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 414 self.assertEqual(body, self.expected_body) 415 416 # explicit chunked, string body 417 conn = client.HTTPConnection('example.com') 418 conn.sock = FakeSocket(b'') 419 conn.request( 420 'POST', '/', self.expected_body.decode('latin-1'), 421 {'Transfer-Encoding': 'chunked'}) 422 423 _, headers, body = self._parse_request(conn.sock.data) 424 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 425 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 426 self.assertEqual(body, self.expected_body) 427 428 # User-specified TE, but request() does the chunk encoding 429 conn = client.HTTPConnection('example.com') 430 conn.sock = FakeSocket(b'') 431 conn.request('POST', '/', 432 headers={'Transfer-Encoding': 'gzip, chunked'}, 433 encode_chunked=True, 434 body=self._make_body()) 435 _, headers, body = self._parse_request(conn.sock.data) 436 self.assertNotIn('content-length', [k.lower() for k in headers]) 437 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 438 self.assertEqual(self._parse_chunked(body), self.expected_body) 439 440 def test_request(self): 441 for empty_lines in (False, True,): 442 conn = client.HTTPConnection('example.com') 443 conn.sock = FakeSocket(b'') 444 conn.request( 445 'POST', '/', self._make_body(empty_lines=empty_lines)) 446 447 _, headers, body = self._parse_request(conn.sock.data) 448 body = self._parse_chunked(body) 449 self.assertEqual(body, self.expected_body) 450 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 451 452 # Content-Length and Transfer-Encoding SHOULD not be sent in the 453 # same request 454 self.assertNotIn('content-length', [k.lower() for k in headers]) 455 456 def test_empty_body(self): 457 # Zero-length iterable should be treated like any other iterable 458 conn = client.HTTPConnection('example.com') 459 conn.sock = FakeSocket(b'') 460 conn.request('POST', '/', ()) 461 _, headers, body = self._parse_request(conn.sock.data) 462 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 463 self.assertNotIn('content-length', [k.lower() for k in headers]) 464 self.assertEqual(body, b"0\r\n\r\n") 465 466 def _make_body(self, empty_lines=False): 467 lines = self.expected_body.split(b' ') 468 for idx, line in enumerate(lines): 469 # for testing handling empty lines 470 if empty_lines and idx % 2: 471 yield b'' 472 if idx < len(lines) - 1: 473 yield line + b' ' 474 else: 475 yield line 476 477 def _parse_request(self, data): 478 lines = data.split(b'\r\n') 479 request = lines[0] 480 headers = {} 481 n = 1 482 while n < len(lines) and len(lines[n]) > 0: 483 key, val = lines[n].split(b':') 484 key = key.decode('latin-1').strip() 485 headers[key] = val.decode('latin-1').strip() 486 n += 1 487 488 return request, headers, b'\r\n'.join(lines[n + 1:]) 489 490 def _parse_chunked(self, data): 491 body = [] 492 trailers = {} 493 n = 0 494 lines = data.split(b'\r\n') 495 # parse body 496 while True: 497 size, chunk = lines[n:n+2] 498 size = int(size, 16) 499 500 if size == 0: 501 n += 1 502 break 503 504 self.assertEqual(size, len(chunk)) 505 body.append(chunk) 506 507 n += 2 508 # we /should/ hit the end chunk, but check against the size of 509 # lines so we're not stuck in an infinite loop should we get 510 # malformed data 511 if n > len(lines): 512 break 513 514 return b''.join(body) 515 516 517class BasicTest(TestCase): 518 def test_dir_with_added_behavior_on_status(self): 519 # see issue40084 520 self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404)))) 521 522 def test_status_lines(self): 523 # Test HTTP status lines 524 525 body = "HTTP/1.1 200 Ok\r\n\r\nText" 526 sock = FakeSocket(body) 527 resp = client.HTTPResponse(sock) 528 resp.begin() 529 self.assertEqual(resp.read(0), b'') # Issue #20007 530 self.assertFalse(resp.isclosed()) 531 self.assertFalse(resp.closed) 532 self.assertEqual(resp.read(), b"Text") 533 self.assertTrue(resp.isclosed()) 534 self.assertFalse(resp.closed) 535 resp.close() 536 self.assertTrue(resp.closed) 537 538 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 539 sock = FakeSocket(body) 540 resp = client.HTTPResponse(sock) 541 self.assertRaises(client.BadStatusLine, resp.begin) 542 543 def test_bad_status_repr(self): 544 exc = client.BadStatusLine('') 545 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 546 547 def test_partial_reads(self): 548 # if we have Content-Length, HTTPResponse knows when to close itself, 549 # the same behaviour as when we read the whole thing with read() 550 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 551 sock = FakeSocket(body) 552 resp = client.HTTPResponse(sock) 553 resp.begin() 554 self.assertEqual(resp.read(2), b'Te') 555 self.assertFalse(resp.isclosed()) 556 self.assertEqual(resp.read(2), b'xt') 557 self.assertTrue(resp.isclosed()) 558 self.assertFalse(resp.closed) 559 resp.close() 560 self.assertTrue(resp.closed) 561 562 def test_mixed_reads(self): 563 # readline() should update the remaining length, so that read() knows 564 # how much data is left and does not raise IncompleteRead 565 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 566 sock = FakeSocket(body) 567 resp = client.HTTPResponse(sock) 568 resp.begin() 569 self.assertEqual(resp.readline(), b'Text\r\n') 570 self.assertFalse(resp.isclosed()) 571 self.assertEqual(resp.read(), b'Another') 572 self.assertTrue(resp.isclosed()) 573 self.assertFalse(resp.closed) 574 resp.close() 575 self.assertTrue(resp.closed) 576 577 def test_partial_readintos(self): 578 # if we have Content-Length, HTTPResponse knows when to close itself, 579 # the same behaviour as when we read the whole thing with read() 580 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 581 sock = FakeSocket(body) 582 resp = client.HTTPResponse(sock) 583 resp.begin() 584 b = bytearray(2) 585 n = resp.readinto(b) 586 self.assertEqual(n, 2) 587 self.assertEqual(bytes(b), b'Te') 588 self.assertFalse(resp.isclosed()) 589 n = resp.readinto(b) 590 self.assertEqual(n, 2) 591 self.assertEqual(bytes(b), b'xt') 592 self.assertTrue(resp.isclosed()) 593 self.assertFalse(resp.closed) 594 resp.close() 595 self.assertTrue(resp.closed) 596 597 def test_partial_reads_no_content_length(self): 598 # when no length is present, the socket should be gracefully closed when 599 # all data was read 600 body = "HTTP/1.1 200 Ok\r\n\r\nText" 601 sock = FakeSocket(body) 602 resp = client.HTTPResponse(sock) 603 resp.begin() 604 self.assertEqual(resp.read(2), b'Te') 605 self.assertFalse(resp.isclosed()) 606 self.assertEqual(resp.read(2), b'xt') 607 self.assertEqual(resp.read(1), b'') 608 self.assertTrue(resp.isclosed()) 609 self.assertFalse(resp.closed) 610 resp.close() 611 self.assertTrue(resp.closed) 612 613 def test_partial_readintos_no_content_length(self): 614 # when no length is present, the socket should be gracefully closed when 615 # all data was read 616 body = "HTTP/1.1 200 Ok\r\n\r\nText" 617 sock = FakeSocket(body) 618 resp = client.HTTPResponse(sock) 619 resp.begin() 620 b = bytearray(2) 621 n = resp.readinto(b) 622 self.assertEqual(n, 2) 623 self.assertEqual(bytes(b), b'Te') 624 self.assertFalse(resp.isclosed()) 625 n = resp.readinto(b) 626 self.assertEqual(n, 2) 627 self.assertEqual(bytes(b), b'xt') 628 n = resp.readinto(b) 629 self.assertEqual(n, 0) 630 self.assertTrue(resp.isclosed()) 631 632 def test_partial_reads_incomplete_body(self): 633 # if the server shuts down the connection before the whole 634 # content-length is delivered, the socket is gracefully closed 635 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 636 sock = FakeSocket(body) 637 resp = client.HTTPResponse(sock) 638 resp.begin() 639 self.assertEqual(resp.read(2), b'Te') 640 self.assertFalse(resp.isclosed()) 641 self.assertEqual(resp.read(2), b'xt') 642 self.assertEqual(resp.read(1), b'') 643 self.assertTrue(resp.isclosed()) 644 645 def test_partial_readintos_incomplete_body(self): 646 # if the server shuts down the connection before the whole 647 # content-length is delivered, the socket is gracefully closed 648 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 649 sock = FakeSocket(body) 650 resp = client.HTTPResponse(sock) 651 resp.begin() 652 b = bytearray(2) 653 n = resp.readinto(b) 654 self.assertEqual(n, 2) 655 self.assertEqual(bytes(b), b'Te') 656 self.assertFalse(resp.isclosed()) 657 n = resp.readinto(b) 658 self.assertEqual(n, 2) 659 self.assertEqual(bytes(b), b'xt') 660 n = resp.readinto(b) 661 self.assertEqual(n, 0) 662 self.assertTrue(resp.isclosed()) 663 self.assertFalse(resp.closed) 664 resp.close() 665 self.assertTrue(resp.closed) 666 667 def test_host_port(self): 668 # Check invalid host_port 669 670 for hp in ("www.python.org:abc", "user:password@www.python.org"): 671 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 672 673 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 674 "fe80::207:e9ff:fe9b", 8000), 675 ("www.python.org:80", "www.python.org", 80), 676 ("www.python.org:", "www.python.org", 80), 677 ("www.python.org", "www.python.org", 80), 678 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 679 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 680 c = client.HTTPConnection(hp) 681 self.assertEqual(h, c.host) 682 self.assertEqual(p, c.port) 683 684 def test_response_headers(self): 685 # test response with multiple message headers with the same field name. 686 text = ('HTTP/1.1 200 OK\r\n' 687 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 688 'Version="1"; Path="/acme"\r\n' 689 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 690 ' Path="/acme"\r\n' 691 '\r\n' 692 'No body\r\n') 693 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 694 ', ' 695 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 696 s = FakeSocket(text) 697 r = client.HTTPResponse(s) 698 r.begin() 699 cookies = r.getheader("Set-Cookie") 700 self.assertEqual(cookies, hdr) 701 702 def test_read_head(self): 703 # Test that the library doesn't attempt to read any data 704 # from a HEAD request. (Tickles SF bug #622042.) 705 sock = FakeSocket( 706 'HTTP/1.1 200 OK\r\n' 707 'Content-Length: 14432\r\n' 708 '\r\n', 709 NoEOFBytesIO) 710 resp = client.HTTPResponse(sock, method="HEAD") 711 resp.begin() 712 if resp.read(): 713 self.fail("Did not expect response from HEAD request") 714 715 def test_readinto_head(self): 716 # Test that the library doesn't attempt to read any data 717 # from a HEAD request. (Tickles SF bug #622042.) 718 sock = FakeSocket( 719 'HTTP/1.1 200 OK\r\n' 720 'Content-Length: 14432\r\n' 721 '\r\n', 722 NoEOFBytesIO) 723 resp = client.HTTPResponse(sock, method="HEAD") 724 resp.begin() 725 b = bytearray(5) 726 if resp.readinto(b) != 0: 727 self.fail("Did not expect response from HEAD request") 728 self.assertEqual(bytes(b), b'\x00'*5) 729 730 def test_too_many_headers(self): 731 headers = '\r\n'.join('Header%d: foo' % i 732 for i in range(client._MAXHEADERS + 1)) + '\r\n' 733 text = ('HTTP/1.1 200 OK\r\n' + headers) 734 s = FakeSocket(text) 735 r = client.HTTPResponse(s) 736 self.assertRaisesRegex(client.HTTPException, 737 r"got more than \d+ headers", r.begin) 738 739 def test_send_file(self): 740 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 741 b'Accept-Encoding: identity\r\n' 742 b'Transfer-Encoding: chunked\r\n' 743 b'\r\n') 744 745 with open(__file__, 'rb') as body: 746 conn = client.HTTPConnection('example.com') 747 sock = FakeSocket(body) 748 conn.sock = sock 749 conn.request('GET', '/foo', body) 750 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 751 (sock.data[:len(expected)], expected)) 752 753 def test_send(self): 754 expected = b'this is a test this is only a test' 755 conn = client.HTTPConnection('example.com') 756 sock = FakeSocket(None) 757 conn.sock = sock 758 conn.send(expected) 759 self.assertEqual(expected, sock.data) 760 sock.data = b'' 761 conn.send(array.array('b', expected)) 762 self.assertEqual(expected, sock.data) 763 sock.data = b'' 764 conn.send(io.BytesIO(expected)) 765 self.assertEqual(expected, sock.data) 766 767 def test_send_updating_file(self): 768 def data(): 769 yield 'data' 770 yield None 771 yield 'data_two' 772 773 class UpdatingFile(io.TextIOBase): 774 mode = 'r' 775 d = data() 776 def read(self, blocksize=-1): 777 return next(self.d) 778 779 expected = b'data' 780 781 conn = client.HTTPConnection('example.com') 782 sock = FakeSocket("") 783 conn.sock = sock 784 conn.send(UpdatingFile()) 785 self.assertEqual(sock.data, expected) 786 787 788 def test_send_iter(self): 789 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 790 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 791 b'\r\nonetwothree' 792 793 def body(): 794 yield b"one" 795 yield b"two" 796 yield b"three" 797 798 conn = client.HTTPConnection('example.com') 799 sock = FakeSocket("") 800 conn.sock = sock 801 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 802 self.assertEqual(sock.data, expected) 803 804 def test_blocksize_request(self): 805 """Check that request() respects the configured block size.""" 806 blocksize = 8 # For easy debugging. 807 conn = client.HTTPConnection('example.com', blocksize=blocksize) 808 sock = FakeSocket(None) 809 conn.sock = sock 810 expected = b"a" * blocksize + b"b" 811 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 812 self.assertEqual(sock.sendall_calls, 3) 813 body = sock.data.split(b"\r\n\r\n", 1)[1] 814 self.assertEqual(body, expected) 815 816 def test_blocksize_send(self): 817 """Check that send() respects the configured block size.""" 818 blocksize = 8 # For easy debugging. 819 conn = client.HTTPConnection('example.com', blocksize=blocksize) 820 sock = FakeSocket(None) 821 conn.sock = sock 822 expected = b"a" * blocksize + b"b" 823 conn.send(io.BytesIO(expected)) 824 self.assertEqual(sock.sendall_calls, 2) 825 self.assertEqual(sock.data, expected) 826 827 def test_send_type_error(self): 828 # See: Issue #12676 829 conn = client.HTTPConnection('example.com') 830 conn.sock = FakeSocket('') 831 with self.assertRaises(TypeError): 832 conn.request('POST', 'test', conn) 833 834 def test_chunked(self): 835 expected = chunked_expected 836 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 837 resp = client.HTTPResponse(sock, method="GET") 838 resp.begin() 839 self.assertEqual(resp.read(), expected) 840 resp.close() 841 842 # Various read sizes 843 for n in range(1, 12): 844 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 845 resp = client.HTTPResponse(sock, method="GET") 846 resp.begin() 847 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 848 resp.close() 849 850 for x in ('', 'foo\r\n'): 851 sock = FakeSocket(chunked_start + x) 852 resp = client.HTTPResponse(sock, method="GET") 853 resp.begin() 854 try: 855 resp.read() 856 except client.IncompleteRead as i: 857 self.assertEqual(i.partial, expected) 858 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 859 self.assertEqual(repr(i), expected_message) 860 self.assertEqual(str(i), expected_message) 861 else: 862 self.fail('IncompleteRead expected') 863 finally: 864 resp.close() 865 866 def test_readinto_chunked(self): 867 868 expected = chunked_expected 869 nexpected = len(expected) 870 b = bytearray(128) 871 872 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 873 resp = client.HTTPResponse(sock, method="GET") 874 resp.begin() 875 n = resp.readinto(b) 876 self.assertEqual(b[:nexpected], expected) 877 self.assertEqual(n, nexpected) 878 resp.close() 879 880 # Various read sizes 881 for n in range(1, 12): 882 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 883 resp = client.HTTPResponse(sock, method="GET") 884 resp.begin() 885 m = memoryview(b) 886 i = resp.readinto(m[0:n]) 887 i += resp.readinto(m[i:n + i]) 888 i += resp.readinto(m[i:]) 889 self.assertEqual(b[:nexpected], expected) 890 self.assertEqual(i, nexpected) 891 resp.close() 892 893 for x in ('', 'foo\r\n'): 894 sock = FakeSocket(chunked_start + x) 895 resp = client.HTTPResponse(sock, method="GET") 896 resp.begin() 897 try: 898 n = resp.readinto(b) 899 except client.IncompleteRead as i: 900 self.assertEqual(i.partial, expected) 901 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 902 self.assertEqual(repr(i), expected_message) 903 self.assertEqual(str(i), expected_message) 904 else: 905 self.fail('IncompleteRead expected') 906 finally: 907 resp.close() 908 909 def test_chunked_head(self): 910 chunked_start = ( 911 'HTTP/1.1 200 OK\r\n' 912 'Transfer-Encoding: chunked\r\n\r\n' 913 'a\r\n' 914 'hello world\r\n' 915 '1\r\n' 916 'd\r\n' 917 ) 918 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 919 resp = client.HTTPResponse(sock, method="HEAD") 920 resp.begin() 921 self.assertEqual(resp.read(), b'') 922 self.assertEqual(resp.status, 200) 923 self.assertEqual(resp.reason, 'OK') 924 self.assertTrue(resp.isclosed()) 925 self.assertFalse(resp.closed) 926 resp.close() 927 self.assertTrue(resp.closed) 928 929 def test_readinto_chunked_head(self): 930 chunked_start = ( 931 'HTTP/1.1 200 OK\r\n' 932 'Transfer-Encoding: chunked\r\n\r\n' 933 'a\r\n' 934 'hello world\r\n' 935 '1\r\n' 936 'd\r\n' 937 ) 938 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 939 resp = client.HTTPResponse(sock, method="HEAD") 940 resp.begin() 941 b = bytearray(5) 942 n = resp.readinto(b) 943 self.assertEqual(n, 0) 944 self.assertEqual(bytes(b), b'\x00'*5) 945 self.assertEqual(resp.status, 200) 946 self.assertEqual(resp.reason, 'OK') 947 self.assertTrue(resp.isclosed()) 948 self.assertFalse(resp.closed) 949 resp.close() 950 self.assertTrue(resp.closed) 951 952 def test_negative_content_length(self): 953 sock = FakeSocket( 954 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 955 resp = client.HTTPResponse(sock, method="GET") 956 resp.begin() 957 self.assertEqual(resp.read(), b'Hello\r\n') 958 self.assertTrue(resp.isclosed()) 959 960 def test_incomplete_read(self): 961 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 962 resp = client.HTTPResponse(sock, method="GET") 963 resp.begin() 964 try: 965 resp.read() 966 except client.IncompleteRead as i: 967 self.assertEqual(i.partial, b'Hello\r\n') 968 self.assertEqual(repr(i), 969 "IncompleteRead(7 bytes read, 3 more expected)") 970 self.assertEqual(str(i), 971 "IncompleteRead(7 bytes read, 3 more expected)") 972 self.assertTrue(resp.isclosed()) 973 else: 974 self.fail('IncompleteRead expected') 975 976 def test_epipe(self): 977 sock = EPipeSocket( 978 "HTTP/1.0 401 Authorization Required\r\n" 979 "Content-type: text/html\r\n" 980 "WWW-Authenticate: Basic realm=\"example\"\r\n", 981 b"Content-Length") 982 conn = client.HTTPConnection("example.com") 983 conn.sock = sock 984 self.assertRaises(OSError, 985 lambda: conn.request("PUT", "/url", "body")) 986 resp = conn.getresponse() 987 self.assertEqual(401, resp.status) 988 self.assertEqual("Basic realm=\"example\"", 989 resp.getheader("www-authenticate")) 990 991 # Test lines overflowing the max line size (_MAXLINE in http.client) 992 993 def test_overflowing_status_line(self): 994 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 995 resp = client.HTTPResponse(FakeSocket(body)) 996 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 997 998 def test_overflowing_header_line(self): 999 body = ( 1000 'HTTP/1.1 200 OK\r\n' 1001 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 1002 ) 1003 resp = client.HTTPResponse(FakeSocket(body)) 1004 self.assertRaises(client.LineTooLong, resp.begin) 1005 1006 def test_overflowing_header_limit_after_100(self): 1007 body = ( 1008 'HTTP/1.1 100 OK\r\n' 1009 'r\n' * 32768 1010 ) 1011 resp = client.HTTPResponse(FakeSocket(body)) 1012 with self.assertRaises(client.HTTPException) as cm: 1013 resp.begin() 1014 # We must assert more because other reasonable errors that we 1015 # do not want can also be HTTPException derived. 1016 self.assertIn('got more than ', str(cm.exception)) 1017 self.assertIn('headers', str(cm.exception)) 1018 1019 def test_overflowing_chunked_line(self): 1020 body = ( 1021 'HTTP/1.1 200 OK\r\n' 1022 'Transfer-Encoding: chunked\r\n\r\n' 1023 + '0' * 65536 + 'a\r\n' 1024 'hello world\r\n' 1025 '0\r\n' 1026 '\r\n' 1027 ) 1028 resp = client.HTTPResponse(FakeSocket(body)) 1029 resp.begin() 1030 self.assertRaises(client.LineTooLong, resp.read) 1031 1032 def test_early_eof(self): 1033 # Test httpresponse with no \r\n termination, 1034 body = "HTTP/1.1 200 Ok" 1035 sock = FakeSocket(body) 1036 resp = client.HTTPResponse(sock) 1037 resp.begin() 1038 self.assertEqual(resp.read(), b'') 1039 self.assertTrue(resp.isclosed()) 1040 self.assertFalse(resp.closed) 1041 resp.close() 1042 self.assertTrue(resp.closed) 1043 1044 def test_error_leak(self): 1045 # Test that the socket is not leaked if getresponse() fails 1046 conn = client.HTTPConnection('example.com') 1047 response = None 1048 class Response(client.HTTPResponse): 1049 def __init__(self, *pos, **kw): 1050 nonlocal response 1051 response = self # Avoid garbage collector closing the socket 1052 client.HTTPResponse.__init__(self, *pos, **kw) 1053 conn.response_class = Response 1054 conn.sock = FakeSocket('Invalid status line') 1055 conn.request('GET', '/') 1056 self.assertRaises(client.BadStatusLine, conn.getresponse) 1057 self.assertTrue(response.closed) 1058 self.assertTrue(conn.sock.file_closed) 1059 1060 def test_chunked_extension(self): 1061 extra = '3;foo=bar\r\n' + 'abc\r\n' 1062 expected = chunked_expected + b'abc' 1063 1064 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1065 resp = client.HTTPResponse(sock, method="GET") 1066 resp.begin() 1067 self.assertEqual(resp.read(), expected) 1068 resp.close() 1069 1070 def test_chunked_missing_end(self): 1071 """some servers may serve up a short chunked encoding stream""" 1072 expected = chunked_expected 1073 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1074 resp = client.HTTPResponse(sock, method="GET") 1075 resp.begin() 1076 self.assertEqual(resp.read(), expected) 1077 resp.close() 1078 1079 def test_chunked_trailers(self): 1080 """See that trailers are read and ignored""" 1081 expected = chunked_expected 1082 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1083 resp = client.HTTPResponse(sock, method="GET") 1084 resp.begin() 1085 self.assertEqual(resp.read(), expected) 1086 # we should have reached the end of the file 1087 self.assertEqual(sock.file.read(), b"") #we read to the end 1088 resp.close() 1089 1090 def test_chunked_sync(self): 1091 """Check that we don't read past the end of the chunked-encoding stream""" 1092 expected = chunked_expected 1093 extradata = "extradata" 1094 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1095 resp = client.HTTPResponse(sock, method="GET") 1096 resp.begin() 1097 self.assertEqual(resp.read(), expected) 1098 # the file should now have our extradata ready to be read 1099 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1100 resp.close() 1101 1102 def test_content_length_sync(self): 1103 """Check that we don't read past the end of the Content-Length stream""" 1104 extradata = b"extradata" 1105 expected = b"Hello123\r\n" 1106 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1107 resp = client.HTTPResponse(sock, method="GET") 1108 resp.begin() 1109 self.assertEqual(resp.read(), expected) 1110 # the file should now have our extradata ready to be read 1111 self.assertEqual(sock.file.read(), extradata) #we read to the end 1112 resp.close() 1113 1114 def test_readlines_content_length(self): 1115 extradata = b"extradata" 1116 expected = b"Hello123\r\n" 1117 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1118 resp = client.HTTPResponse(sock, method="GET") 1119 resp.begin() 1120 self.assertEqual(resp.readlines(2000), [expected]) 1121 # the file should now have our extradata ready to be read 1122 self.assertEqual(sock.file.read(), extradata) #we read to the end 1123 resp.close() 1124 1125 def test_read1_content_length(self): 1126 extradata = b"extradata" 1127 expected = b"Hello123\r\n" 1128 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1129 resp = client.HTTPResponse(sock, method="GET") 1130 resp.begin() 1131 self.assertEqual(resp.read1(2000), expected) 1132 # the file should now have our extradata ready to be read 1133 self.assertEqual(sock.file.read(), extradata) #we read to the end 1134 resp.close() 1135 1136 def test_readline_bound_content_length(self): 1137 extradata = b"extradata" 1138 expected = b"Hello123\r\n" 1139 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1140 resp = client.HTTPResponse(sock, method="GET") 1141 resp.begin() 1142 self.assertEqual(resp.readline(10), expected) 1143 self.assertEqual(resp.readline(10), b"") 1144 # the file should now have our extradata ready to be read 1145 self.assertEqual(sock.file.read(), extradata) #we read to the end 1146 resp.close() 1147 1148 def test_read1_bound_content_length(self): 1149 extradata = b"extradata" 1150 expected = b"Hello123\r\n" 1151 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1152 resp = client.HTTPResponse(sock, method="GET") 1153 resp.begin() 1154 self.assertEqual(resp.read1(20), expected*2) 1155 self.assertEqual(resp.read(), expected) 1156 # the file should now have our extradata ready to be read 1157 self.assertEqual(sock.file.read(), extradata) #we read to the end 1158 resp.close() 1159 1160 def test_response_fileno(self): 1161 # Make sure fd returned by fileno is valid. 1162 serv = socket.create_server((HOST, 0)) 1163 self.addCleanup(serv.close) 1164 1165 result = None 1166 def run_server(): 1167 [conn, address] = serv.accept() 1168 with conn, conn.makefile("rb") as reader: 1169 # Read the request header until a blank line 1170 while True: 1171 line = reader.readline() 1172 if not line.rstrip(b"\r\n"): 1173 break 1174 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1175 nonlocal result 1176 result = reader.read() 1177 1178 thread = threading.Thread(target=run_server) 1179 thread.start() 1180 self.addCleanup(thread.join, float(1)) 1181 conn = client.HTTPConnection(*serv.getsockname()) 1182 conn.request("CONNECT", "dummy:1234") 1183 response = conn.getresponse() 1184 try: 1185 self.assertEqual(response.status, client.OK) 1186 s = socket.socket(fileno=response.fileno()) 1187 try: 1188 s.sendall(b"proxied data\n") 1189 finally: 1190 s.detach() 1191 finally: 1192 response.close() 1193 conn.close() 1194 thread.join() 1195 self.assertEqual(result, b"proxied data\n") 1196 1197 def test_putrequest_override_domain_validation(self): 1198 """ 1199 It should be possible to override the default validation 1200 behavior in putrequest (bpo-38216). 1201 """ 1202 class UnsafeHTTPConnection(client.HTTPConnection): 1203 def _validate_path(self, url): 1204 pass 1205 1206 conn = UnsafeHTTPConnection('example.com') 1207 conn.sock = FakeSocket('') 1208 conn.putrequest('GET', '/\x00') 1209 1210 def test_putrequest_override_host_validation(self): 1211 class UnsafeHTTPConnection(client.HTTPConnection): 1212 def _validate_host(self, url): 1213 pass 1214 1215 conn = UnsafeHTTPConnection('example.com\r\n') 1216 conn.sock = FakeSocket('') 1217 # set skip_host so a ValueError is not raised upon adding the 1218 # invalid URL as the value of the "Host:" header 1219 conn.putrequest('GET', '/', skip_host=1) 1220 1221 def test_putrequest_override_encoding(self): 1222 """ 1223 It should be possible to override the default encoding 1224 to transmit bytes in another encoding even if invalid 1225 (bpo-36274). 1226 """ 1227 class UnsafeHTTPConnection(client.HTTPConnection): 1228 def _encode_request(self, str_url): 1229 return str_url.encode('utf-8') 1230 1231 conn = UnsafeHTTPConnection('example.com') 1232 conn.sock = FakeSocket('') 1233 conn.putrequest('GET', '/☃') 1234 1235 1236class ExtendedReadTest(TestCase): 1237 """ 1238 Test peek(), read1(), readline() 1239 """ 1240 lines = ( 1241 'HTTP/1.1 200 OK\r\n' 1242 '\r\n' 1243 'hello world!\n' 1244 'and now \n' 1245 'for something completely different\n' 1246 'foo' 1247 ) 1248 lines_expected = lines[lines.find('hello'):].encode("ascii") 1249 lines_chunked = ( 1250 'HTTP/1.1 200 OK\r\n' 1251 'Transfer-Encoding: chunked\r\n\r\n' 1252 'a\r\n' 1253 'hello worl\r\n' 1254 '3\r\n' 1255 'd!\n\r\n' 1256 '9\r\n' 1257 'and now \n\r\n' 1258 '23\r\n' 1259 'for something completely different\n\r\n' 1260 '3\r\n' 1261 'foo\r\n' 1262 '0\r\n' # terminating chunk 1263 '\r\n' # end of trailers 1264 ) 1265 1266 def setUp(self): 1267 sock = FakeSocket(self.lines) 1268 resp = client.HTTPResponse(sock, method="GET") 1269 resp.begin() 1270 resp.fp = io.BufferedReader(resp.fp) 1271 self.resp = resp 1272 1273 1274 1275 def test_peek(self): 1276 resp = self.resp 1277 # patch up the buffered peek so that it returns not too much stuff 1278 oldpeek = resp.fp.peek 1279 def mypeek(n=-1): 1280 p = oldpeek(n) 1281 if n >= 0: 1282 return p[:n] 1283 return p[:10] 1284 resp.fp.peek = mypeek 1285 1286 all = [] 1287 while True: 1288 # try a short peek 1289 p = resp.peek(3) 1290 if p: 1291 self.assertGreater(len(p), 0) 1292 # then unbounded peek 1293 p2 = resp.peek() 1294 self.assertGreaterEqual(len(p2), len(p)) 1295 self.assertTrue(p2.startswith(p)) 1296 next = resp.read(len(p2)) 1297 self.assertEqual(next, p2) 1298 else: 1299 next = resp.read() 1300 self.assertFalse(next) 1301 all.append(next) 1302 if not next: 1303 break 1304 self.assertEqual(b"".join(all), self.lines_expected) 1305 1306 def test_readline(self): 1307 resp = self.resp 1308 self._verify_readline(self.resp.readline, self.lines_expected) 1309 1310 def _verify_readline(self, readline, expected): 1311 all = [] 1312 while True: 1313 # short readlines 1314 line = readline(5) 1315 if line and line != b"foo": 1316 if len(line) < 5: 1317 self.assertTrue(line.endswith(b"\n")) 1318 all.append(line) 1319 if not line: 1320 break 1321 self.assertEqual(b"".join(all), expected) 1322 1323 def test_read1(self): 1324 resp = self.resp 1325 def r(): 1326 res = resp.read1(4) 1327 self.assertLessEqual(len(res), 4) 1328 return res 1329 readliner = Readliner(r) 1330 self._verify_readline(readliner.readline, self.lines_expected) 1331 1332 def test_read1_unbounded(self): 1333 resp = self.resp 1334 all = [] 1335 while True: 1336 data = resp.read1() 1337 if not data: 1338 break 1339 all.append(data) 1340 self.assertEqual(b"".join(all), self.lines_expected) 1341 1342 def test_read1_bounded(self): 1343 resp = self.resp 1344 all = [] 1345 while True: 1346 data = resp.read1(10) 1347 if not data: 1348 break 1349 self.assertLessEqual(len(data), 10) 1350 all.append(data) 1351 self.assertEqual(b"".join(all), self.lines_expected) 1352 1353 def test_read1_0(self): 1354 self.assertEqual(self.resp.read1(0), b"") 1355 1356 def test_peek_0(self): 1357 p = self.resp.peek(0) 1358 self.assertLessEqual(0, len(p)) 1359 1360 1361class ExtendedReadTestChunked(ExtendedReadTest): 1362 """ 1363 Test peek(), read1(), readline() in chunked mode 1364 """ 1365 lines = ( 1366 'HTTP/1.1 200 OK\r\n' 1367 'Transfer-Encoding: chunked\r\n\r\n' 1368 'a\r\n' 1369 'hello worl\r\n' 1370 '3\r\n' 1371 'd!\n\r\n' 1372 '9\r\n' 1373 'and now \n\r\n' 1374 '23\r\n' 1375 'for something completely different\n\r\n' 1376 '3\r\n' 1377 'foo\r\n' 1378 '0\r\n' # terminating chunk 1379 '\r\n' # end of trailers 1380 ) 1381 1382 1383class Readliner: 1384 """ 1385 a simple readline class that uses an arbitrary read function and buffering 1386 """ 1387 def __init__(self, readfunc): 1388 self.readfunc = readfunc 1389 self.remainder = b"" 1390 1391 def readline(self, limit): 1392 data = [] 1393 datalen = 0 1394 read = self.remainder 1395 try: 1396 while True: 1397 idx = read.find(b'\n') 1398 if idx != -1: 1399 break 1400 if datalen + len(read) >= limit: 1401 idx = limit - datalen - 1 1402 # read more data 1403 data.append(read) 1404 read = self.readfunc() 1405 if not read: 1406 idx = 0 #eof condition 1407 break 1408 idx += 1 1409 data.append(read[:idx]) 1410 self.remainder = read[idx:] 1411 return b"".join(data) 1412 except: 1413 self.remainder = b"".join(data) 1414 raise 1415 1416 1417class OfflineTest(TestCase): 1418 def test_all(self): 1419 # Documented objects defined in the module should be in __all__ 1420 expected = {"responses"} # Allowlist documented dict() object 1421 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1422 # intentionally omitted for simplicity 1423 blacklist = {"HTTPMessage", "parse_headers"} 1424 for name in dir(client): 1425 if name.startswith("_") or name in blacklist: 1426 continue 1427 module_object = getattr(client, name) 1428 if getattr(module_object, "__module__", None) == "http.client": 1429 expected.add(name) 1430 self.assertCountEqual(client.__all__, expected) 1431 1432 def test_responses(self): 1433 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1434 1435 def test_client_constants(self): 1436 # Make sure we don't break backward compatibility with 3.4 1437 expected = [ 1438 'CONTINUE', 1439 'SWITCHING_PROTOCOLS', 1440 'PROCESSING', 1441 'OK', 1442 'CREATED', 1443 'ACCEPTED', 1444 'NON_AUTHORITATIVE_INFORMATION', 1445 'NO_CONTENT', 1446 'RESET_CONTENT', 1447 'PARTIAL_CONTENT', 1448 'MULTI_STATUS', 1449 'IM_USED', 1450 'MULTIPLE_CHOICES', 1451 'MOVED_PERMANENTLY', 1452 'FOUND', 1453 'SEE_OTHER', 1454 'NOT_MODIFIED', 1455 'USE_PROXY', 1456 'TEMPORARY_REDIRECT', 1457 'BAD_REQUEST', 1458 'UNAUTHORIZED', 1459 'PAYMENT_REQUIRED', 1460 'FORBIDDEN', 1461 'NOT_FOUND', 1462 'METHOD_NOT_ALLOWED', 1463 'NOT_ACCEPTABLE', 1464 'PROXY_AUTHENTICATION_REQUIRED', 1465 'REQUEST_TIMEOUT', 1466 'CONFLICT', 1467 'GONE', 1468 'LENGTH_REQUIRED', 1469 'PRECONDITION_FAILED', 1470 'REQUEST_ENTITY_TOO_LARGE', 1471 'REQUEST_URI_TOO_LONG', 1472 'UNSUPPORTED_MEDIA_TYPE', 1473 'REQUESTED_RANGE_NOT_SATISFIABLE', 1474 'EXPECTATION_FAILED', 1475 'MISDIRECTED_REQUEST', 1476 'UNPROCESSABLE_ENTITY', 1477 'LOCKED', 1478 'FAILED_DEPENDENCY', 1479 'UPGRADE_REQUIRED', 1480 'PRECONDITION_REQUIRED', 1481 'TOO_MANY_REQUESTS', 1482 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1483 'UNAVAILABLE_FOR_LEGAL_REASONS', 1484 'INTERNAL_SERVER_ERROR', 1485 'NOT_IMPLEMENTED', 1486 'BAD_GATEWAY', 1487 'SERVICE_UNAVAILABLE', 1488 'GATEWAY_TIMEOUT', 1489 'HTTP_VERSION_NOT_SUPPORTED', 1490 'INSUFFICIENT_STORAGE', 1491 'NOT_EXTENDED', 1492 'NETWORK_AUTHENTICATION_REQUIRED', 1493 ] 1494 for const in expected: 1495 with self.subTest(constant=const): 1496 self.assertTrue(hasattr(client, const)) 1497 1498 1499class SourceAddressTest(TestCase): 1500 def setUp(self): 1501 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1502 self.port = support.bind_port(self.serv) 1503 self.source_port = support.find_unused_port() 1504 self.serv.listen() 1505 self.conn = None 1506 1507 def tearDown(self): 1508 if self.conn: 1509 self.conn.close() 1510 self.conn = None 1511 self.serv.close() 1512 self.serv = None 1513 1514 def testHTTPConnectionSourceAddress(self): 1515 self.conn = client.HTTPConnection(HOST, self.port, 1516 source_address=('', self.source_port)) 1517 self.conn.connect() 1518 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1519 1520 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1521 'http.client.HTTPSConnection not defined') 1522 def testHTTPSConnectionSourceAddress(self): 1523 self.conn = client.HTTPSConnection(HOST, self.port, 1524 source_address=('', self.source_port)) 1525 # We don't test anything here other than the constructor not barfing as 1526 # this code doesn't deal with setting up an active running SSL server 1527 # for an ssl_wrapped connect() to actually return from. 1528 1529 1530class TimeoutTest(TestCase): 1531 PORT = None 1532 1533 def setUp(self): 1534 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1535 TimeoutTest.PORT = support.bind_port(self.serv) 1536 self.serv.listen() 1537 1538 def tearDown(self): 1539 self.serv.close() 1540 self.serv = None 1541 1542 def testTimeoutAttribute(self): 1543 # This will prove that the timeout gets through HTTPConnection 1544 # and into the socket. 1545 1546 # default -- use global socket timeout 1547 self.assertIsNone(socket.getdefaulttimeout()) 1548 socket.setdefaulttimeout(30) 1549 try: 1550 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1551 httpConn.connect() 1552 finally: 1553 socket.setdefaulttimeout(None) 1554 self.assertEqual(httpConn.sock.gettimeout(), 30) 1555 httpConn.close() 1556 1557 # no timeout -- do not use global socket default 1558 self.assertIsNone(socket.getdefaulttimeout()) 1559 socket.setdefaulttimeout(30) 1560 try: 1561 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1562 timeout=None) 1563 httpConn.connect() 1564 finally: 1565 socket.setdefaulttimeout(None) 1566 self.assertEqual(httpConn.sock.gettimeout(), None) 1567 httpConn.close() 1568 1569 # a value 1570 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1571 httpConn.connect() 1572 self.assertEqual(httpConn.sock.gettimeout(), 30) 1573 httpConn.close() 1574 1575 1576class PersistenceTest(TestCase): 1577 1578 def test_reuse_reconnect(self): 1579 # Should reuse or reconnect depending on header from server 1580 tests = ( 1581 ('1.0', '', False), 1582 ('1.0', 'Connection: keep-alive\r\n', True), 1583 ('1.1', '', True), 1584 ('1.1', 'Connection: close\r\n', False), 1585 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1586 ('1.1', 'Connection: cloSE\r\n', False), 1587 ) 1588 for version, header, reuse in tests: 1589 with self.subTest(version=version, header=header): 1590 msg = ( 1591 'HTTP/{} 200 OK\r\n' 1592 '{}' 1593 'Content-Length: 12\r\n' 1594 '\r\n' 1595 'Dummy body\r\n' 1596 ).format(version, header) 1597 conn = FakeSocketHTTPConnection(msg) 1598 self.assertIsNone(conn.sock) 1599 conn.request('GET', '/open-connection') 1600 with conn.getresponse() as response: 1601 self.assertEqual(conn.sock is None, not reuse) 1602 response.read() 1603 self.assertEqual(conn.sock is None, not reuse) 1604 self.assertEqual(conn.connections, 1) 1605 conn.request('GET', '/subsequent-request') 1606 self.assertEqual(conn.connections, 1 if reuse else 2) 1607 1608 def test_disconnected(self): 1609 1610 def make_reset_reader(text): 1611 """Return BufferedReader that raises ECONNRESET at EOF""" 1612 stream = io.BytesIO(text) 1613 def readinto(buffer): 1614 size = io.BytesIO.readinto(stream, buffer) 1615 if size == 0: 1616 raise ConnectionResetError() 1617 return size 1618 stream.readinto = readinto 1619 return io.BufferedReader(stream) 1620 1621 tests = ( 1622 (io.BytesIO, client.RemoteDisconnected), 1623 (make_reset_reader, ConnectionResetError), 1624 ) 1625 for stream_factory, exception in tests: 1626 with self.subTest(exception=exception): 1627 conn = FakeSocketHTTPConnection(b'', stream_factory) 1628 conn.request('GET', '/eof-response') 1629 self.assertRaises(exception, conn.getresponse) 1630 self.assertIsNone(conn.sock) 1631 # HTTPConnection.connect() should be automatically invoked 1632 conn.request('GET', '/reconnect') 1633 self.assertEqual(conn.connections, 2) 1634 1635 def test_100_close(self): 1636 conn = FakeSocketHTTPConnection( 1637 b'HTTP/1.1 100 Continue\r\n' 1638 b'\r\n' 1639 # Missing final response 1640 ) 1641 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1642 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1643 self.assertIsNone(conn.sock) 1644 conn.request('GET', '/reconnect') 1645 self.assertEqual(conn.connections, 2) 1646 1647 1648class HTTPSTest(TestCase): 1649 1650 def setUp(self): 1651 if not hasattr(client, 'HTTPSConnection'): 1652 self.skipTest('ssl support required') 1653 1654 def make_server(self, certfile): 1655 from test.ssl_servers import make_https_server 1656 return make_https_server(self, certfile=certfile) 1657 1658 def test_attributes(self): 1659 # simple test to check it's storing the timeout 1660 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1661 self.assertEqual(h.timeout, 30) 1662 1663 def test_networked(self): 1664 # Default settings: requires a valid cert from a trusted CA 1665 import ssl 1666 support.requires('network') 1667 with support.transient_internet('self-signed.pythontest.net'): 1668 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1669 with self.assertRaises(ssl.SSLError) as exc_info: 1670 h.request('GET', '/') 1671 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1672 1673 def test_networked_noverification(self): 1674 # Switch off cert verification 1675 import ssl 1676 support.requires('network') 1677 with support.transient_internet('self-signed.pythontest.net'): 1678 context = ssl._create_unverified_context() 1679 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1680 context=context) 1681 h.request('GET', '/') 1682 resp = h.getresponse() 1683 h.close() 1684 self.assertIn('nginx', resp.getheader('server')) 1685 resp.close() 1686 1687 @support.system_must_validate_cert 1688 def test_networked_trusted_by_default_cert(self): 1689 # Default settings: requires a valid cert from a trusted CA 1690 support.requires('network') 1691 with support.transient_internet('www.python.org'): 1692 h = client.HTTPSConnection('www.python.org', 443) 1693 h.request('GET', '/') 1694 resp = h.getresponse() 1695 content_type = resp.getheader('content-type') 1696 resp.close() 1697 h.close() 1698 self.assertIn('text/html', content_type) 1699 1700 def test_networked_good_cert(self): 1701 # We feed the server's cert as a validating cert 1702 import ssl 1703 support.requires('network') 1704 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1705 with support.transient_internet(selfsigned_pythontestdotnet): 1706 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1707 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1708 self.assertEqual(context.check_hostname, True) 1709 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1710 try: 1711 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1712 context=context) 1713 h.request('GET', '/') 1714 resp = h.getresponse() 1715 except ssl.SSLError as ssl_err: 1716 ssl_err_str = str(ssl_err) 1717 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1718 # modern Linux distros (Debian Buster, etc) default OpenSSL 1719 # configurations it'll fail saying "key too weak" until we 1720 # address https://bugs.python.org/issue36816 to use a proper 1721 # key size on self-signed.pythontest.net. 1722 if re.search(r'(?i)key.too.weak', ssl_err_str): 1723 raise unittest.SkipTest( 1724 f'Got {ssl_err_str} trying to connect ' 1725 f'to {selfsigned_pythontestdotnet}. ' 1726 'See https://bugs.python.org/issue36816.') 1727 raise 1728 server_string = resp.getheader('server') 1729 resp.close() 1730 h.close() 1731 self.assertIn('nginx', server_string) 1732 1733 def test_networked_bad_cert(self): 1734 # We feed a "CA" cert that is unrelated to the server's cert 1735 import ssl 1736 support.requires('network') 1737 with support.transient_internet('self-signed.pythontest.net'): 1738 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1739 context.load_verify_locations(CERT_localhost) 1740 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1741 with self.assertRaises(ssl.SSLError) as exc_info: 1742 h.request('GET', '/') 1743 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1744 1745 def test_local_unknown_cert(self): 1746 # The custom cert isn't known to the default trust bundle 1747 import ssl 1748 server = self.make_server(CERT_localhost) 1749 h = client.HTTPSConnection('localhost', server.port) 1750 with self.assertRaises(ssl.SSLError) as exc_info: 1751 h.request('GET', '/') 1752 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1753 1754 def test_local_good_hostname(self): 1755 # The (valid) cert validates the HTTP hostname 1756 import ssl 1757 server = self.make_server(CERT_localhost) 1758 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1759 context.load_verify_locations(CERT_localhost) 1760 h = client.HTTPSConnection('localhost', server.port, context=context) 1761 self.addCleanup(h.close) 1762 h.request('GET', '/nonexistent') 1763 resp = h.getresponse() 1764 self.addCleanup(resp.close) 1765 self.assertEqual(resp.status, 404) 1766 1767 def test_local_bad_hostname(self): 1768 # The (valid) cert doesn't validate the HTTP hostname 1769 import ssl 1770 server = self.make_server(CERT_fakehostname) 1771 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1772 context.load_verify_locations(CERT_fakehostname) 1773 h = client.HTTPSConnection('localhost', server.port, context=context) 1774 with self.assertRaises(ssl.CertificateError): 1775 h.request('GET', '/') 1776 # Same with explicit check_hostname=True 1777 with support.check_warnings(('', DeprecationWarning)): 1778 h = client.HTTPSConnection('localhost', server.port, 1779 context=context, check_hostname=True) 1780 with self.assertRaises(ssl.CertificateError): 1781 h.request('GET', '/') 1782 # With check_hostname=False, the mismatching is ignored 1783 context.check_hostname = False 1784 with support.check_warnings(('', DeprecationWarning)): 1785 h = client.HTTPSConnection('localhost', server.port, 1786 context=context, check_hostname=False) 1787 h.request('GET', '/nonexistent') 1788 resp = h.getresponse() 1789 resp.close() 1790 h.close() 1791 self.assertEqual(resp.status, 404) 1792 # The context's check_hostname setting is used if one isn't passed to 1793 # HTTPSConnection. 1794 context.check_hostname = False 1795 h = client.HTTPSConnection('localhost', server.port, context=context) 1796 h.request('GET', '/nonexistent') 1797 resp = h.getresponse() 1798 self.assertEqual(resp.status, 404) 1799 resp.close() 1800 h.close() 1801 # Passing check_hostname to HTTPSConnection should override the 1802 # context's setting. 1803 with support.check_warnings(('', DeprecationWarning)): 1804 h = client.HTTPSConnection('localhost', server.port, 1805 context=context, check_hostname=True) 1806 with self.assertRaises(ssl.CertificateError): 1807 h.request('GET', '/') 1808 1809 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1810 'http.client.HTTPSConnection not available') 1811 def test_host_port(self): 1812 # Check invalid host_port 1813 1814 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1815 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1816 1817 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1818 "fe80::207:e9ff:fe9b", 8000), 1819 ("www.python.org:443", "www.python.org", 443), 1820 ("www.python.org:", "www.python.org", 443), 1821 ("www.python.org", "www.python.org", 443), 1822 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1823 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1824 443)): 1825 c = client.HTTPSConnection(hp) 1826 self.assertEqual(h, c.host) 1827 self.assertEqual(p, c.port) 1828 1829 def test_tls13_pha(self): 1830 import ssl 1831 if not ssl.HAS_TLSv1_3: 1832 self.skipTest('TLS 1.3 support required') 1833 # just check status of PHA flag 1834 h = client.HTTPSConnection('localhost', 443) 1835 self.assertTrue(h._context.post_handshake_auth) 1836 1837 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1838 self.assertFalse(context.post_handshake_auth) 1839 h = client.HTTPSConnection('localhost', 443, context=context) 1840 self.assertIs(h._context, context) 1841 self.assertFalse(h._context.post_handshake_auth) 1842 1843 with warnings.catch_warnings(): 1844 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1845 DeprecationWarning) 1846 h = client.HTTPSConnection('localhost', 443, context=context, 1847 cert_file=CERT_localhost) 1848 self.assertTrue(h._context.post_handshake_auth) 1849 1850 1851class RequestBodyTest(TestCase): 1852 """Test cases where a request includes a message body.""" 1853 1854 def setUp(self): 1855 self.conn = client.HTTPConnection('example.com') 1856 self.conn.sock = self.sock = FakeSocket("") 1857 self.conn.sock = self.sock 1858 1859 def get_headers_and_fp(self): 1860 f = io.BytesIO(self.sock.data) 1861 f.readline() # read the request line 1862 message = client.parse_headers(f) 1863 return message, f 1864 1865 def test_list_body(self): 1866 # Note that no content-length is automatically calculated for 1867 # an iterable. The request will fall back to send chunked 1868 # transfer encoding. 1869 cases = ( 1870 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1871 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1872 ) 1873 for body, expected in cases: 1874 with self.subTest(body): 1875 self.conn = client.HTTPConnection('example.com') 1876 self.conn.sock = self.sock = FakeSocket('') 1877 1878 self.conn.request('PUT', '/url', body) 1879 msg, f = self.get_headers_and_fp() 1880 self.assertNotIn('Content-Type', msg) 1881 self.assertNotIn('Content-Length', msg) 1882 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1883 self.assertEqual(expected, f.read()) 1884 1885 def test_manual_content_length(self): 1886 # Set an incorrect content-length so that we can verify that 1887 # it will not be over-ridden by the library. 1888 self.conn.request("PUT", "/url", "body", 1889 {"Content-Length": "42"}) 1890 message, f = self.get_headers_and_fp() 1891 self.assertEqual("42", message.get("content-length")) 1892 self.assertEqual(4, len(f.read())) 1893 1894 def test_ascii_body(self): 1895 self.conn.request("PUT", "/url", "body") 1896 message, f = self.get_headers_and_fp() 1897 self.assertEqual("text/plain", message.get_content_type()) 1898 self.assertIsNone(message.get_charset()) 1899 self.assertEqual("4", message.get("content-length")) 1900 self.assertEqual(b'body', f.read()) 1901 1902 def test_latin1_body(self): 1903 self.conn.request("PUT", "/url", "body\xc1") 1904 message, f = self.get_headers_and_fp() 1905 self.assertEqual("text/plain", message.get_content_type()) 1906 self.assertIsNone(message.get_charset()) 1907 self.assertEqual("5", message.get("content-length")) 1908 self.assertEqual(b'body\xc1', f.read()) 1909 1910 def test_bytes_body(self): 1911 self.conn.request("PUT", "/url", b"body\xc1") 1912 message, f = self.get_headers_and_fp() 1913 self.assertEqual("text/plain", message.get_content_type()) 1914 self.assertIsNone(message.get_charset()) 1915 self.assertEqual("5", message.get("content-length")) 1916 self.assertEqual(b'body\xc1', f.read()) 1917 1918 def test_text_file_body(self): 1919 self.addCleanup(support.unlink, support.TESTFN) 1920 with open(support.TESTFN, "w") as f: 1921 f.write("body") 1922 with open(support.TESTFN) as f: 1923 self.conn.request("PUT", "/url", f) 1924 message, f = self.get_headers_and_fp() 1925 self.assertEqual("text/plain", message.get_content_type()) 1926 self.assertIsNone(message.get_charset()) 1927 # No content-length will be determined for files; the body 1928 # will be sent using chunked transfer encoding instead. 1929 self.assertIsNone(message.get("content-length")) 1930 self.assertEqual("chunked", message.get("transfer-encoding")) 1931 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1932 1933 def test_binary_file_body(self): 1934 self.addCleanup(support.unlink, support.TESTFN) 1935 with open(support.TESTFN, "wb") as f: 1936 f.write(b"body\xc1") 1937 with open(support.TESTFN, "rb") as f: 1938 self.conn.request("PUT", "/url", f) 1939 message, f = self.get_headers_and_fp() 1940 self.assertEqual("text/plain", message.get_content_type()) 1941 self.assertIsNone(message.get_charset()) 1942 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1943 self.assertNotIn("Content-Length", message) 1944 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1945 1946 1947class HTTPResponseTest(TestCase): 1948 1949 def setUp(self): 1950 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1951 second-value\r\n\r\nText" 1952 sock = FakeSocket(body) 1953 self.resp = client.HTTPResponse(sock) 1954 self.resp.begin() 1955 1956 def test_getting_header(self): 1957 header = self.resp.getheader('My-Header') 1958 self.assertEqual(header, 'first-value, second-value') 1959 1960 header = self.resp.getheader('My-Header', 'some default') 1961 self.assertEqual(header, 'first-value, second-value') 1962 1963 def test_getting_nonexistent_header_with_string_default(self): 1964 header = self.resp.getheader('No-Such-Header', 'default-value') 1965 self.assertEqual(header, 'default-value') 1966 1967 def test_getting_nonexistent_header_with_iterable_default(self): 1968 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1969 self.assertEqual(header, 'default, values') 1970 1971 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1972 self.assertEqual(header, 'default, values') 1973 1974 def test_getting_nonexistent_header_without_default(self): 1975 header = self.resp.getheader('No-Such-Header') 1976 self.assertEqual(header, None) 1977 1978 def test_getting_header_defaultint(self): 1979 header = self.resp.getheader('No-Such-Header',default=42) 1980 self.assertEqual(header, 42) 1981 1982class TunnelTests(TestCase): 1983 def setUp(self): 1984 response_text = ( 1985 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1986 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1987 'Content-Length: 42\r\n\r\n' 1988 ) 1989 self.host = 'proxy.com' 1990 self.conn = client.HTTPConnection(self.host) 1991 self.conn._create_connection = self._create_connection(response_text) 1992 1993 def tearDown(self): 1994 self.conn.close() 1995 1996 def _create_connection(self, response_text): 1997 def create_connection(address, timeout=None, source_address=None): 1998 return FakeSocket(response_text, host=address[0], port=address[1]) 1999 return create_connection 2000 2001 def test_set_tunnel_host_port_headers(self): 2002 tunnel_host = 'destination.com' 2003 tunnel_port = 8888 2004 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 2005 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 2006 headers=tunnel_headers) 2007 self.conn.request('HEAD', '/', '') 2008 self.assertEqual(self.conn.sock.host, self.host) 2009 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2010 self.assertEqual(self.conn._tunnel_host, tunnel_host) 2011 self.assertEqual(self.conn._tunnel_port, tunnel_port) 2012 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 2013 2014 def test_disallow_set_tunnel_after_connect(self): 2015 # Once connected, we shouldn't be able to tunnel anymore 2016 self.conn.connect() 2017 self.assertRaises(RuntimeError, self.conn.set_tunnel, 2018 'destination.com') 2019 2020 def test_connect_with_tunnel(self): 2021 self.conn.set_tunnel('destination.com') 2022 self.conn.request('HEAD', '/', '') 2023 self.assertEqual(self.conn.sock.host, self.host) 2024 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2025 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2026 # issue22095 2027 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 2028 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2029 2030 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 2031 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 2032 2033 def test_connect_put_request(self): 2034 self.conn.set_tunnel('destination.com') 2035 self.conn.request('PUT', '/', '') 2036 self.assertEqual(self.conn.sock.host, self.host) 2037 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2038 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2039 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2040 2041 def test_tunnel_debuglog(self): 2042 expected_header = 'X-Dummy: 1' 2043 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 2044 2045 self.conn.set_debuglevel(1) 2046 self.conn._create_connection = self._create_connection(response_text) 2047 self.conn.set_tunnel('destination.com') 2048 2049 with support.captured_stdout() as output: 2050 self.conn.request('PUT', '/', '') 2051 lines = output.getvalue().splitlines() 2052 self.assertIn('header: {}'.format(expected_header), lines) 2053 2054 2055if __name__ == '__main__': 2056 unittest.main(verbosity=2) 2057