1import io
2import re
3import typing as t
4from functools import partial
5from functools import update_wrapper
6from itertools import chain
7
8from ._internal import _make_encode_wrapper
9from ._internal import _to_bytes
10from ._internal import _to_str
11from .sansio import utils as _sansio_utils
12from .sansio.utils import host_is_trusted  # noqa: F401 # Imported as part of API
13from .urls import _URLTuple
14from .urls import uri_to_iri
15from .urls import url_join
16from .urls import url_parse
17from .urls import url_quote
18
19if t.TYPE_CHECKING:
20    from _typeshed.wsgi import WSGIApplication
21    from _typeshed.wsgi import WSGIEnvironment
22
23
def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication":
    """Marks a function as responder.  Decorate a function with it and it
    will automatically call the return value as WSGI application.

    Example::

        @responder
        def application(environ, start_response):
            return Response('Hello World!')
    """

    def wrapped(*args: t.Any) -> t.Any:
        # ``f`` produces the response object; invoke it as a WSGI app
        # with the trailing (environ, start_response) pair.
        return f(*args)(*args[-2:])

    return update_wrapper(wrapped, f)
35
36
def get_current_url(
    environ: "WSGIEnvironment",
    root_only: bool = False,
    strip_querystring: bool = False,
    host_only: bool = False,
    trusted_hosts: t.Optional[t.Iterable[str]] = None,
) -> str:
    """Recreate the URL for a request from the parts in a WSGI
    environment.

    The URL is an IRI, not a URI, so it may contain Unicode characters.
    Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.

    :param environ: The WSGI environment to get the URL parts from.
    :param root_only: Only build the root path, don't include the
        remaining path or query string.
    :param strip_querystring: Don't include the query string.
    :param host_only: Only build the scheme and host.
    :param trusted_hosts: A list of trusted host names to validate the
        host against.
    """
    parts: t.Dict[str, t.Any] = {
        "scheme": environ["wsgi.url_scheme"],
        "host": get_host(environ, trusted_hosts),
    }

    # Add progressively more of the URL, stopping early at the level the
    # caller asked for.
    if host_only:
        return _sansio_utils.get_current_url(**parts)

    parts["root_path"] = environ.get("SCRIPT_NAME", "")

    if root_only:
        return _sansio_utils.get_current_url(**parts)

    parts["path"] = environ.get("PATH_INFO", "")

    if not strip_querystring:
        # PEP 3333 environ values are latin-1 decoded; re-encode to get
        # the raw query string bytes.
        parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")

    return _sansio_utils.get_current_url(**parts)
73
74
75def _get_server(
76    environ: "WSGIEnvironment",
77) -> t.Optional[t.Tuple[str, t.Optional[int]]]:
78    name = environ.get("SERVER_NAME")
79
80    if name is None:
81        return None
82
83    try:
84        port: t.Optional[int] = int(environ.get("SERVER_PORT", None))
85    except (TypeError, ValueError):
86        # unix socket
87        port = None
88
89    return name, port
90
91
def get_host(
    environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None
) -> str:
    """Return the host for the given WSGI environment.

    The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
    set. The returned host will only contain the port if it is different
    than the standard port for the protocol.

    Optionally, verify that the host is trusted using
    :func:`host_is_trusted` and raise a
    :exc:`~werkzeug.exceptions.SecurityError` if it is not.

    :param environ: A WSGI environment dict.
    :param trusted_hosts: A list of trusted host names.

    :return: Host, with port if necessary.
    :raise ~werkzeug.exceptions.SecurityError: If the host is not
        trusted.
    """
    scheme = environ["wsgi.url_scheme"]
    host_header = environ.get("HTTP_HOST")
    server = _get_server(environ)
    return _sansio_utils.get_host(scheme, host_header, server, trusted_hosts)
118
119
def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
    """Returns the content length from the WSGI environment as
    integer. If it's not available or chunked transfer encoding is used,
    ``None`` is returned.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the content length from.
    """
    # With chunked transfer encoding the length is unknown up front.
    if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked":
        return None

    raw = environ.get("CONTENT_LENGTH")

    if raw is None:
        return None

    try:
        length = int(raw)
    except (ValueError, TypeError):
        # Malformed header value; treat as not available.
        return None

    # Clamp negative values to zero.
    return length if length > 0 else 0
139
140
def get_input_stream(
    environ: "WSGIEnvironment", safe_fallback: bool = True
) -> t.IO[bytes]:
    """Returns the input stream from the WSGI environment and wraps it
    in the most sensible way possible. The stream returned is not the
    raw WSGI stream in most cases but one that is safe to read from
    without taking into account the content length.

    If content length is not set, the stream will be empty for safety reasons.
    If the WSGI server supports chunked or infinite streams, it should set
    the ``wsgi.input_terminated`` value in the WSGI environ to indicate that.

    .. versionadded:: 0.9

    :param environ: the WSGI environ to fetch the stream from.
    :param safe_fallback: use an empty stream as a safe fallback when the
        content length is not set. Disabling this allows infinite streams,
        which can be a denial-of-service risk.
    """
    stream = t.cast(t.IO[bytes], environ["wsgi.input"])

    # Servers that support chunked/terminated input set this WSGI
    # extension flag; the raw stream can then be read safely to EOF.
    if environ.get("wsgi.input_terminated"):
        return stream

    content_length = get_content_length(environ)

    if content_length is None:
        # No declared length: the stream could be unbounded (malicious or
        # not), so default to an empty stream unless the caller opted out.
        return io.BytesIO() if safe_fallback else stream

    # A known length: cap reads at exactly that many bytes.
    return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
177
178
def get_query_string(environ: "WSGIEnvironment") -> str:
    """Returns the ``QUERY_STRING`` from the WSGI environment. This also
    takes care of the WSGI decoding dance. The string returned will be
    restricted to ASCII characters.

    :param environ: WSGI environment to get the query string from.

    .. versionadded:: 0.9
    """
    # PEP 3333 stores environ values latin-1 decoded; re-encode to get
    # the raw query string bytes.
    raw_qs = environ.get("QUERY_STRING", "").encode("latin1")
    # QUERY_STRING really should be ascii safe but some browsers
    # will send us some unicode stuff (I am looking at you IE).
    # In that case we want to urllib quote it badly.
    return url_quote(raw_qs, safe=":&%=+$!*'(),")
193
194
def get_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``PATH_INFO`` from the WSGI environment and decode it
    unless ``charset`` is ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    # Re-encode the latin-1 WSGI value to raw bytes before decoding with
    # the requested charset.
    raw_path = environ.get("PATH_INFO", "").encode("latin1")
    return _to_str(raw_path, charset, errors, allow_none_charset=True)  # type: ignore
210
211
def get_script_name(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the ``SCRIPT_NAME`` from the WSGI environment and decode
    it unless `charset` is set to ``None``.

    :param environ: WSGI environment to get the path from.
    :param charset: The charset for the path, or ``None`` if no decoding
        should be performed.
    :param errors: The decoding error handling.

    .. versionadded:: 0.9
    """
    # Re-encode the latin-1 WSGI value to raw bytes before decoding with
    # the requested charset.
    raw_name = environ.get("SCRIPT_NAME", "").encode("latin1")
    return _to_str(raw_name, charset, errors, allow_none_charset=True)  # type: ignore
227
228
def pop_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Removes and returns the next segment of `PATH_INFO`, pushing it onto
    `SCRIPT_NAME`.  Returns `None` if there is nothing left on `PATH_INFO`.

    If the `charset` is set to `None` bytes are returned.

    If there are empty segments (``'/foo//bar``) these are ignored but
    properly pushed to the `SCRIPT_NAME`:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> pop_path_info(env)
    'a'
    >>> env['SCRIPT_NAME']
    '/foo/a'
    >>> pop_path_info(env)
    'b'
    >>> env['SCRIPT_NAME']
    '/foo/a/b'

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
       The path is now decoded and a charset and encoding
       parameter can be provided.

    :param environ: the WSGI environment that is modified.
    :param charset: The ``encoding`` parameter passed to
        :func:`bytes.decode`.
    :param errors: The ``errors`` paramater passed to
        :func:`bytes.decode`.
    """
    path = environ.get("PATH_INFO")
    if not path:
        return None

    script_name = environ.get("SCRIPT_NAME", "")

    # Move any leading slashes from PATH_INFO over to SCRIPT_NAME so
    # empty segments are preserved there.
    stripped = path.lstrip("/")
    script_name += "/" * (len(path) - len(stripped))

    # Split off the first segment; ``slash`` is empty when there is no
    # further path.
    segment, slash, remainder = stripped.partition("/")
    environ["PATH_INFO"] = f"/{remainder}" if slash else ""
    environ["SCRIPT_NAME"] = script_name + segment

    raw_segment = segment.encode("latin1")
    return _to_str(raw_segment, charset, errors, allow_none_charset=True)  # type: ignore
285
286
def peek_path_info(
    environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace"
) -> t.Optional[str]:
    """Returns the next segment on the `PATH_INFO` or `None` if there
    is none.  Works like :func:`pop_path_info` without modifying the
    environment:

    >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'}
    >>> peek_path_info(env)
    'a'
    >>> peek_path_info(env)
    'a'

    If the `charset` is set to `None` bytes are returned.

    .. versionadded:: 0.5

    .. versionchanged:: 0.9
       The path is now decoded and a charset and encoding
       parameter can be provided.

    :param environ: the WSGI environment that is checked.
    """
    segment = environ.get("PATH_INFO", "").lstrip("/").split("/", 1)[0]

    if not segment:
        # Honor the documented contract and mirror pop_path_info: no
        # remaining segment means None.  (str.split always returns at
        # least [""] so a truthiness check on the list could never
        # reach this case.)
        return None

    return _to_str(  # type: ignore
        segment.encode("latin1"), charset, errors, allow_none_charset=True
    )
316
317
def extract_path_info(
    environ_or_baseurl: t.Union[str, "WSGIEnvironment"],
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info from the given URL (or WSGI environment) and
    path. The path info returned is a string. The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned.

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    Instead of providing a base URL you can also pass a WSGI environment.

    :param environ_or_baseurl: a WSGI environment dict, a base URL or
                               base IRI.  This is the root of the
                               application.
    :param path_or_url: an absolute path from the server root, a
                        relative path (in which case it's the path info)
                        or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
                                  not assume that http and https on the
                                  same server point to the same
                                  resource.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6
    """

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        # Strip any userinfo ("user:pass@") and drop the port when it is
        # the scheme's default (80 for http, 443 for https), so that
        # equivalent hosts compare equal below.
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is a IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    if isinstance(environ_or_baseurl, dict):
        environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
    base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    # resolve the target relative to the base so plain paths work too
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        # http and https are considered interchangeable here
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        # schemes must match exactly (and still be http/https)
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path?
    base_path = base_path.rstrip("/")
    if not cur_path.startswith(base_path):
        return None

    # the path info is whatever remains after the application root
    return f"/{cur_path[len(base_path) :].lstrip('/')}"
414
415
class ClosingIterator:
    """Wrap a WSGI iterable so extra ``close`` callbacks run when the
    response iterable is closed.

    The WSGI specification requires that all middlewares and gateways
    respect the `close` callback of the iterable returned by the
    application.  This class makes attaching additional cleanup actions
    easy::

        return ClosingIterator(app(environ, start_response), [cleanup_session,
                                                              cleanup_locals])

    A single callable may be passed instead of a list.

    A closing iterator is not needed if the application uses response
    objects and finishes the processing if the response is started::

        try:
            return response(environ, start_response)
        finally:
            cleanup_session()
            cleanup_locals()
    """

    def __init__(
        self,
        iterable: t.Iterable[bytes],
        callbacks: t.Optional[
            t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]]
        ] = None,
    ) -> None:
        self._iterator = iter(iterable)

        # Normalize ``callbacks`` to a fresh list: None -> [], a single
        # callable -> [callable], any other iterable -> list(...).
        if callbacks is None:
            cb_list: t.List[t.Callable[[], None]] = []
        elif callable(callbacks):
            cb_list = [callbacks]
        else:
            cb_list = list(callbacks)

        # The wrapped iterable's own close (if any) must run first.
        iterable_close = getattr(iterable, "close", None)
        if iterable_close:
            cb_list.insert(0, iterable_close)

        self._callbacks = cb_list

    def __iter__(self) -> "ClosingIterator":
        return self

    def __next__(self) -> bytes:
        return next(self._iterator)

    def close(self) -> None:
        # Invoke every registered cleanup callback in order.
        for callback in self._callbacks:
            callback()
467
468
def wrap_file(
    environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192
) -> t.Iterable[bytes]:
    """Wraps a file.  This uses the WSGI server's file wrapper if available
    or otherwise the generic :class:`FileWrapper`.

    .. versionadded:: 0.5

    If the file wrapper from the WSGI server is used it's important to not
    iterate over it from inside the application but to pass it through
    unchanged.  If you want to pass out a file wrapper inside a response
    object you have to set :attr:`Response.direct_passthrough` to `True`.

    More information about file wrappers are available in :pep:`333`.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """
    # Prefer the server-provided ``wsgi.file_wrapper`` extension, falling
    # back to our generic implementation.
    wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
    return wrapper(file, buffer_size)  # type: ignore
490
491
class FileWrapper:
    """Convert a :class:`file`-like object into an iterable that yields
    ``buffer_size`` byte blocks until the file is fully read.

    Prefer :func:`wrap_file`, which uses the WSGI server's native file
    wrapper support when it is available.

    .. versionadded:: 0.5

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param file: a :class:`file`-like object with a :meth:`~file.read` method.
    :param buffer_size: number of bytes for one iteration.
    """

    def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
        self.file = file
        self.buffer_size = buffer_size

    def close(self) -> None:
        # Delegate to the wrapped object when it supports closing.
        if hasattr(self.file, "close"):
            self.file.close()

    def seekable(self) -> bool:
        # Prefer the file's own answer; otherwise treat the presence of
        # a ``seek`` method as seekable.
        if hasattr(self.file, "seekable"):
            return self.file.seekable()
        return hasattr(self.file, "seek")

    def seek(self, *args: t.Any) -> None:
        if hasattr(self.file, "seek"):
            self.file.seek(*args)

    def tell(self) -> t.Optional[int]:
        if hasattr(self.file, "tell"):
            return self.file.tell()
        return None

    def __iter__(self) -> "FileWrapper":
        return self

    def __next__(self) -> bytes:
        chunk = self.file.read(self.buffer_size)
        if not chunk:
            raise StopIteration()
        return chunk
542
543
class _RangeWrapper:
    # private for now, but should we make it public in the future ?

    """This class can be used to convert an iterable object into
    an iterable that will only yield a piece of the underlying content.
    It yields blocks until the underlying stream range is fully read.
    The yielded blocks will have a size that can't exceed the original
    iterator defined block size, but that can be smaller.

    If you're using this object together with a :class:`Response` you have
    to use the `direct_passthrough` mode.

    :param iterable: an iterable object with a :meth:`__next__` method.
    :param start_byte: byte from which read will start.
    :param byte_range: how many bytes to read.
    """

    def __init__(
        self,
        iterable: t.Union[t.Iterable[bytes], t.IO[bytes]],
        start_byte: int = 0,
        byte_range: t.Optional[int] = None,
    ):
        self.iterable = iter(iterable)
        self.byte_range = byte_range
        self.start_byte = start_byte
        # Exclusive end offset; None means "until the iterable ends".
        self.end_byte: t.Optional[int] = None

        if byte_range is not None:
            self.end_byte = start_byte + byte_range

        # Total bytes consumed from the underlying iterable so far; used
        # to locate chunk boundaries relative to the requested range.
        self.read_length = 0
        # Seekable sources can jump straight to ``start_byte`` instead of
        # discarding leading chunks.
        self.seekable = (
            hasattr(iterable, "seekable") and iterable.seekable()  # type: ignore
        )
        self.end_reached = False

    def __iter__(self) -> "_RangeWrapper":
        return self

    def _next_chunk(self) -> bytes:
        # Pull one chunk from the source and keep the byte count in sync.
        try:
            chunk = next(self.iterable)
            self.read_length += len(chunk)
            return chunk
        except StopIteration:
            self.end_reached = True
            raise

    def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]:
        # Position the source at ``start_byte``.  Returns the first
        # (possibly trimmed) chunk — or None when seeking made reading
        # unnecessary — plus the byte offset that chunk starts at.
        chunk = None
        if self.seekable:
            self.iterable.seek(self.start_byte)  # type: ignore
            self.read_length = self.iterable.tell()  # type: ignore
            contextual_read_length = self.read_length
        else:
            # Non-seekable: discard chunks until the range start falls
            # inside one, then trim that chunk's leading bytes.
            while self.read_length <= self.start_byte:
                chunk = self._next_chunk()
            if chunk is not None:
                chunk = chunk[self.start_byte - self.read_length :]
            contextual_read_length = self.start_byte
        return chunk, contextual_read_length

    def _next(self) -> bytes:
        # Return the next chunk, trimmed at the end of the range.
        if self.end_reached:
            raise StopIteration()
        chunk = None
        contextual_read_length = self.read_length
        if self.read_length == 0:
            chunk, contextual_read_length = self._first_iteration()
        if chunk is None:
            chunk = self._next_chunk()
        if self.end_byte is not None and self.read_length >= self.end_byte:
            # This chunk reaches (or crosses) the end of the range; keep
            # only the bytes that belong to it.
            self.end_reached = True
            return chunk[: self.end_byte - contextual_read_length]
        return chunk

    def __next__(self) -> bytes:
        chunk = self._next()
        if chunk:
            return chunk
        # An empty trimmed chunk means the range is fully consumed.
        self.end_reached = True
        raise StopIteration()

    def close(self) -> None:
        # Close the underlying iterable when it supports it.
        if hasattr(self.iterable, "close"):
            self.iterable.close()  # type: ignore
631
632
633def _make_chunk_iter(
634    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
635    limit: t.Optional[int],
636    buffer_size: int,
637) -> t.Iterator[bytes]:
638    """Helper for the line and chunk iter functions."""
639    if isinstance(stream, (bytes, bytearray, str)):
640        raise TypeError(
641            "Passed a string or byte object instead of true iterator or stream."
642        )
643    if not hasattr(stream, "read"):
644        for item in stream:
645            if item:
646                yield item
647        return
648    stream = t.cast(t.IO[bytes], stream)
649    if not isinstance(stream, LimitedStream) and limit is not None:
650        stream = t.cast(t.IO[bytes], LimitedStream(stream, limit))
651    _read = stream.read
652    while True:
653        item = _read(buffer_size)
654        if not item:
655            break
656        yield item
657
658
def make_line_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Safely iterates line-based over an input stream.  If the input stream
    is not a :class:`LimitedStream` the `limit` parameter is mandatory.

    This uses the stream's :meth:`~file.read` method internally as opposite
    to the :meth:`~file.readline` method that is unsafe and can only be used
    in violation of the WSGI specification.  The same problem applies to the
    `__iter__` function of the input stream which calls :meth:`~file.readline`
    without arguments.

    If you need line-by-line processing it's strongly recommended to iterate
    over the input stream using this helper function.

    .. versionchanged:: 0.8
       This function now ensures that the limit was reached.

    .. versionadded:: 0.9
       added support for iterators as input stream.

    .. versionadded:: 0.11.10
       added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterate to iterate over.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is a :class:`LimitedStream`.
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    # "" is used as the sentinel (rather than b"") so the truthiness
    # check works whether the stream yields str or bytes items.
    first_item = next(_iter, "")
    if not first_item:
        return

    # Build str/bytes literals that match the item type of the stream.
    s = _make_encode_wrapper(first_item)
    empty = t.cast(bytes, s(""))
    cr = t.cast(bytes, s("\r"))
    lf = t.cast(bytes, s("\n"))
    crlf = t.cast(bytes, s("\r\n"))

    # Put the peeked first item back in front of the remaining chunks.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))

    def _iter_basic_lines() -> t.Iterator[bytes]:
        # Yields lines split on any line ending; a "\r\n" pair that
        # straddles a chunk boundary comes out as two items ("...\r",
        # "\n") and is merged by the loop below.
        _join = empty.join
        buffer: t.List[bytes] = []
        while True:
            new_data = next(_iter, "")
            if not new_data:
                break
            new_buf: t.List[bytes] = []
            buf_size = 0
            for item in t.cast(
                t.Iterator[bytes], chain(buffer, new_data.splitlines(True))
            ):
                new_buf.append(item)
                buf_size += len(item)
                if item and item[-1:] in crlf:
                    # Complete line ending seen: flush the buffered line.
                    yield _join(new_buf)
                    new_buf = []
                elif cap_at_buffer and buf_size >= buffer_size:
                    # Over-long line: emit buffer_size sized pieces.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
            buffer = new_buf
        # Trailing data without a final newline is still a line.
        if buffer:
            yield _join(buffer)

    # This hackery is necessary to merge 'foo\r' and '\n' into one item
    # of 'foo\r\n' if we were unlucky and we hit a chunk boundary.
    previous = empty
    for item in _iter_basic_lines():
        if item == lf and previous[-1:] == cr:
            previous += item
            item = empty
        if previous:
            yield previous
        previous = item
    if previous:
        yield previous
749
750
def make_chunk_iter(
    stream: t.Union[t.Iterable[bytes], t.IO[bytes]],
    separator: bytes,
    limit: t.Optional[int] = None,
    buffer_size: int = 10 * 1024,
    cap_at_buffer: bool = False,
) -> t.Iterator[bytes]:
    """Works like :func:`make_line_iter` but accepts a separator
    which divides chunks.  If you want newline based processing
    you should use :func:`make_line_iter` instead as it
    supports arbitrary newline markers.

    .. versionadded:: 0.8

    .. versionadded:: 0.9
       added support for iterators as input stream.

    .. versionadded:: 0.11.10
       added support for the `cap_at_buffer` parameter.

    :param stream: the stream or iterate to iterate over.
    :param separator: the separator that divides chunks.
    :param limit: the limit in bytes for the stream.  (Usually
                  content length.  Not necessary if the `stream`
                  is otherwise already limited).
    :param buffer_size: The optional buffer size.
    :param cap_at_buffer: if this is set chunks are split if they are longer
                          than the buffer size.  Internally this is implemented
                          that the buffer size might be exhausted by a factor
                          of two however.
    """
    _iter = _make_chunk_iter(stream, limit, buffer_size)

    first_item = next(_iter, b"")
    if not first_item:
        return

    # Put the peeked item back, then pick str or bytes helpers matching
    # the item type produced by the stream.
    _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter))
    if isinstance(first_item, str):
        separator = _to_str(separator)
        # The capturing group keeps separators in the split output so
        # they can be detected in the loop below.
        _split = re.compile(f"({re.escape(separator)})").split
        _join = "".join
    else:
        separator = _to_bytes(separator)
        _split = re.compile(b"(" + re.escape(separator) + b")").split
        _join = b"".join

    # NOTE(review): each read is split independently, so a separator that
    # spans two reads appears not to be detected — TODO confirm whether
    # callers rely on separators never crossing a buffer boundary.
    buffer: t.List[bytes] = []
    while True:
        new_data = next(_iter, b"")
        if not new_data:
            break
        chunks = _split(new_data)
        new_buf: t.List[bytes] = []
        buf_size = 0
        for item in chain(buffer, chunks):
            if item == separator:
                # Separator found: everything buffered so far is one
                # complete chunk.
                yield _join(new_buf)
                new_buf = []
                buf_size = 0
            else:
                buf_size += len(item)
                new_buf.append(item)

                if cap_at_buffer and buf_size >= buffer_size:
                    # Emit buffer_size pieces so a missing separator
                    # cannot grow the buffer without bound.
                    rv = _join(new_buf)
                    while len(rv) >= buffer_size:
                        yield rv[:buffer_size]
                        rv = rv[buffer_size:]
                    new_buf = [rv]
                    buf_size = len(rv)

        buffer = new_buf
    # Trailing data after the last separator is still a chunk.
    if buffer:
        yield _join(buffer)
826
827
class LimitedStream(io.IOBase):
    """Wraps a stream so that it doesn't read more than n bytes.  If the
    stream is exhausted and the caller tries to get more bytes from it
    :func:`on_exhausted` is called which by default returns an empty
    string.  The return value of that function is forwarded
    to the reader function.  So if it returns an empty string
    :meth:`read` will return an empty string as well.

    The limit however must never be higher than what the stream can
    output.  Otherwise :meth:`readlines` will try to read past the
    limit.

    .. admonition:: Note on WSGI compliance

       calls to :meth:`readline` and :meth:`readlines` are not
       WSGI compliant because it passes a size argument to the
       readline methods.  Unfortunately the WSGI PEP is not safely
       implementable without a size argument to :meth:`readline`
       because there is no EOF marker in the stream.  As a result
       of that the use of :meth:`readline` is discouraged.

       For the same reason iterating over the :class:`LimitedStream`
       is not portable.  It internally calls :meth:`readline`.

       We strongly suggest using :meth:`read` only or using the
       :func:`make_line_iter` which safely iterates line-based
       over a WSGI input stream.

    :param stream: the stream to wrap.
    :param limit: the limit for the stream, must not be longer than
                  what the string can provide if the stream does not
                  end with `EOF` (like `wsgi.input`)
    """

    def __init__(self, stream: t.IO[bytes], limit: int) -> None:
        # Only the two read callables are kept; the stream object itself
        # is not stored on the instance.
        self._read = stream.read
        self._readline = stream.readline
        # Number of bytes consumed so far; compared against ``limit``.
        self._pos = 0
        self.limit = limit

    def __iter__(self) -> "LimitedStream":
        return self

    @property
    def is_exhausted(self) -> bool:
        """If the stream is exhausted this attribute is `True`."""
        return self._pos >= self.limit

    def on_exhausted(self) -> bytes:
        """This is called when the stream tries to read past the limit.
        The return value of this function is returned from the reading
        function.
        """
        # Read null bytes from the stream so that we get the
        # correct end of stream marker.
        return self._read(0)

    def on_disconnect(self) -> bytes:
        """What should happen if a disconnect is detected?  The return
        value of this function is returned from read functions in case
        the client went away.  By default a
        :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised.
        """
        from .exceptions import ClientDisconnected

        raise ClientDisconnected()

    def exhaust(self, chunk_size: int = 1024 * 64) -> None:
        """Exhaust the stream.  This consumes all the data left until the
        limit is reached.

        :param chunk_size: the size for a chunk.  It will read the chunk
                           until the stream is exhausted and throw away
                           the results.
        """
        to_read = self.limit - self._pos
        chunk = chunk_size
        while to_read > 0:
            chunk = min(to_read, chunk)
            # ``read`` either returns exactly ``chunk`` bytes or calls
            # ``on_disconnect`` (which raises), so this accounting is safe.
            self.read(chunk)
            to_read -= chunk

    def read(self, size: t.Optional[int] = None) -> bytes:
        """Read `size` bytes or if size is not provided everything is read.

        :param size: the number of bytes read.  ``None`` or any negative
                     value reads everything up to the limit, matching
                     the file-object convention.
        """
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None or size < 0:
            # Any negative size means "read all" for file objects;
            # previously only -1 was special-cased and other negative
            # values were passed straight to the underlying read.
            size = self.limit
        to_read = min(self.limit - self._pos, size)
        try:
            read = self._read(to_read)
        except (OSError, ValueError):
            return self.on_disconnect()
        if to_read and len(read) != to_read:
            # A short read means the client went away mid-request.
            return self.on_disconnect()
        self._pos += len(read)
        return read

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """Reads one line from the stream, reading at most ``size``
        bytes and never past the limit."""
        if self._pos >= self.limit:
            return self.on_exhausted()
        if size is None:
            size = self.limit - self._pos
        else:
            size = min(size, self.limit - self._pos)
        try:
            line = self._readline(size)
        except (ValueError, OSError):
            return self.on_disconnect()
        if size and not line:
            # We asked for bytes but got none: the client disconnected.
            return self.on_disconnect()
        self._pos += len(line)
        return line

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """Reads a file into a list of strings.  It calls :meth:`readline`
        until the file is read to the end.  It does support the optional
        `size` argument if the underlying stream supports it for
        `readline`.

        :param size: an optional upper bound on the total number of
                     bytes to read across all lines.
        """
        # ``end`` is the absolute stream position we may read up to: the
        # stream limit, optionally tightened by the ``size`` budget.
        if size is not None:
            end = min(self.limit, self._pos + size)
        else:
            end = self.limit
        result = []
        while self._pos < end:
            # Cap each line read by the remaining budget.  The previous
            # implementation tried to decrement ``size`` with
            # ``size -= last_pos - self._pos``, which was always zero
            # (``last_pos`` had just been set to ``self._pos``), so the
            # final readline could read past the requested ``size``.
            line = self.readline(end - self._pos)
            if not line:
                break
            result.append(line)
        return result

    def tell(self) -> int:
        """Returns the position of the stream.

        .. versionadded:: 0.9
        """
        return self._pos

    def __next__(self) -> bytes:
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def readable(self) -> bool:
        return True
983