#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Client and server implementations of HTTP/1.x.

.. versionadded:: 4.0
"""

import asyncio
import logging
import re
import types

from tornado.concurrent import (
    Future,
    future_add_done_callback,
    future_set_result_unless_cancelled,
)
from tornado.escape import native_str, utf8
from tornado import gen
from tornado import httputil
from tornado import iostream
from tornado.log import gen_log, app_log
from tornado.util import GzipDecompressor


from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple


# RFC 7230 permits only (hex) digits in numeric protocol elements, but
# Python's int() is far more lenient: it accepts a leading sign,
# surrounding whitespace, and even underscores (int("1_0") == 10).
# Parsing those loosely can create request-smuggling discrepancies with
# other HTTP implementations, so validate strictly before converting.
_DIGITS_RE = re.compile(r"[0-9]+")
_HEXDIGITS_RE = re.compile(r"[0-9a-fA-F]+")


def _parse_int(s: str) -> int:
    """Parse a non-negative decimal integer per RFC 7230.

    Rejects forms that ``int()`` would accept but HTTP forbids
    (signs, whitespace, underscores).
    """
    if _DIGITS_RE.fullmatch(s) is None:
        raise ValueError("not an integer: %r" % s)
    return int(s)


def _parse_hex_int(s: str) -> int:
    """Parse a non-negative hexadecimal integer per RFC 7230.

    Rejects forms that ``int(s, 16)`` would accept but HTTP forbids
    (signs, whitespace, "0x" prefixes, underscores).
    """
    if _HEXDIGITS_RE.fullmatch(s) is None:
        raise ValueError("not a hexadecimal integer: %r" % s)
    return int(s, 16)


class _QuietException(Exception):
    def __init__(self) -> None:
        pass


class _ExceptionLoggingContext(object):
    """Used with the ``with`` statement when calling delegate methods to
    log any exceptions with the given logger.  Any exceptions caught are
    converted to _QuietException
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: types.TracebackType,
    ) -> None:
        if value is not None:
            assert typ is not None
            self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
            raise _QuietException


class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`.
    """

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
        :arg int max_header_size:  maximum amount of data for HTTP headers
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        self.no_keep_alive = no_keep_alive
        self.chunk_size = chunk_size or 65536
        self.max_header_size = max_header_size or 65536
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.decompress = decompress


class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be on its own for clients, or via `HTTP1ServerConnection`
    for servers.
    """

    def __init__(
        self,
        stream: iostream.IOStream,
        is_client: bool,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = self.params.max_body_size or self.stream.max_buffer_size
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent.  Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()  # type: Future[None]
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
        # True if we are writing output with chunked encoding.
        self._chunking_output = False
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None  # type: Optional[int]
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None  # type: Optional[Future[None]]

    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to a bool after the full response has
        been read. The result is true if the stream is still open.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
        """Read a single HTTP request or response and feed it to ``delegate``.

        Used for both client and server connections (``self.is_client``
        selects which start line format is expected).  Returns True if the
        stream is still open after the message has been fully processed.
        """
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
            )
            if self.params.header_timeout is None:
                header_data = await header_future
            else:
                try:
                    header_data = await gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError,
                    )
                except gen.TimeoutError:
                    self.close()
                    return False
            start_line_str, headers = self._parse_headers(header_data)
            if self.is_client:
                resp_start_line = httputil.parse_response_start_line(start_line_str)
                self._response_start_line = resp_start_line
                start_line = (
                    resp_start_line
                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
                # TODO: this will need to change to support client-side keepalive
                self._disconnect_on_finish = False
            else:
                req_start_line = httputil.parse_request_start_line(start_line_str)
                self._request_start_line = req_start_line
                self._request_headers = headers
                start_line = req_start_line
                self._disconnect_on_finish = not self._can_keep_alive(
                    req_start_line, headers
                )
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_recv_future = delegate.headers_received(start_line, headers)
                if header_recv_future is not None:
                    await header_recv_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                return False
            skip_body = False
            if self.is_client:
                assert isinstance(start_line, httputil.ResponseStartLine)
                if (
                    self._request_start_line is not None
                    and self._request_start_line.method == "HEAD"
                ):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if 100 <= code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code
                        )
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue.  Document or change?
                    await self._read_message(delegate)
            else:
                if headers.get("Expect") == "100-continue" and not self._write_finished:
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    resp_start_line.code if self.is_client else 0, headers, delegate
                )
                if body_future is not None:
                    if self._body_timeout is None:
                        await body_future
                    else:
                        try:
                            await gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError,
                            )
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s", self.context)
                            self.stream.close()
                            return False
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (
                not self._finish_future.done()
                and self.stream is not None
                and not self.stream.closed()
            ):
                self.stream.set_close_callback(self._on_connection_close)
                await self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                return False
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
            if not self.is_client:
                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            self.close()
            return False
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            header_future = None  # type: ignore
            self._clear_callbacks()
        return True

    def _clear_callbacks(self) -> None:
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None  # type: Optional[Future[None]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all the
        response.
        """
        self._close_callback = callback

    def _on_connection_close(self) -> None:
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self) -> None:
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self) -> iostream.IOStream:
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None  # type: ignore
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout: float) -> None:
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size: int) -> None:
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Implements `.HTTPConnection.write_headers`."""
        lines = []
        if self.is_client:
            assert isinstance(start_line, httputil.RequestStartLine)
            self._request_start_line = start_line
            lines.append(utf8("%s %s HTTP/1.1" % (start_line[0], start_line[1])))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding.
            self._chunking_output = (
                start_line.method in ("POST", "PUT", "PATCH")
                and "Content-Length" not in headers
                and (
                    "Transfer-Encoding" not in headers
                    or headers["Transfer-Encoding"] == "chunked"
                )
            )
        else:
            assert isinstance(start_line, httputil.ResponseStartLine)
            assert self._request_start_line is not None
            assert self._request_headers is not None
            self._response_start_line = start_line
            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == "HTTP/1.1"
                # Omit payload header field for HEAD request.
                and self._request_start_line.method != "HEAD"
                # 1xx, 204 and 304 responses have no body (not even a zero-length
                # body), and so should not have either Content-Length or
                # Transfer-Encoding headers.
                and start_line.code not in (204, 304)
                and (start_line.code < 100 or start_line.code >= 200)
                # No need to chunk the output if a Content-Length is specified.
                and "Content-Length" not in headers
                # Applications are discouraged from touching Transfer-Encoding,
                # but if they do, leave it alone.
                and "Transfer-Encoding" not in headers
            )
            # If connection to a 1.1 client will be closed, inform client
            if (
                self._request_start_line.version == "HTTP/1.1"
                and self._disconnect_on_finish
            ):
                headers["Connection"] = "close"
            # If a 1.0 client asked for keep-alive, add the header.
            if (
                self._request_start_line.version == "HTTP/1.0"
                and self._request_headers.get("Connection", "").lower() == "keep-alive"
            ):
                headers["Connection"] = "Keep-Alive"
        if self._chunking_output:
            headers["Transfer-Encoding"] = "chunked"
        if not self.is_client and (
            self._request_start_line.method == "HEAD"
            or cast(httputil.ResponseStartLine, start_line).code == 304
        ):
            self._expected_content_remaining = 0
        elif "Content-Length" in headers:
            self._expected_content_remaining = int(headers["Content-Length"])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (
            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
        )
        lines.extend(line.encode("latin1") for line in header_lines)
        for line in lines:
            if b"\n" in line:
                raise ValueError("Newline in header: " + repr(line))
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            future.exception()
        else:
            future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk: bytes) -> bytes:
        """Apply outgoing framing to ``chunk``.

        Enforces the declared Content-Length (closing the stream on
        overflow) and adds chunked-encoding framing when in use.
        """
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length"
                )
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk: bytes) -> "Future[None]":
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            self._write_future.exception()
        else:
            future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def finish(self) -> None:
        """Implements `.HTTPConnection.finish`."""
        if (
            self._expected_content_remaining is not None
            and self._expected_content_remaining != 0
            and not self.stream.closed()
        ):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length"
                % self._expected_content_remaining
            )
        if self._chunking_output:
            if not self.stream.closed():
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future: "Future[None]") -> None:
        # Propagate any write error other than a closed stream (which is
        # reported elsewhere), then run/resolve the pending callback/future.
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(
        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
    ) -> bool:
        """Return True if the connection may be reused after this message.

        Considers the connection parameters, the HTTP version, and the
        Connection header; a reusable message must also have a
        self-delimited body (Content-Length, chunked, or a bodyless method).
        """
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif (
            "Content-Length" in headers
            or headers.get("Transfer-Encoding", "").lower() == "chunked"
            or getattr(start_line, "method", None) in ("HEAD", "GET")
        ):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future: "Optional[Future[None]]") -> None:
        # Called when the response has been fully written; either closes
        # the connection or resets it for the next request.
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
        """Split a raw header block into the start line and parsed headers."""
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection.  Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
        # RFC 7230 section allows for both CRLF and bare LF.
        eol = data_str.find("\n")
        start_line = data_str[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data_str[eol:])
        return start_line, headers

    def _read_body(
        self,
        code: int,
        headers: httputil.HTTPHeaders,
        delegate: httputil.HTTPMessageDelegate,
    ) -> Optional[Awaitable[None]]:
        """Determine the body framing from ``headers`` and start reading it.

        Implements the body-length rules of RFC 7230 section 3.3.3.
        Returns an awaitable that reads the body, or None if there is
        no body to read.  ``code`` is the response status code for
        client connections and 0 on the server side.
        """
        if "Content-Length" in headers:
            if "Transfer-Encoding" in headers:
                # Response cannot contain both Content-Length and
                # Transfer-Encoding headers.
                # http://tools.ietf.org/html/rfc7230#section-3.3.3
                raise httputil.HTTPInputError(
                    "Response with both Transfer-Encoding and Content-Length"
                )
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated.  If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r",\s*", headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r"
                        % headers["Content-Length"]
                    )
                headers["Content-Length"] = pieces[0]

            try:
                # Strict parse: int() alone would accept signs, whitespace,
                # and underscores, which other HTTP implementations reject
                # (a request-smuggling hazard).
                content_length = _parse_int(
                    headers["Content-Length"]
                )  # type: Optional[int]
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s"
                    % headers["Content-Length"]
                )

            if cast(int, content_length) > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if "Transfer-Encoding" in headers or content_length not in (None, 0):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code
                )
            content_length = 0

        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if headers.get("Transfer-Encoding", "").lower() == "chunked":
            return self._read_chunked_body(delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    async def _read_fixed_body(
        self, content_length: int, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Read exactly ``content_length`` bytes, feeding them to ``delegate``."""
        while content_length > 0:
            body = await self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True
            )
            content_length -= len(body)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        await ret

    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
        """Read a chunked-encoded body, feeding decoded data to ``delegate``."""
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                # Strict hex parse: int(s, 16) alone would accept "+1f",
                # " 1f", "0x1f", etc., which other HTTP implementations
                # reject (a request-smuggling hazard).
                chunk_len = _parse_hex_int(native_str(chunk_len_str[:-2]))
            except ValueError:
                # Raise HTTPInputError so the caller treats this as a
                # malformed message (400/close) rather than logging it as
                # an uncaught internal error.
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                crlf = await self.stream.read_bytes(2)
                if crlf != b"\r\n":
                    raise httputil.HTTPInputError(
                        "improperly terminated chunked request"
                    )
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = await self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True
                )
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            await ret
            # chunk ends with \r\n
            crlf = await self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    async def _read_body_until_close(
        self, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Read until the stream closes (client side, no explicit framing)."""
        body = await self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                if ret is not None:
                    await ret


class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
    """

    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        if headers.get("Content-Encoding") == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        if self._decompressor:
            compressed_data = chunk
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size
                )
                if decompressed:
                    ret = self._delegate.data_received(decompressed)
                    if ret is not None:
                        await ret
                compressed_data = self._decompressor.unconsumed_tail
                if compressed_data and not decompressed:
                    raise httputil.HTTPInputError(
                        "encountered unconsumed gzip data without making progress"
                    )
        else:
            ret = self._delegate.data_received(chunk)
            if ret is not None:
                await ret

    def finish(self) -> None:
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # The tail should always be empty: decompress returned
                # all that it can in data_received and the only
                # purpose of the flush call is to detect errors such
                # as truncated input. If we did legitimately get a new
                # chunk at this point we'd need to change the
                # interface to make finish() a coroutine.
                raise ValueError(
                    "decompressor.flush returned data; possible truncated input"
                )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        return self._delegate.on_connection_close()


class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        assert self._serving_future is not None
        try:
            await self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        fut = gen.convert_yielded(self._server_request_loop(delegate))
        self._serving_future = fut
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(fut, lambda f: f.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        """Serve requests on this connection until it is closed."""
        try:
            while True:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                await asyncio.sleep(0)
        finally:
            delegate.on_close(self)