1import io 2import re 3import typing as t 4from functools import partial 5from functools import update_wrapper 6from itertools import chain 7 8from ._internal import _make_encode_wrapper 9from ._internal import _to_bytes 10from ._internal import _to_str 11from .sansio import utils as _sansio_utils 12from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API 13from .urls import _URLTuple 14from .urls import uri_to_iri 15from .urls import url_join 16from .urls import url_parse 17from .urls import url_quote 18 19if t.TYPE_CHECKING: 20 from _typeshed.wsgi import WSGIApplication 21 from _typeshed.wsgi import WSGIEnvironment 22 23 24def responder(f: t.Callable[..., "WSGIApplication"]) -> "WSGIApplication": 25 """Marks a function as responder. Decorate a function with it and it 26 will automatically call the return value as WSGI application. 27 28 Example:: 29 30 @responder 31 def application(environ, start_response): 32 return Response('Hello World!') 33 """ 34 return update_wrapper(lambda *a: f(*a)(*a[-2:]), f) 35 36 37def get_current_url( 38 environ: "WSGIEnvironment", 39 root_only: bool = False, 40 strip_querystring: bool = False, 41 host_only: bool = False, 42 trusted_hosts: t.Optional[t.Iterable[str]] = None, 43) -> str: 44 """Recreate the URL for a request from the parts in a WSGI 45 environment. 46 47 The URL is an IRI, not a URI, so it may contain Unicode characters. 48 Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII. 49 50 :param environ: The WSGI environment to get the URL parts from. 51 :param root_only: Only build the root path, don't include the 52 remaining path or query string. 53 :param strip_querystring: Don't include the query string. 54 :param host_only: Only build the scheme and host. 55 :param trusted_hosts: A list of trusted host names to validate the 56 host against. 57 """ 58 parts = { 59 "scheme": environ["wsgi.url_scheme"], 60 "host": get_host(environ, trusted_hosts), 61 } 62 63 if not host_only: 64 parts["root_path"] = environ.get("SCRIPT_NAME", "") 65 66 if not root_only: 67 parts["path"] = environ.get("PATH_INFO", "") 68 69 if not strip_querystring: 70 parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1") 71 72 return _sansio_utils.get_current_url(**parts) 73 74 75def _get_server( 76 environ: "WSGIEnvironment", 77) -> t.Optional[t.Tuple[str, t.Optional[int]]]: 78 name = environ.get("SERVER_NAME") 79 80 if name is None: 81 return None 82 83 try: 84 port: t.Optional[int] = int(environ.get("SERVER_PORT", None)) 85 except (TypeError, ValueError): 86 # unix socket 87 port = None 88 89 return name, port 90 91 92def get_host( 93 environ: "WSGIEnvironment", trusted_hosts: t.Optional[t.Iterable[str]] = None 94) -> str: 95 """Return the host for the given WSGI environment. 96 97 The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not 98 set. The returned host will only contain the port if it is different 99 than the standard port for the protocol. 100 101 Optionally, verify that the host is trusted using 102 :func:`host_is_trusted` and raise a 103 :exc:`~werkzeug.exceptions.SecurityError` if it is not. 104 105 :param environ: A WSGI environment dict. 106 :param trusted_hosts: A list of trusted host names. 107 108 :return: Host, with port if necessary. 109 :raise ~werkzeug.exceptions.SecurityError: If the host is not 110 trusted. 111 """ 112 return _sansio_utils.get_host( 113 environ["wsgi.url_scheme"], 114 environ.get("HTTP_HOST"), 115 _get_server(environ), 116 trusted_hosts, 117 ) 118 119 120def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]: 121 """Returns the content length from the WSGI environment as 122 integer. If it's not available or chunked transfer encoding is used, 123 ``None`` is returned. 124 125 .. versionadded:: 0.9 126 127 :param environ: the WSGI environ to fetch the content length from. 128 """ 129 if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked": 130 return None 131 132 content_length = environ.get("CONTENT_LENGTH") 133 if content_length is not None: 134 try: 135 return max(0, int(content_length)) 136 except (ValueError, TypeError): 137 pass 138 return None 139 140 141def get_input_stream( 142 environ: "WSGIEnvironment", safe_fallback: bool = True 143) -> t.IO[bytes]: 144 """Returns the input stream from the WSGI environment and wraps it 145 in the most sensible way possible. The stream returned is not the 146 raw WSGI stream in most cases but one that is safe to read from 147 without taking into account the content length. 148 149 If content length is not set, the stream will be empty for safety reasons. 150 If the WSGI server supports chunked or infinite streams, it should set 151 the ``wsgi.input_terminated`` value in the WSGI environ to indicate that. 152 153 .. versionadded:: 0.9 154 155 :param environ: the WSGI environ to fetch the stream from. 156 :param safe_fallback: use an empty stream as a safe fallback when the 157 content length is not set. Disabling this allows infinite streams, 158 which can be a denial-of-service risk. 159 """ 160 stream = t.cast(t.IO[bytes], environ["wsgi.input"]) 161 content_length = get_content_length(environ) 162 163 # A wsgi extension that tells us if the input is terminated. In 164 # that case we return the stream unchanged as we know we can safely 165 # read it until the end. 166 if environ.get("wsgi.input_terminated"): 167 return stream 168 169 # If the request doesn't specify a content length, returning the stream is 170 # potentially dangerous because it could be infinite, malicious or not. If 171 # safe_fallback is true, return an empty stream instead for safety. 172 if content_length is None: 173 return io.BytesIO() if safe_fallback else stream 174 175 # Otherwise limit the stream to the content length 176 return t.cast(t.IO[bytes], LimitedStream(stream, content_length)) 177 178 179def get_query_string(environ: "WSGIEnvironment") -> str: 180 """Returns the ``QUERY_STRING`` from the WSGI environment. This also 181 takes care of the WSGI decoding dance. The string returned will be 182 restricted to ASCII characters. 183 184 :param environ: WSGI environment to get the query string from. 185 186 .. versionadded:: 0.9 187 """ 188 qs = environ.get("QUERY_STRING", "").encode("latin1") 189 # QUERY_STRING really should be ascii safe but some browsers 190 # will send us some unicode stuff (I am looking at you IE). 191 # In that case we want to urllib quote it badly. 192 return url_quote(qs, safe=":&%=+$!*'(),") 193 194 195def get_path_info( 196 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace" 197) -> str: 198 """Return the ``PATH_INFO`` from the WSGI environment and decode it 199 unless ``charset`` is ``None``. 200 201 :param environ: WSGI environment to get the path from. 202 :param charset: The charset for the path info, or ``None`` if no 203 decoding should be performed. 204 :param errors: The decoding error handling. 205 206 .. versionadded:: 0.9 207 """ 208 path = environ.get("PATH_INFO", "").encode("latin1") 209 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore 210 211 212def get_script_name( 213 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace" 214) -> str: 215 """Return the ``SCRIPT_NAME`` from the WSGI environment and decode 216 it unless `charset` is set to ``None``. 217 218 :param environ: WSGI environment to get the path from. 219 :param charset: The charset for the path, or ``None`` if no decoding 220 should be performed. 221 :param errors: The decoding error handling. 222 223 .. versionadded:: 0.9 224 """ 225 path = environ.get("SCRIPT_NAME", "").encode("latin1") 226 return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore 227 228 229def pop_path_info( 230 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace" 231) -> t.Optional[str]: 232 """Removes and returns the next segment of `PATH_INFO`, pushing it onto 233 `SCRIPT_NAME`. Returns `None` if there is nothing left on `PATH_INFO`. 234 235 If the `charset` is set to `None` bytes are returned. 236 237 If there are empty segments (``'/foo//bar``) these are ignored but 238 properly pushed to the `SCRIPT_NAME`: 239 240 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'} 241 >>> pop_path_info(env) 242 'a' 243 >>> env['SCRIPT_NAME'] 244 '/foo/a' 245 >>> pop_path_info(env) 246 'b' 247 >>> env['SCRIPT_NAME'] 248 '/foo/a/b' 249 250 .. versionadded:: 0.5 251 252 .. versionchanged:: 0.9 253 The path is now decoded and a charset and encoding 254 parameter can be provided. 255 256 :param environ: the WSGI environment that is modified. 257 :param charset: The ``encoding`` parameter passed to 258 :func:`bytes.decode`. 259 :param errors: The ``errors`` paramater passed to 260 :func:`bytes.decode`. 261 """ 262 path = environ.get("PATH_INFO") 263 if not path: 264 return None 265 266 script_name = environ.get("SCRIPT_NAME", "") 267 268 # shift multiple leading slashes over 269 old_path = path 270 path = path.lstrip("/") 271 if path != old_path: 272 script_name += "/" * (len(old_path) - len(path)) 273 274 if "/" not in path: 275 environ["PATH_INFO"] = "" 276 environ["SCRIPT_NAME"] = script_name + path 277 rv = path.encode("latin1") 278 else: 279 segment, path = path.split("/", 1) 280 environ["PATH_INFO"] = f"/{path}" 281 environ["SCRIPT_NAME"] = script_name + segment 282 rv = segment.encode("latin1") 283 284 return _to_str(rv, charset, errors, allow_none_charset=True) # type: ignore 285 286 287def peek_path_info( 288 environ: "WSGIEnvironment", charset: str = "utf-8", errors: str = "replace" 289) -> t.Optional[str]: 290 """Returns the next segment on the `PATH_INFO` or `None` if there 291 is none. Works like :func:`pop_path_info` without modifying the 292 environment: 293 294 >>> env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b'} 295 >>> peek_path_info(env) 296 'a' 297 >>> peek_path_info(env) 298 'a' 299 300 If the `charset` is set to `None` bytes are returned. 301 302 .. versionadded:: 0.5 303 304 .. versionchanged:: 0.9 305 The path is now decoded and a charset and encoding 306 parameter can be provided. 307 308 :param environ: the WSGI environment that is checked. 309 """ 310 segments = environ.get("PATH_INFO", "").lstrip("/").split("/", 1) 311 if segments: 312 return _to_str( # type: ignore 313 segments[0].encode("latin1"), charset, errors, allow_none_charset=True 314 ) 315 return None 316 317 318def extract_path_info( 319 environ_or_baseurl: t.Union[str, "WSGIEnvironment"], 320 path_or_url: t.Union[str, _URLTuple], 321 charset: str = "utf-8", 322 errors: str = "werkzeug.url_quote", 323 collapse_http_schemes: bool = True, 324) -> t.Optional[str]: 325 """Extracts the path info from the given URL (or WSGI environment) and 326 path. The path info returned is a string. The URLs might also be IRIs. 327 328 If the path info could not be determined, `None` is returned. 329 330 Some examples: 331 332 >>> extract_path_info('http://example.com/app', '/app/hello') 333 '/hello' 334 >>> extract_path_info('http://example.com/app', 335 ... 'https://example.com/app/hello') 336 '/hello' 337 >>> extract_path_info('http://example.com/app', 338 ... 'https://example.com/app/hello', 339 ... collapse_http_schemes=False) is None 340 True 341 342 Instead of providing a base URL you can also pass a WSGI environment. 343 344 :param environ_or_baseurl: a WSGI environment dict, a base URL or 345 base IRI. This is the root of the 346 application. 347 :param path_or_url: an absolute path from the server root, a 348 relative path (in which case it's the path info) 349 or a full URL. 350 :param charset: the charset for byte data in URLs 351 :param errors: the error handling on decode 352 :param collapse_http_schemes: if set to `False` the algorithm does 353 not assume that http and https on the 354 same server point to the same 355 resource. 356 357 .. versionchanged:: 0.15 358 The ``errors`` parameter defaults to leaving invalid bytes 359 quoted instead of replacing them. 360 361 .. versionadded:: 0.6 362 """ 363 364 def _normalize_netloc(scheme: str, netloc: str) -> str: 365 parts = netloc.split("@", 1)[-1].split(":", 1) 366 port: t.Optional[str] 367 368 if len(parts) == 2: 369 netloc, port = parts 370 if (scheme == "http" and port == "80") or ( 371 scheme == "https" and port == "443" 372 ): 373 port = None 374 else: 375 netloc = parts[0] 376 port = None 377 378 if port is not None: 379 netloc += f":{port}" 380 381 return netloc 382 383 # make sure whatever we are working on is a IRI and parse it 384 path = uri_to_iri(path_or_url, charset, errors) 385 if isinstance(environ_or_baseurl, dict): 386 environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True) 387 base_iri = uri_to_iri(environ_or_baseurl, charset, errors) 388 base_scheme, base_netloc, base_path = url_parse(base_iri)[:3] 389 cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3] 390 391 # normalize the network location 392 base_netloc = _normalize_netloc(base_scheme, base_netloc) 393 cur_netloc = _normalize_netloc(cur_scheme, cur_netloc) 394 395 # is that IRI even on a known HTTP scheme? 396 if collapse_http_schemes: 397 for scheme in base_scheme, cur_scheme: 398 if scheme not in ("http", "https"): 399 return None 400 else: 401 if not (base_scheme in ("http", "https") and base_scheme == cur_scheme): 402 return None 403 404 # are the netlocs compatible? 405 if base_netloc != cur_netloc: 406 return None 407 408 # are we below the application path? 409 base_path = base_path.rstrip("/") 410 if not cur_path.startswith(base_path): 411 return None 412 413 return f"/{cur_path[len(base_path) :].lstrip('/')}" 414 415 416class ClosingIterator: 417 """The WSGI specification requires that all middlewares and gateways 418 respect the `close` callback of the iterable returned by the application. 419 Because it is useful to add another close action to a returned iterable 420 and adding a custom iterable is a boring task this class can be used for 421 that:: 422 423 return ClosingIterator(app(environ, start_response), [cleanup_session, 424 cleanup_locals]) 425 426 If there is just one close function it can be passed instead of the list. 427 428 A closing iterator is not needed if the application uses response objects 429 and finishes the processing if the response is started:: 430 431 try: 432 return response(environ, start_response) 433 finally: 434 cleanup_session() 435 cleanup_locals() 436 """ 437 438 def __init__( 439 self, 440 iterable: t.Iterable[bytes], 441 callbacks: t.Optional[ 442 t.Union[t.Callable[[], None], t.Iterable[t.Callable[[], None]]] 443 ] = None, 444 ) -> None: 445 iterator = iter(iterable) 446 self._next = t.cast(t.Callable[[], bytes], partial(next, iterator)) 447 if callbacks is None: 448 callbacks = [] 449 elif callable(callbacks): 450 callbacks = [callbacks] 451 else: 452 callbacks = list(callbacks) 453 iterable_close = getattr(iterable, "close", None) 454 if iterable_close: 455 callbacks.insert(0, iterable_close) 456 self._callbacks = callbacks 457 458 def __iter__(self) -> "ClosingIterator": 459 return self 460 461 def __next__(self) -> bytes: 462 return self._next() 463 464 def close(self) -> None: 465 for callback in self._callbacks: 466 callback() 467 468 469def wrap_file( 470 environ: "WSGIEnvironment", file: t.IO[bytes], buffer_size: int = 8192 471) -> t.Iterable[bytes]: 472 """Wraps a file. This uses the WSGI server's file wrapper if available 473 or otherwise the generic :class:`FileWrapper`. 474 475 .. versionadded:: 0.5 476 477 If the file wrapper from the WSGI server is used it's important to not 478 iterate over it from inside the application but to pass it through 479 unchanged. If you want to pass out a file wrapper inside a response 480 object you have to set :attr:`Response.direct_passthrough` to `True`. 481 482 More information about file wrappers are available in :pep:`333`. 483 484 :param file: a :class:`file`-like object with a :meth:`~file.read` method. 485 :param buffer_size: number of bytes for one iteration. 486 """ 487 return environ.get("wsgi.file_wrapper", FileWrapper)( # type: ignore 488 file, buffer_size 489 ) 490 491 492class FileWrapper: 493 """This class can be used to convert a :class:`file`-like object into 494 an iterable. It yields `buffer_size` blocks until the file is fully 495 read. 496 497 You should not use this class directly but rather use the 498 :func:`wrap_file` function that uses the WSGI server's file wrapper 499 support if it's available. 500 501 .. versionadded:: 0.5 502 503 If you're using this object together with a :class:`Response` you have 504 to use the `direct_passthrough` mode. 505 506 :param file: a :class:`file`-like object with a :meth:`~file.read` method. 507 :param buffer_size: number of bytes for one iteration. 508 """ 509 510 def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None: 511 self.file = file 512 self.buffer_size = buffer_size 513 514 def close(self) -> None: 515 if hasattr(self.file, "close"): 516 self.file.close() 517 518 def seekable(self) -> bool: 519 if hasattr(self.file, "seekable"): 520 return self.file.seekable() 521 if hasattr(self.file, "seek"): 522 return True 523 return False 524 525 def seek(self, *args: t.Any) -> None: 526 if hasattr(self.file, "seek"): 527 self.file.seek(*args) 528 529 def tell(self) -> t.Optional[int]: 530 if hasattr(self.file, "tell"): 531 return self.file.tell() 532 return None 533 534 def __iter__(self) -> "FileWrapper": 535 return self 536 537 def __next__(self) -> bytes: 538 data = self.file.read(self.buffer_size) 539 if data: 540 return data 541 raise StopIteration() 542 543 544class _RangeWrapper: 545 # private for now, but should we make it public in the future ? 546 547 """This class can be used to convert an iterable object into 548 an iterable that will only yield a piece of the underlying content. 549 It yields blocks until the underlying stream range is fully read. 550 The yielded blocks will have a size that can't exceed the original 551 iterator defined block size, but that can be smaller. 552 553 If you're using this object together with a :class:`Response` you have 554 to use the `direct_passthrough` mode. 555 556 :param iterable: an iterable object with a :meth:`__next__` method. 557 :param start_byte: byte from which read will start. 558 :param byte_range: how many bytes to read. 559 """ 560 561 def __init__( 562 self, 563 iterable: t.Union[t.Iterable[bytes], t.IO[bytes]], 564 start_byte: int = 0, 565 byte_range: t.Optional[int] = None, 566 ): 567 self.iterable = iter(iterable) 568 self.byte_range = byte_range 569 self.start_byte = start_byte 570 self.end_byte = None 571 572 if byte_range is not None: 573 self.end_byte = start_byte + byte_range 574 575 self.read_length = 0 576 self.seekable = ( 577 hasattr(iterable, "seekable") and iterable.seekable() # type: ignore 578 ) 579 self.end_reached = False 580 581 def __iter__(self) -> "_RangeWrapper": 582 return self 583 584 def _next_chunk(self) -> bytes: 585 try: 586 chunk = next(self.iterable) 587 self.read_length += len(chunk) 588 return chunk 589 except StopIteration: 590 self.end_reached = True 591 raise 592 593 def _first_iteration(self) -> t.Tuple[t.Optional[bytes], int]: 594 chunk = None 595 if self.seekable: 596 self.iterable.seek(self.start_byte) # type: ignore 597 self.read_length = self.iterable.tell() # type: ignore 598 contextual_read_length = self.read_length 599 else: 600 while self.read_length <= self.start_byte: 601 chunk = self._next_chunk() 602 if chunk is not None: 603 chunk = chunk[self.start_byte - self.read_length :] 604 contextual_read_length = self.start_byte 605 return chunk, contextual_read_length 606 607 def _next(self) -> bytes: 608 if self.end_reached: 609 raise StopIteration() 610 chunk = None 611 contextual_read_length = self.read_length 612 if self.read_length == 0: 613 chunk, contextual_read_length = self._first_iteration() 614 if chunk is None: 615 chunk = self._next_chunk() 616 if self.end_byte is not None and self.read_length >= self.end_byte: 617 self.end_reached = True 618 return chunk[: self.end_byte - contextual_read_length] 619 return chunk 620 621 def __next__(self) -> bytes: 622 chunk = self._next() 623 if chunk: 624 return chunk 625 self.end_reached = True 626 raise StopIteration() 627 628 def close(self) -> None: 629 if hasattr(self.iterable, "close"): 630 self.iterable.close() # type: ignore 631 632 633def _make_chunk_iter( 634 stream: t.Union[t.Iterable[bytes], t.IO[bytes]], 635 limit: t.Optional[int], 636 buffer_size: int, 637) -> t.Iterator[bytes]: 638 """Helper for the line and chunk iter functions.""" 639 if isinstance(stream, (bytes, bytearray, str)): 640 raise TypeError( 641 "Passed a string or byte object instead of true iterator or stream." 642 ) 643 if not hasattr(stream, "read"): 644 for item in stream: 645 if item: 646 yield item 647 return 648 stream = t.cast(t.IO[bytes], stream) 649 if not isinstance(stream, LimitedStream) and limit is not None: 650 stream = t.cast(t.IO[bytes], LimitedStream(stream, limit)) 651 _read = stream.read 652 while True: 653 item = _read(buffer_size) 654 if not item: 655 break 656 yield item 657 658 659def make_line_iter( 660 stream: t.Union[t.Iterable[bytes], t.IO[bytes]], 661 limit: t.Optional[int] = None, 662 buffer_size: int = 10 * 1024, 663 cap_at_buffer: bool = False, 664) -> t.Iterator[bytes]: 665 """Safely iterates line-based over an input stream. If the input stream 666 is not a :class:`LimitedStream` the `limit` parameter is mandatory. 667 668 This uses the stream's :meth:`~file.read` method internally as opposite 669 to the :meth:`~file.readline` method that is unsafe and can only be used 670 in violation of the WSGI specification. The same problem applies to the 671 `__iter__` function of the input stream which calls :meth:`~file.readline` 672 without arguments. 673 674 If you need line-by-line processing it's strongly recommended to iterate 675 over the input stream using this helper function. 676 677 .. versionchanged:: 0.8 678 This function now ensures that the limit was reached. 679 680 .. versionadded:: 0.9 681 added support for iterators as input stream. 682 683 .. versionadded:: 0.11.10 684 added support for the `cap_at_buffer` parameter. 685 686 :param stream: the stream or iterate to iterate over. 687 :param limit: the limit in bytes for the stream. (Usually 688 content length. Not necessary if the `stream` 689 is a :class:`LimitedStream`. 690 :param buffer_size: The optional buffer size. 691 :param cap_at_buffer: if this is set chunks are split if they are longer 692 than the buffer size. Internally this is implemented 693 that the buffer size might be exhausted by a factor 694 of two however. 695 """ 696 _iter = _make_chunk_iter(stream, limit, buffer_size) 697 698 first_item = next(_iter, "") 699 if not first_item: 700 return 701 702 s = _make_encode_wrapper(first_item) 703 empty = t.cast(bytes, s("")) 704 cr = t.cast(bytes, s("\r")) 705 lf = t.cast(bytes, s("\n")) 706 crlf = t.cast(bytes, s("\r\n")) 707 708 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter)) 709 710 def _iter_basic_lines() -> t.Iterator[bytes]: 711 _join = empty.join 712 buffer: t.List[bytes] = [] 713 while True: 714 new_data = next(_iter, "") 715 if not new_data: 716 break 717 new_buf: t.List[bytes] = [] 718 buf_size = 0 719 for item in t.cast( 720 t.Iterator[bytes], chain(buffer, new_data.splitlines(True)) 721 ): 722 new_buf.append(item) 723 buf_size += len(item) 724 if item and item[-1:] in crlf: 725 yield _join(new_buf) 726 new_buf = [] 727 elif cap_at_buffer and buf_size >= buffer_size: 728 rv = _join(new_buf) 729 while len(rv) >= buffer_size: 730 yield rv[:buffer_size] 731 rv = rv[buffer_size:] 732 new_buf = [rv] 733 buffer = new_buf 734 if buffer: 735 yield _join(buffer) 736 737 # This hackery is necessary to merge 'foo\r' and '\n' into one item 738 # of 'foo\r\n' if we were unlucky and we hit a chunk boundary. 739 previous = empty 740 for item in _iter_basic_lines(): 741 if item == lf and previous[-1:] == cr: 742 previous += item 743 item = empty 744 if previous: 745 yield previous 746 previous = item 747 if previous: 748 yield previous 749 750 751def make_chunk_iter( 752 stream: t.Union[t.Iterable[bytes], t.IO[bytes]], 753 separator: bytes, 754 limit: t.Optional[int] = None, 755 buffer_size: int = 10 * 1024, 756 cap_at_buffer: bool = False, 757) -> t.Iterator[bytes]: 758 """Works like :func:`make_line_iter` but accepts a separator 759 which divides chunks. If you want newline based processing 760 you should use :func:`make_line_iter` instead as it 761 supports arbitrary newline markers. 762 763 .. versionadded:: 0.8 764 765 .. versionadded:: 0.9 766 added support for iterators as input stream. 767 768 .. versionadded:: 0.11.10 769 added support for the `cap_at_buffer` parameter. 770 771 :param stream: the stream or iterate to iterate over. 772 :param separator: the separator that divides chunks. 773 :param limit: the limit in bytes for the stream. (Usually 774 content length. Not necessary if the `stream` 775 is otherwise already limited). 776 :param buffer_size: The optional buffer size. 777 :param cap_at_buffer: if this is set chunks are split if they are longer 778 than the buffer size. Internally this is implemented 779 that the buffer size might be exhausted by a factor 780 of two however. 781 """ 782 _iter = _make_chunk_iter(stream, limit, buffer_size) 783 784 first_item = next(_iter, b"") 785 if not first_item: 786 return 787 788 _iter = t.cast(t.Iterator[bytes], chain((first_item,), _iter)) 789 if isinstance(first_item, str): 790 separator = _to_str(separator) 791 _split = re.compile(f"({re.escape(separator)})").split 792 _join = "".join 793 else: 794 separator = _to_bytes(separator) 795 _split = re.compile(b"(" + re.escape(separator) + b")").split 796 _join = b"".join 797 798 buffer: t.List[bytes] = [] 799 while True: 800 new_data = next(_iter, b"") 801 if not new_data: 802 break 803 chunks = _split(new_data) 804 new_buf: t.List[bytes] = [] 805 buf_size = 0 806 for item in chain(buffer, chunks): 807 if item == separator: 808 yield _join(new_buf) 809 new_buf = [] 810 buf_size = 0 811 else: 812 buf_size += len(item) 813 new_buf.append(item) 814 815 if cap_at_buffer and buf_size >= buffer_size: 816 rv = _join(new_buf) 817 while len(rv) >= buffer_size: 818 yield rv[:buffer_size] 819 rv = rv[buffer_size:] 820 new_buf = [rv] 821 buf_size = len(rv) 822 823 buffer = new_buf 824 if buffer: 825 yield _join(buffer) 826 827 828class LimitedStream(io.IOBase): 829 """Wraps a stream so that it doesn't read more than n bytes. If the 830 stream is exhausted and the caller tries to get more bytes from it 831 :func:`on_exhausted` is called which by default returns an empty 832 string. The return value of that function is forwarded 833 to the reader function. So if it returns an empty string 834 :meth:`read` will return an empty string as well. 835 836 The limit however must never be higher than what the stream can 837 output. Otherwise :meth:`readlines` will try to read past the 838 limit. 839 840 .. admonition:: Note on WSGI compliance 841 842 calls to :meth:`readline` and :meth:`readlines` are not 843 WSGI compliant because it passes a size argument to the 844 readline methods. Unfortunately the WSGI PEP is not safely 845 implementable without a size argument to :meth:`readline` 846 because there is no EOF marker in the stream. As a result 847 of that the use of :meth:`readline` is discouraged. 848 849 For the same reason iterating over the :class:`LimitedStream` 850 is not portable. It internally calls :meth:`readline`. 851 852 We strongly suggest using :meth:`read` only or using the 853 :func:`make_line_iter` which safely iterates line-based 854 over a WSGI input stream. 855 856 :param stream: the stream to wrap. 857 :param limit: the limit for the stream, must not be longer than 858 what the string can provide if the stream does not 859 end with `EOF` (like `wsgi.input`) 860 """ 861 862 def __init__(self, stream: t.IO[bytes], limit: int) -> None: 863 self._read = stream.read 864 self._readline = stream.readline 865 self._pos = 0 866 self.limit = limit 867 868 def __iter__(self) -> "LimitedStream": 869 return self 870 871 @property 872 def is_exhausted(self) -> bool: 873 """If the stream is exhausted this attribute is `True`.""" 874 return self._pos >= self.limit 875 876 def on_exhausted(self) -> bytes: 877 """This is called when the stream tries to read past the limit. 878 The return value of this function is returned from the reading 879 function. 880 """ 881 # Read null bytes from the stream so that we get the 882 # correct end of stream marker. 883 return self._read(0) 884 885 def on_disconnect(self) -> bytes: 886 """What should happen if a disconnect is detected? The return 887 value of this function is returned from read functions in case 888 the client went away. By default a 889 :exc:`~werkzeug.exceptions.ClientDisconnected` exception is raised. 890 """ 891 from .exceptions import ClientDisconnected 892 893 raise ClientDisconnected() 894 895 def exhaust(self, chunk_size: int = 1024 * 64) -> None: 896 """Exhaust the stream. This consumes all the data left until the 897 limit is reached. 898 899 :param chunk_size: the size for a chunk. It will read the chunk 900 until the stream is exhausted and throw away 901 the results. 902 """ 903 to_read = self.limit - self._pos 904 chunk = chunk_size 905 while to_read > 0: 906 chunk = min(to_read, chunk) 907 self.read(chunk) 908 to_read -= chunk 909 910 def read(self, size: t.Optional[int] = None) -> bytes: 911 """Read `size` bytes or if size is not provided everything is read. 912 913 :param size: the number of bytes read. 914 """ 915 if self._pos >= self.limit: 916 return self.on_exhausted() 917 if size is None or size == -1: # -1 is for consistence with file 918 size = self.limit 919 to_read = min(self.limit - self._pos, size) 920 try: 921 read = self._read(to_read) 922 except (OSError, ValueError): 923 return self.on_disconnect() 924 if to_read and len(read) != to_read: 925 return self.on_disconnect() 926 self._pos += len(read) 927 return read 928 929 def readline(self, size: t.Optional[int] = None) -> bytes: 930 """Reads one line from the stream.""" 931 if self._pos >= self.limit: 932 return self.on_exhausted() 933 if size is None: 934 size = self.limit - self._pos 935 else: 936 size = min(size, self.limit - self._pos) 937 try: 938 line = self._readline(size) 939 except (ValueError, OSError): 940 return self.on_disconnect() 941 if size and not line: 942 return self.on_disconnect() 943 self._pos += len(line) 944 return line 945 946 def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]: 947 """Reads a file into a list of strings. It calls :meth:`readline` 948 until the file is read to the end. It does support the optional 949 `size` argument if the underlying stream supports it for 950 `readline`. 951 """ 952 last_pos = self._pos 953 result = [] 954 if size is not None: 955 end = min(self.limit, last_pos + size) 956 else: 957 end = self.limit 958 while True: 959 if size is not None: 960 size -= last_pos - self._pos 961 if self._pos >= end: 962 break 963 result.append(self.readline(size)) 964 if size is not None: 965 last_pos = self._pos 966 return result 967 968 def tell(self) -> int: 969 """Returns the position of the stream. 970 971 .. versionadded:: 0.9 972 """ 973 return self._pos 974 975 def __next__(self) -> bytes: 976 line = self.readline() 977 if not line: 978 raise StopIteration() 979 return line 980 981 def readable(self) -> bool: 982 return True 983