1#
2# Copyright 2014 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Client and server implementations of HTTP/1.x.
17
18.. versionadded:: 4.0
19"""
20
21import asyncio
22import logging
23import re
24import types
25
26from tornado.concurrent import (
27    Future,
28    future_add_done_callback,
29    future_set_result_unless_cancelled,
30)
31from tornado.escape import native_str, utf8
32from tornado import gen
33from tornado import httputil
34from tornado import iostream
35from tornado.log import gen_log, app_log
36from tornado.util import GzipDecompressor
37
38
39from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
40
41
42class _QuietException(Exception):
43    def __init__(self) -> None:
44        pass
45
46
47class _ExceptionLoggingContext(object):
48    """Used with the ``with`` statement when calling delegate methods to
49    log any exceptions with the given logger.  Any exceptions caught are
50    converted to _QuietException
51    """
52
53    def __init__(self, logger: logging.Logger) -> None:
54        self.logger = logger
55
56    def __enter__(self) -> None:
57        pass
58
59    def __exit__(
60        self,
61        typ: "Optional[Type[BaseException]]",
62        value: Optional[BaseException],
63        tb: types.TracebackType,
64    ) -> None:
65        if value is not None:
66            assert typ is not None
67            self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
68            raise _QuietException
69
70
71class HTTP1ConnectionParameters(object):
72    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`.
73    """
74
75    def __init__(
76        self,
77        no_keep_alive: bool = False,
78        chunk_size: Optional[int] = None,
79        max_header_size: Optional[int] = None,
80        header_timeout: Optional[float] = None,
81        max_body_size: Optional[int] = None,
82        body_timeout: Optional[float] = None,
83        decompress: bool = False,
84    ) -> None:
85        """
86        :arg bool no_keep_alive: If true, always close the connection after
87            one request.
88        :arg int chunk_size: how much data to read into memory at once
89        :arg int max_header_size:  maximum amount of data for HTTP headers
90        :arg float header_timeout: how long to wait for all headers (seconds)
91        :arg int max_body_size: maximum amount of data for body
92        :arg float body_timeout: how long to wait while reading body (seconds)
93        :arg bool decompress: if true, decode incoming
94            ``Content-Encoding: gzip``
95        """
96        self.no_keep_alive = no_keep_alive
97        self.chunk_size = chunk_size or 65536
98        self.max_header_size = max_header_size or 65536
99        self.header_timeout = header_timeout
100        self.max_body_size = max_body_size
101        self.body_timeout = body_timeout
102        self.decompress = decompress
103
104
105class HTTP1Connection(httputil.HTTPConnection):
106    """Implements the HTTP/1.x protocol.
107
108    This class can be on its own for clients, or via `HTTP1ServerConnection`
109    for servers.
110    """
111
112    def __init__(
113        self,
114        stream: iostream.IOStream,
115        is_client: bool,
116        params: Optional[HTTP1ConnectionParameters] = None,
117        context: Optional[object] = None,
118    ) -> None:
119        """
120        :arg stream: an `.IOStream`
121        :arg bool is_client: client or server
122        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
123        :arg context: an opaque application-defined object that can be accessed
124            as ``connection.context``.
125        """
126        self.is_client = is_client
127        self.stream = stream
128        if params is None:
129            params = HTTP1ConnectionParameters()
130        self.params = params
131        self.context = context
132        self.no_keep_alive = params.no_keep_alive
133        # The body limits can be altered by the delegate, so save them
134        # here instead of just referencing self.params later.
135        self._max_body_size = self.params.max_body_size or self.stream.max_buffer_size
136        self._body_timeout = self.params.body_timeout
137        # _write_finished is set to True when finish() has been called,
138        # i.e. there will be no more data sent.  Data may still be in the
139        # stream's write buffer.
140        self._write_finished = False
141        # True when we have read the entire incoming body.
142        self._read_finished = False
143        # _finish_future resolves when all data has been written and flushed
144        # to the IOStream.
145        self._finish_future = Future()  # type: Future[None]
146        # If true, the connection should be closed after this request
147        # (after the response has been written in the server side,
148        # and after it has been read in the client)
149        self._disconnect_on_finish = False
150        self._clear_callbacks()
151        # Save the start lines after we read or write them; they
152        # affect later processing (e.g. 304 responses and HEAD methods
153        # have content-length but no bodies)
154        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
155        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
156        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
157        # True if we are writing output with chunked encoding.
158        self._chunking_output = False
159        # While reading a body with a content-length, this is the
160        # amount left to read.
161        self._expected_content_remaining = None  # type: Optional[int]
162        # A Future for our outgoing writes, returned by IOStream.write.
163        self._pending_write = None  # type: Optional[Future[None]]
164
165    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
166        """Read a single HTTP response.
167
168        Typical client-mode usage is to write a request using `write_headers`,
169        `write`, and `finish`, and then call ``read_response``.
170
171        :arg delegate: a `.HTTPMessageDelegate`
172
173        Returns a `.Future` that resolves to a bool after the full response has
174        been read. The result is true if the stream is still open.
175        """
176        if self.params.decompress:
177            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
178        return self._read_message(delegate)
179
180    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
181        need_delegate_close = False
182        try:
183            header_future = self.stream.read_until_regex(
184                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
185            )
186            if self.params.header_timeout is None:
187                header_data = await header_future
188            else:
189                try:
190                    header_data = await gen.with_timeout(
191                        self.stream.io_loop.time() + self.params.header_timeout,
192                        header_future,
193                        quiet_exceptions=iostream.StreamClosedError,
194                    )
195                except gen.TimeoutError:
196                    self.close()
197                    return False
198            start_line_str, headers = self._parse_headers(header_data)
199            if self.is_client:
200                resp_start_line = httputil.parse_response_start_line(start_line_str)
201                self._response_start_line = resp_start_line
202                start_line = (
203                    resp_start_line
204                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
205                # TODO: this will need to change to support client-side keepalive
206                self._disconnect_on_finish = False
207            else:
208                req_start_line = httputil.parse_request_start_line(start_line_str)
209                self._request_start_line = req_start_line
210                self._request_headers = headers
211                start_line = req_start_line
212                self._disconnect_on_finish = not self._can_keep_alive(
213                    req_start_line, headers
214                )
215            need_delegate_close = True
216            with _ExceptionLoggingContext(app_log):
217                header_recv_future = delegate.headers_received(start_line, headers)
218                if header_recv_future is not None:
219                    await header_recv_future
220            if self.stream is None:
221                # We've been detached.
222                need_delegate_close = False
223                return False
224            skip_body = False
225            if self.is_client:
226                assert isinstance(start_line, httputil.ResponseStartLine)
227                if (
228                    self._request_start_line is not None
229                    and self._request_start_line.method == "HEAD"
230                ):
231                    skip_body = True
232                code = start_line.code
233                if code == 304:
234                    # 304 responses may include the content-length header
235                    # but do not actually have a body.
236                    # http://tools.ietf.org/html/rfc7230#section-3.3
237                    skip_body = True
238                if 100 <= code < 200:
239                    # 1xx responses should never indicate the presence of
240                    # a body.
241                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
242                        raise httputil.HTTPInputError(
243                            "Response code %d cannot have body" % code
244                        )
245                    # TODO: client delegates will get headers_received twice
246                    # in the case of a 100-continue.  Document or change?
247                    await self._read_message(delegate)
248            else:
249                if headers.get("Expect") == "100-continue" and not self._write_finished:
250                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
251            if not skip_body:
252                body_future = self._read_body(
253                    resp_start_line.code if self.is_client else 0, headers, delegate
254                )
255                if body_future is not None:
256                    if self._body_timeout is None:
257                        await body_future
258                    else:
259                        try:
260                            await gen.with_timeout(
261                                self.stream.io_loop.time() + self._body_timeout,
262                                body_future,
263                                quiet_exceptions=iostream.StreamClosedError,
264                            )
265                        except gen.TimeoutError:
266                            gen_log.info("Timeout reading body from %s", self.context)
267                            self.stream.close()
268                            return False
269            self._read_finished = True
270            if not self._write_finished or self.is_client:
271                need_delegate_close = False
272                with _ExceptionLoggingContext(app_log):
273                    delegate.finish()
274            # If we're waiting for the application to produce an asynchronous
275            # response, and we're not detached, register a close callback
276            # on the stream (we didn't need one while we were reading)
277            if (
278                not self._finish_future.done()
279                and self.stream is not None
280                and not self.stream.closed()
281            ):
282                self.stream.set_close_callback(self._on_connection_close)
283                await self._finish_future
284            if self.is_client and self._disconnect_on_finish:
285                self.close()
286            if self.stream is None:
287                return False
288        except httputil.HTTPInputError as e:
289            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
290            if not self.is_client:
291                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
292            self.close()
293            return False
294        finally:
295            if need_delegate_close:
296                with _ExceptionLoggingContext(app_log):
297                    delegate.on_connection_close()
298            header_future = None  # type: ignore
299            self._clear_callbacks()
300        return True
301
302    def _clear_callbacks(self) -> None:
303        """Clears the callback attributes.
304
305        This allows the request handler to be garbage collected more
306        quickly in CPython by breaking up reference cycles.
307        """
308        self._write_callback = None
309        self._write_future = None  # type: Optional[Future[None]]
310        self._close_callback = None  # type: Optional[Callable[[], None]]
311        if self.stream is not None:
312            self.stream.set_close_callback(None)
313
314    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
315        """Sets a callback that will be run when the connection is closed.
316
317        Note that this callback is slightly different from
318        `.HTTPMessageDelegate.on_connection_close`: The
319        `.HTTPMessageDelegate` method is called when the connection is
320        closed while receiving a message. This callback is used when
321        there is not an active delegate (for example, on the server
322        side this callback is used if the client closes the connection
323        after sending its request but before receiving all the
324        response.
325        """
326        self._close_callback = callback
327
328    def _on_connection_close(self) -> None:
329        # Note that this callback is only registered on the IOStream
330        # when we have finished reading the request and are waiting for
331        # the application to produce its response.
332        if self._close_callback is not None:
333            callback = self._close_callback
334            self._close_callback = None
335            callback()
336        if not self._finish_future.done():
337            future_set_result_unless_cancelled(self._finish_future, None)
338        self._clear_callbacks()
339
340    def close(self) -> None:
341        if self.stream is not None:
342            self.stream.close()
343        self._clear_callbacks()
344        if not self._finish_future.done():
345            future_set_result_unless_cancelled(self._finish_future, None)
346
347    def detach(self) -> iostream.IOStream:
348        """Take control of the underlying stream.
349
350        Returns the underlying `.IOStream` object and stops all further
351        HTTP processing.  May only be called during
352        `.HTTPMessageDelegate.headers_received`.  Intended for implementing
353        protocols like websockets that tunnel over an HTTP handshake.
354        """
355        self._clear_callbacks()
356        stream = self.stream
357        self.stream = None  # type: ignore
358        if not self._finish_future.done():
359            future_set_result_unless_cancelled(self._finish_future, None)
360        return stream
361
362    def set_body_timeout(self, timeout: float) -> None:
363        """Sets the body timeout for a single request.
364
365        Overrides the value from `.HTTP1ConnectionParameters`.
366        """
367        self._body_timeout = timeout
368
369    def set_max_body_size(self, max_body_size: int) -> None:
370        """Sets the body size limit for a single request.
371
372        Overrides the value from `.HTTP1ConnectionParameters`.
373        """
374        self._max_body_size = max_body_size
375
376    def write_headers(
377        self,
378        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
379        headers: httputil.HTTPHeaders,
380        chunk: Optional[bytes] = None,
381    ) -> "Future[None]":
382        """Implements `.HTTPConnection.write_headers`."""
383        lines = []
384        if self.is_client:
385            assert isinstance(start_line, httputil.RequestStartLine)
386            self._request_start_line = start_line
387            lines.append(utf8("%s %s HTTP/1.1" % (start_line[0], start_line[1])))
388            # Client requests with a non-empty body must have either a
389            # Content-Length or a Transfer-Encoding.
390            self._chunking_output = (
391                start_line.method in ("POST", "PUT", "PATCH")
392                and "Content-Length" not in headers
393                and (
394                    "Transfer-Encoding" not in headers
395                    or headers["Transfer-Encoding"] == "chunked"
396                )
397            )
398        else:
399            assert isinstance(start_line, httputil.ResponseStartLine)
400            assert self._request_start_line is not None
401            assert self._request_headers is not None
402            self._response_start_line = start_line
403            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
404            self._chunking_output = (
405                # TODO: should this use
406                # self._request_start_line.version or
407                # start_line.version?
408                self._request_start_line.version == "HTTP/1.1"
409                # Omit payload header field for HEAD request.
410                and self._request_start_line.method != "HEAD"
411                # 1xx, 204 and 304 responses have no body (not even a zero-length
412                # body), and so should not have either Content-Length or
413                # Transfer-Encoding headers.
414                and start_line.code not in (204, 304)
415                and (start_line.code < 100 or start_line.code >= 200)
416                # No need to chunk the output if a Content-Length is specified.
417                and "Content-Length" not in headers
418                # Applications are discouraged from touching Transfer-Encoding,
419                # but if they do, leave it alone.
420                and "Transfer-Encoding" not in headers
421            )
422            # If connection to a 1.1 client will be closed, inform client
423            if (
424                self._request_start_line.version == "HTTP/1.1"
425                and self._disconnect_on_finish
426            ):
427                headers["Connection"] = "close"
428            # If a 1.0 client asked for keep-alive, add the header.
429            if (
430                self._request_start_line.version == "HTTP/1.0"
431                and self._request_headers.get("Connection", "").lower() == "keep-alive"
432            ):
433                headers["Connection"] = "Keep-Alive"
434        if self._chunking_output:
435            headers["Transfer-Encoding"] = "chunked"
436        if not self.is_client and (
437            self._request_start_line.method == "HEAD"
438            or cast(httputil.ResponseStartLine, start_line).code == 304
439        ):
440            self._expected_content_remaining = 0
441        elif "Content-Length" in headers:
442            self._expected_content_remaining = int(headers["Content-Length"])
443        else:
444            self._expected_content_remaining = None
445        # TODO: headers are supposed to be of type str, but we still have some
446        # cases that let bytes slip through. Remove these native_str calls when those
447        # are fixed.
448        header_lines = (
449            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
450        )
451        lines.extend(line.encode("latin1") for line in header_lines)
452        for line in lines:
453            if b"\n" in line:
454                raise ValueError("Newline in header: " + repr(line))
455        future = None
456        if self.stream.closed():
457            future = self._write_future = Future()
458            future.set_exception(iostream.StreamClosedError())
459            future.exception()
460        else:
461            future = self._write_future = Future()
462            data = b"\r\n".join(lines) + b"\r\n\r\n"
463            if chunk:
464                data += self._format_chunk(chunk)
465            self._pending_write = self.stream.write(data)
466            future_add_done_callback(self._pending_write, self._on_write_complete)
467        return future
468
469    def _format_chunk(self, chunk: bytes) -> bytes:
470        if self._expected_content_remaining is not None:
471            self._expected_content_remaining -= len(chunk)
472            if self._expected_content_remaining < 0:
473                # Close the stream now to stop further framing errors.
474                self.stream.close()
475                raise httputil.HTTPOutputError(
476                    "Tried to write more data than Content-Length"
477                )
478        if self._chunking_output and chunk:
479            # Don't write out empty chunks because that means END-OF-STREAM
480            # with chunked encoding
481            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
482        else:
483            return chunk
484
485    def write(self, chunk: bytes) -> "Future[None]":
486        """Implements `.HTTPConnection.write`.
487
488        For backwards compatibility it is allowed but deprecated to
489        skip `write_headers` and instead call `write()` with a
490        pre-encoded header block.
491        """
492        future = None
493        if self.stream.closed():
494            future = self._write_future = Future()
495            self._write_future.set_exception(iostream.StreamClosedError())
496            self._write_future.exception()
497        else:
498            future = self._write_future = Future()
499            self._pending_write = self.stream.write(self._format_chunk(chunk))
500            future_add_done_callback(self._pending_write, self._on_write_complete)
501        return future
502
503    def finish(self) -> None:
504        """Implements `.HTTPConnection.finish`."""
505        if (
506            self._expected_content_remaining is not None
507            and self._expected_content_remaining != 0
508            and not self.stream.closed()
509        ):
510            self.stream.close()
511            raise httputil.HTTPOutputError(
512                "Tried to write %d bytes less than Content-Length"
513                % self._expected_content_remaining
514            )
515        if self._chunking_output:
516            if not self.stream.closed():
517                self._pending_write = self.stream.write(b"0\r\n\r\n")
518                self._pending_write.add_done_callback(self._on_write_complete)
519        self._write_finished = True
520        # If the app finished the request while we're still reading,
521        # divert any remaining data away from the delegate and
522        # close the connection when we're done sending our response.
523        # Closing the connection is the only way to avoid reading the
524        # whole input body.
525        if not self._read_finished:
526            self._disconnect_on_finish = True
527        # No more data is coming, so instruct TCP to send any remaining
528        # data immediately instead of waiting for a full packet or ack.
529        self.stream.set_nodelay(True)
530        if self._pending_write is None:
531            self._finish_request(None)
532        else:
533            future_add_done_callback(self._pending_write, self._finish_request)
534
535    def _on_write_complete(self, future: "Future[None]") -> None:
536        exc = future.exception()
537        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
538            future.result()
539        if self._write_callback is not None:
540            callback = self._write_callback
541            self._write_callback = None
542            self.stream.io_loop.add_callback(callback)
543        if self._write_future is not None:
544            future = self._write_future
545            self._write_future = None
546            future_set_result_unless_cancelled(future, None)
547
548    def _can_keep_alive(
549        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
550    ) -> bool:
551        if self.params.no_keep_alive:
552            return False
553        connection_header = headers.get("Connection")
554        if connection_header is not None:
555            connection_header = connection_header.lower()
556        if start_line.version == "HTTP/1.1":
557            return connection_header != "close"
558        elif (
559            "Content-Length" in headers
560            or headers.get("Transfer-Encoding", "").lower() == "chunked"
561            or getattr(start_line, "method", None) in ("HEAD", "GET")
562        ):
563            # start_line may be a request or response start line; only
564            # the former has a method attribute.
565            return connection_header == "keep-alive"
566        return False
567
568    def _finish_request(self, future: "Optional[Future[None]]") -> None:
569        self._clear_callbacks()
570        if not self.is_client and self._disconnect_on_finish:
571            self.close()
572            return
573        # Turn Nagle's algorithm back on, leaving the stream in its
574        # default state for the next request.
575        self.stream.set_nodelay(False)
576        if not self._finish_future.done():
577            future_set_result_unless_cancelled(self._finish_future, None)
578
579    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
580        # The lstrip removes newlines that some implementations sometimes
581        # insert between messages of a reused connection.  Per RFC 7230,
582        # we SHOULD ignore at least one empty line before the request.
583        # http://tools.ietf.org/html/rfc7230#section-3.5
584        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
585        # RFC 7230 section allows for both CRLF and bare LF.
586        eol = data_str.find("\n")
587        start_line = data_str[:eol].rstrip("\r")
588        headers = httputil.HTTPHeaders.parse(data_str[eol:])
589        return start_line, headers
590
591    def _read_body(
592        self,
593        code: int,
594        headers: httputil.HTTPHeaders,
595        delegate: httputil.HTTPMessageDelegate,
596    ) -> Optional[Awaitable[None]]:
597        if "Content-Length" in headers:
598            if "Transfer-Encoding" in headers:
599                # Response cannot contain both Content-Length and
600                # Transfer-Encoding headers.
601                # http://tools.ietf.org/html/rfc7230#section-3.3.3
602                raise httputil.HTTPInputError(
603                    "Response with both Transfer-Encoding and Content-Length"
604                )
605            if "," in headers["Content-Length"]:
606                # Proxies sometimes cause Content-Length headers to get
607                # duplicated.  If all the values are identical then we can
608                # use them but if they differ it's an error.
609                pieces = re.split(r",\s*", headers["Content-Length"])
610                if any(i != pieces[0] for i in pieces):
611                    raise httputil.HTTPInputError(
612                        "Multiple unequal Content-Lengths: %r"
613                        % headers["Content-Length"]
614                    )
615                headers["Content-Length"] = pieces[0]
616
617            try:
618                content_length = int(headers["Content-Length"])  # type: Optional[int]
619            except ValueError:
620                # Handles non-integer Content-Length value.
621                raise httputil.HTTPInputError(
622                    "Only integer Content-Length is allowed: %s"
623                    % headers["Content-Length"]
624                )
625
626            if cast(int, content_length) > self._max_body_size:
627                raise httputil.HTTPInputError("Content-Length too long")
628        else:
629            content_length = None
630
631        if code == 204:
632            # This response code is not allowed to have a non-empty body,
633            # and has an implicit length of zero instead of read-until-close.
634            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
635            if "Transfer-Encoding" in headers or content_length not in (None, 0):
636                raise httputil.HTTPInputError(
637                    "Response with code %d should not have body" % code
638                )
639            content_length = 0
640
641        if content_length is not None:
642            return self._read_fixed_body(content_length, delegate)
643        if headers.get("Transfer-Encoding", "").lower() == "chunked":
644            return self._read_chunked_body(delegate)
645        if self.is_client:
646            return self._read_body_until_close(delegate)
647        return None
648
649    async def _read_fixed_body(
650        self, content_length: int, delegate: httputil.HTTPMessageDelegate
651    ) -> None:
652        while content_length > 0:
653            body = await self.stream.read_bytes(
654                min(self.params.chunk_size, content_length), partial=True
655            )
656            content_length -= len(body)
657            if not self._write_finished or self.is_client:
658                with _ExceptionLoggingContext(app_log):
659                    ret = delegate.data_received(body)
660                    if ret is not None:
661                        await ret
662
663    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
664        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
665        total_size = 0
666        while True:
667            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
668            chunk_len = int(chunk_len_str.strip(), 16)
669            if chunk_len == 0:
670                crlf = await self.stream.read_bytes(2)
671                if crlf != b"\r\n":
672                    raise httputil.HTTPInputError(
673                        "improperly terminated chunked request"
674                    )
675                return
676            total_size += chunk_len
677            if total_size > self._max_body_size:
678                raise httputil.HTTPInputError("chunked body too large")
679            bytes_to_read = chunk_len
680            while bytes_to_read:
681                chunk = await self.stream.read_bytes(
682                    min(bytes_to_read, self.params.chunk_size), partial=True
683                )
684                bytes_to_read -= len(chunk)
685                if not self._write_finished or self.is_client:
686                    with _ExceptionLoggingContext(app_log):
687                        ret = delegate.data_received(chunk)
688                        if ret is not None:
689                            await ret
690            # chunk ends with \r\n
691            crlf = await self.stream.read_bytes(2)
692            assert crlf == b"\r\n"
693
694    async def _read_body_until_close(
695        self, delegate: httputil.HTTPMessageDelegate
696    ) -> None:
697        body = await self.stream.read_until_close()
698        if not self._write_finished or self.is_client:
699            with _ExceptionLoggingContext(app_log):
700                ret = delegate.data_received(body)
701                if ret is not None:
702                    await ret
703
704
705class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
706    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
707    """
708
709    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
710        self._delegate = delegate
711        self._chunk_size = chunk_size
712        self._decompressor = None  # type: Optional[GzipDecompressor]
713
714    def headers_received(
715        self,
716        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
717        headers: httputil.HTTPHeaders,
718    ) -> Optional[Awaitable[None]]:
719        if headers.get("Content-Encoding") == "gzip":
720            self._decompressor = GzipDecompressor()
721            # Downstream delegates will only see uncompressed data,
722            # so rename the content-encoding header.
723            # (but note that curl_httpclient doesn't do this).
724            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
725            del headers["Content-Encoding"]
726        return self._delegate.headers_received(start_line, headers)
727
728    async def data_received(self, chunk: bytes) -> None:
729        if self._decompressor:
730            compressed_data = chunk
731            while compressed_data:
732                decompressed = self._decompressor.decompress(
733                    compressed_data, self._chunk_size
734                )
735                if decompressed:
736                    ret = self._delegate.data_received(decompressed)
737                    if ret is not None:
738                        await ret
739                compressed_data = self._decompressor.unconsumed_tail
740                if compressed_data and not decompressed:
741                    raise httputil.HTTPInputError(
742                        "encountered unconsumed gzip data without making progress"
743                    )
744        else:
745            ret = self._delegate.data_received(chunk)
746            if ret is not None:
747                await ret
748
749    def finish(self) -> None:
750        if self._decompressor is not None:
751            tail = self._decompressor.flush()
752            if tail:
753                # The tail should always be empty: decompress returned
754                # all that it can in data_received and the only
755                # purpose of the flush call is to detect errors such
756                # as truncated input. If we did legitimately get a new
757                # chunk at this point we'd need to change the
758                # interface to make finish() a coroutine.
759                raise ValueError(
760                    "decompressor.flush returned data; possible truncated input"
761                )
762        return self._delegate.finish()
763
764    def on_connection_close(self) -> None:
765        return self._delegate.on_connection_close()
766
767
768class HTTP1ServerConnection(object):
769    """An HTTP/1.x server."""
770
771    def __init__(
772        self,
773        stream: iostream.IOStream,
774        params: Optional[HTTP1ConnectionParameters] = None,
775        context: Optional[object] = None,
776    ) -> None:
777        """
778        :arg stream: an `.IOStream`
779        :arg params: a `.HTTP1ConnectionParameters` or None
780        :arg context: an opaque application-defined object that is accessible
781            as ``connection.context``
782        """
783        self.stream = stream
784        if params is None:
785            params = HTTP1ConnectionParameters()
786        self.params = params
787        self.context = context
788        self._serving_future = None  # type: Optional[Future[None]]
789
790    async def close(self) -> None:
791        """Closes the connection.
792
793        Returns a `.Future` that resolves after the serving loop has exited.
794        """
795        self.stream.close()
796        # Block until the serving loop is done, but ignore any exceptions
797        # (start_serving is already responsible for logging them).
798        assert self._serving_future is not None
799        try:
800            await self._serving_future
801        except Exception:
802            pass
803
804    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
805        """Starts serving requests on this connection.
806
807        :arg delegate: a `.HTTPServerConnectionDelegate`
808        """
809        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
810        fut = gen.convert_yielded(self._server_request_loop(delegate))
811        self._serving_future = fut
812        # Register the future on the IOLoop so its errors get logged.
813        self.stream.io_loop.add_future(fut, lambda f: f.result())
814
815    async def _server_request_loop(
816        self, delegate: httputil.HTTPServerConnectionDelegate
817    ) -> None:
818        try:
819            while True:
820                conn = HTTP1Connection(self.stream, False, self.params, self.context)
821                request_delegate = delegate.start_request(self, conn)
822                try:
823                    ret = await conn.read_response(request_delegate)
824                except (
825                    iostream.StreamClosedError,
826                    iostream.UnsatisfiableReadError,
827                    asyncio.CancelledError,
828                ):
829                    return
830                except _QuietException:
831                    # This exception was already logged.
832                    conn.close()
833                    return
834                except Exception:
835                    gen_log.error("Uncaught exception", exc_info=True)
836                    conn.close()
837                    return
838                if not ret:
839                    return
840                await asyncio.sleep(0)
841        finally:
842            delegate.on_close(self)
843