1import inspect
2import logging
3import operator
4import re
5import string
6import sys
7import typing
8import typing as t
9from datetime import date
10from datetime import datetime
11from datetime import timezone
12from itertools import chain
13from weakref import WeakKeyDictionary
14
15if t.TYPE_CHECKING:
16    from _typeshed.wsgi import StartResponse
17    from _typeshed.wsgi import WSGIApplication
18    from _typeshed.wsgi import WSGIEnvironment
19    from .wrappers.request import Request  # noqa: F401
20
# Lazily created by ``_log`` the first time a message is logged.
_logger: t.Optional[logging.Logger] = None
# Per-function cache of the parser closures built by ``_parse_signature``.
# Keys are held weakly.
_signature_cache = WeakKeyDictionary()  # type: ignore
# Proleptic Gregorian ordinal of 1970-01-01.
_epoch_ord = date(1970, 1, 1).toordinal()
# Single-byte values that ``_cookie_quote`` copies through unchanged.
_legal_cookie_chars = frozenset(
    c.encode("ascii")
    for c in f"{string.ascii_letters}{string.digits}/=!#$%&'*+-.^_`|~:"
)

# Replacement sequences used by ``_cookie_quote`` for bytes that are not in
# ``_legal_cookie_chars``: explicit escapes for ``,`` ``;`` ``"`` and ``\``,
# plus a backslash followed by a three-digit octal code for bytes 0-31 and
# 127-255.
_cookie_quoting_map = {b",": b"\\054", b";": b"\\073", b'"': b'\\"', b"\\": b"\\\\"}
for _i in chain(range(32), range(127, 256)):
    _cookie_quoting_map[_i.to_bytes(1, sys.byteorder)] = f"\\{_i:03o}".encode("latin1")

# Matches a backslash followed by a three-digit octal code (000-377).
_octal_re = re.compile(br"\\[0-3][0-7][0-7]")
# Matches a backslash followed by any single byte except a newline.
_quote_re = re.compile(br"[\\].")
# Uncompiled character-class pattern; not referenced elsewhere in this part
# of the module.
_legal_cookie_chars_re = br"[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]"
# Matches one ``key[=value]`` pair terminated by ``;``. The value is either
# a double-quoted string (backslash escapes allowed) or a non-greedy run of
# arbitrary bytes.
_cookie_re = re.compile(
    br"""
    (?P<key>[^=;]+)
    (?:\s*=\s*
        (?P<val>
            "(?:[^\\"]|\\.)*" |
             (?:.*?)
        )
    )?
    \s*;
""",
    flags=re.VERBOSE,
)
49
50
51class _Missing:
52    def __repr__(self) -> str:
53        return "no value"
54
55    def __reduce__(self) -> str:
56        return "_missing"
57
58
59_missing = _Missing()
60
61
62@typing.overload
63def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
64    ...
65
66
67@typing.overload
68def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
69    ...
70
71
72def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
73    """Create a function that will be called with a string argument. If
74    the reference is bytes, values will be encoded to bytes.
75    """
76    if isinstance(reference, str):
77        return lambda x: x
78
79    return operator.methodcaller("encode", "latin1")
80
81
82def _check_str_tuple(value: t.Tuple[t.AnyStr, ...]) -> None:
83    """Ensure tuple items are all strings or all bytes."""
84    if not value:
85        return
86
87    item_type = str if isinstance(value[0], str) else bytes
88
89    if any(not isinstance(item, item_type) for item in value):
90        raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")
91
92
93_default_encoding = sys.getdefaultencoding()
94
95
96def _to_bytes(
97    x: t.Union[str, bytes], charset: str = _default_encoding, errors: str = "strict"
98) -> bytes:
99    if x is None or isinstance(x, bytes):
100        return x
101
102    if isinstance(x, (bytearray, memoryview)):
103        return bytes(x)
104
105    if isinstance(x, str):
106        return x.encode(charset, errors)
107
108    raise TypeError("Expected bytes")
109
110
@typing.overload
def _to_str(  # type: ignore
    x: None,
    charset: t.Optional[str] = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> None:
    ...


@typing.overload
def _to_str(
    x: t.Any,
    charset: t.Optional[str] = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> str:
    ...


def _to_str(
    x: t.Optional[t.Any],
    charset: t.Optional[str] = _default_encoding,
    errors: str = "strict",
    allow_none_charset: bool = False,
) -> t.Optional[t.Union[str, bytes]]:
    """Coerce ``x`` to ``str``.

    ``bytes`` and ``bytearray`` are decoded with ``charset`` and ``errors``,
    except that they are returned undecoded when ``charset`` is ``None`` and
    ``allow_none_charset`` is true. ``None`` and ``str`` are returned as-is.
    Every other object goes through ``str()``.
    """
    if isinstance(x, (bytes, bytearray)):
        if charset is None and allow_none_charset:
            return x

        return x.decode(charset, errors)  # type: ignore

    if x is None or isinstance(x, str):
        return x

    return str(x)
148
149
150def _wsgi_decoding_dance(
151    s: str, charset: str = "utf-8", errors: str = "replace"
152) -> str:
153    return s.encode("latin1").decode(charset, errors)
154
155
156def _wsgi_encoding_dance(
157    s: str, charset: str = "utf-8", errors: str = "replace"
158) -> str:
159    if isinstance(s, bytes):
160        return s.decode("latin1", errors)
161
162    return s.encode(charset).decode("latin1", errors)
163
164
165def _get_environ(obj: t.Union["WSGIEnvironment", "Request"]) -> "WSGIEnvironment":
166    env = getattr(obj, "environ", obj)
167    assert isinstance(
168        env, dict
169    ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
170    return env
171
172
173def _has_level_handler(logger: logging.Logger) -> bool:
174    """Check if there is a handler in the logging chain that will handle
175    the given logger's effective level.
176    """
177    level = logger.getEffectiveLevel()
178    current = logger
179
180    while current:
181        if any(handler.level <= level for handler in current.handlers):
182            return True
183
184        if not current.propagate:
185            break
186
187        current = current.parent  # type: ignore
188
189    return False
190
191
192class _ColorStreamHandler(logging.StreamHandler):
193    """On Windows, wrap stream with Colorama for ANSI style support."""
194
195    def __init__(self) -> None:
196        try:
197            import colorama
198        except ImportError:
199            stream = None
200        else:
201            stream = colorama.AnsiToWin32(sys.stderr)
202
203        super().__init__(stream)
204
205
def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Emit ``message`` on the 'werkzeug' logger.

    ``type`` names the logger method to call (for example ``"info"``).
    Trailing whitespace is stripped from ``message``; remaining arguments
    are forwarded to the logger method.

    On first use the logger is looked up and configured: a level of
    :data:`logging.NOTSET` is raised to :data:`logging.INFO`, and if no
    handler covers the effective level a ``_ColorStreamHandler`` is added.
    """
    global _logger

    if _logger is None:
        _logger = logger = logging.getLogger("werkzeug")

        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

        if not _has_level_handler(logger):
            logger.addHandler(_ColorStreamHandler())

    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)
226
227
def _parse_signature(func):  # type: ignore
    """Return a signature object for the function.

    The returned callable takes ``(args, kwargs)`` and sorts them against
    the positional parameters of ``func``. It is cached per function in
    ``_signature_cache``.

    .. deprecated:: 2.0
        Will be removed in Werkzeug 2.1 along with ``utils.bind`` and
        ``validate_arguments``.
    """
    cached = _signature_cache.get(func)

    if cached is not None:
        return cached

    spec = inspect.getfullargspec(func)
    positional, vararg_var, kwarg_var, defaults = spec[:4]
    defaults = defaults or ()
    arg_count = len(positional)
    # Defaults line up with the tail of the positional parameter list.
    first_default = arg_count - len(defaults)
    collected = []

    for idx, name in enumerate(positional):
        if isinstance(name, list):
            raise TypeError(
                "cannot parse functions that unpack tuples in the function signature"
            )

        if idx >= first_default:
            collected.append((name, True, defaults[idx - first_default]))
        else:
            collected.append((name, False, None))

    arguments = tuple(collected)

    def parse(args, kwargs):  # type: ignore
        new_args = []
        missing = []
        extra = {}
        supplied = len(args)

        # Fill each positional slot from ``args`` first, then from
        # ``kwargs``, then from the declared default.
        for idx, (name, has_default, default) in enumerate(arguments):
            if idx < supplied:
                new_args.append(args[idx])

                # Also given by keyword: record the duplicate as extra.
                if name in kwargs:
                    extra[name] = kwargs.pop(name)
            elif name in kwargs:
                new_args.append(kwargs.pop(name))
            elif has_default:
                new_args.append(default)
            else:
                missing.append(name)

        # Surplus positionals are absorbed only if ``*args`` is declared.
        extra_positional = args[arg_count:]

        if vararg_var is not None:
            new_args.extend(extra_positional)
            extra_positional = ()

        # Surplus keywords count as extra unless ``**kwargs`` is declared.
        if kwargs and kwarg_var is None:
            extra.update(kwargs)
            kwargs = {}

        return (
            new_args,
            kwargs,
            missing,
            extra,
            extra_positional,
            arguments,
            vararg_var,
            kwarg_var,
        )

    _signature_cache[func] = parse
    return parse
303
304
305@typing.overload
306def _dt_as_utc(dt: None) -> None:
307    ...
308
309
310@typing.overload
311def _dt_as_utc(dt: datetime) -> datetime:
312    ...
313
314
315def _dt_as_utc(dt: t.Optional[datetime]) -> t.Optional[datetime]:
316    if dt is None:
317        return dt
318
319    if dt.tzinfo is None:
320        return dt.replace(tzinfo=timezone.utc)
321    elif dt.tzinfo != timezone.utc:
322        return dt.astimezone(timezone.utc)
323
324    return dt
325
326
327_TAccessorValue = t.TypeVar("_TAccessorValue")
328
329
330class _DictAccessorProperty(t.Generic[_TAccessorValue]):
331    """Baseclass for `environ_property` and `header_property`."""
332
333    read_only = False
334
335    def __init__(
336        self,
337        name: str,
338        default: t.Optional[_TAccessorValue] = None,
339        load_func: t.Optional[t.Callable[[str], _TAccessorValue]] = None,
340        dump_func: t.Optional[t.Callable[[_TAccessorValue], str]] = None,
341        read_only: t.Optional[bool] = None,
342        doc: t.Optional[str] = None,
343    ) -> None:
344        self.name = name
345        self.default = default
346        self.load_func = load_func
347        self.dump_func = dump_func
348        if read_only is not None:
349            self.read_only = read_only
350        self.__doc__ = doc
351
352    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
353        raise NotImplementedError
354
355    @typing.overload
356    def __get__(
357        self, instance: None, owner: type
358    ) -> "_DictAccessorProperty[_TAccessorValue]":
359        ...
360
361    @typing.overload
362    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
363        ...
364
365    def __get__(
366        self, instance: t.Optional[t.Any], owner: type
367    ) -> t.Union[_TAccessorValue, "_DictAccessorProperty[_TAccessorValue]"]:
368        if instance is None:
369            return self
370
371        storage = self.lookup(instance)
372
373        if self.name not in storage:
374            return self.default  # type: ignore
375
376        value = storage[self.name]
377
378        if self.load_func is not None:
379            try:
380                return self.load_func(value)
381            except (ValueError, TypeError):
382                return self.default  # type: ignore
383
384        return value  # type: ignore
385
386    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
387        if self.read_only:
388            raise AttributeError("read only property")
389
390        if self.dump_func is not None:
391            self.lookup(instance)[self.name] = self.dump_func(value)
392        else:
393            self.lookup(instance)[self.name] = value
394
395    def __delete__(self, instance: t.Any) -> None:
396        if self.read_only:
397            raise AttributeError("read only property")
398
399        self.lookup(instance).pop(self.name, None)
400
401    def __repr__(self) -> str:
402        return f"<{type(self).__name__} {self.name}>"
403
404
def _cookie_quote(b: bytes) -> bytes:
    """Quote a cookie value.

    Bytes found in ``_legal_cookie_chars`` are copied as-is; any other byte
    is replaced through ``_cookie_quoting_map`` (or kept if it has no
    entry). If at least one byte was not legal, the result is wrapped in
    double quotes.
    """
    quoted = bytearray()
    needs_quotes = False

    for code in b:
        octet = code.to_bytes(1, sys.byteorder)

        if octet in _legal_cookie_chars:
            quoted.extend(octet)
        else:
            needs_quotes = True
            quoted.extend(_cookie_quoting_map.get(octet, octet))

    if needs_quotes:
        return bytes(b'"' + quoted + b'"')

    return bytes(quoted)
421
422
def _cookie_unquote(b: bytes) -> bytes:
    """Undo cookie quoting.

    Values that are not wrapped in double quotes are returned unchanged.
    Otherwise the quotes are dropped, a backslash plus three octal digits
    becomes the byte with that code, and a backslash plus any other byte
    becomes that byte.
    """
    if len(b) < 2 or b[:1] != b'"' or b[-1:] != b'"':
        return b

    inner = b[1:-1]
    pos = 0
    end = len(inner)
    out = bytearray()

    while 0 <= pos < end:
        octal = _octal_re.search(inner, pos)
        escaped = _quote_re.search(inner, pos)

        if not octal and not escaped:
            # No escapes left: copy the remainder verbatim.
            out.extend(inner[pos:])
            break

        octal_at = octal.start(0) if octal else -1
        escaped_at = escaped.start(0) if escaped else -1

        if escaped and (not octal or escaped_at < octal_at):
            # A plain escape comes first: keep the byte after the backslash.
            out.extend(inner[pos:escaped_at])
            out.extend(inner[escaped_at + 1 : escaped_at + 2])
            pos = escaped_at + 2
        else:
            # An octal escape comes first: decode its three digits.
            out.extend(inner[pos:octal_at])
            out.append(int(inner[octal_at + 1 : octal_at + 4], 8))
            pos = octal_at + 4

    return bytes(out)
457
458
def _cookie_parse_impl(b: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]:
    """Lowlevel cookie parsing facility that operates on bytes.

    :param b: The raw cookie header value.
    :return: An iterator of ``(key, value)`` byte pairs. Keys have
        surrounding whitespace stripped. Values are passed through
        ``_cookie_unquote``; a missing value is yielded as ``b""``.
    """
    i = 0
    n = len(b)
    # ``_cookie_re`` requires every pair to end with ";", so terminate the
    # last one. Build the padded buffer once instead of on each iteration,
    # which would copy the whole header per cookie.
    terminated = b + b";"

    while i < n:
        match = _cookie_re.search(terminated, i)

        if not match:
            break

        key = match.group("key").strip()
        value = match.group("val") or b""
        # Resume scanning right after the ";" that closed this pair.
        i = match.end(0)

        yield key, _cookie_unquote(value)
474
475
476def _encode_idna(domain: str) -> bytes:
477    # If we're given bytes, make sure they fit into ASCII
478    if isinstance(domain, bytes):
479        domain.decode("ascii")
480        return domain
481
482    # Otherwise check if it's already ascii, then return
483    try:
484        return domain.encode("ascii")
485    except UnicodeError:
486        pass
487
488    # Otherwise encode each part separately
489    return b".".join(p.encode("idna") for p in domain.split("."))
490
491
492def _decode_idna(domain: t.Union[str, bytes]) -> str:
493    # If the input is a string try to encode it to ascii to do the idna
494    # decoding. If that fails because of a unicode error, then we
495    # already have a decoded idna domain.
496    if isinstance(domain, str):
497        try:
498            domain = domain.encode("ascii")
499        except UnicodeError:
500            return domain  # type: ignore
501
502    # Decode each part separately. If a part fails, try to decode it
503    # with ascii and silently ignore errors. This makes sense because
504    # the idna codec does not have error handling.
505    def decode_part(part: bytes) -> str:
506        try:
507            return part.decode("idna")
508        except UnicodeError:
509            return part.decode("ascii", "ignore")
510
511    return ".".join(decode_part(p) for p in domain.split(b"."))
512
513
@typing.overload
def _make_cookie_domain(domain: None) -> None:
    ...


@typing.overload
def _make_cookie_domain(domain: str) -> bytes:
    ...


def _make_cookie_domain(domain: t.Optional[str]) -> t.Optional[bytes]:
    """Normalise a cookie ``domain`` to encoded bytes.

    ``None`` is passed through. Otherwise the domain is encoded with
    ``_encode_idna`` and anything from the first ``:`` onwards is dropped.
    A result without a dot raises :exc:`ValueError`.
    """
    if domain is None:
        return None

    encoded = _encode_idna(domain)
    # Keep only the part before the first colon, if there is one.
    host = encoded.split(b":", 1)[0]

    if b"." not in host:
        raise ValueError(
            "Setting 'domain' for a cookie on a server running locally (ex: "
            "localhost) is not supported by complying browsers. You should "
            "have something like: '127.0.0.1 localhost dev.localhost' on "
            "your hosts file and then point your server to run on "
            "'dev.localhost' and also set 'domain' for 'dev.localhost'"
        )

    return host
539
540
def _easteregg(app: t.Optional["WSGIApplication"] = None) -> "WSGIApplication":
    """Like the name says.  But who knows how it works?

    Returns a WSGI callable that appends an ``X-Powered-By: Werkzeug``
    header to every response. Requests are forwarded to ``app`` unless
    ``app`` is ``None`` or the ``QUERY_STRING`` is ``"macgybarchakku"``;
    in those cases a built-in HTML page is served instead.
    """

    def bzzzzzzz(gyver: bytes) -> str:
        # Base64-decode, zlib-decompress, then decode the result as ASCII.
        import base64
        import zlib

        return zlib.decompress(base64.b64decode(gyver)).decode("ascii")

    # Decode the embedded payload and right-pad every line with spaces to
    # 77 characters before joining the lines back together.
    gyver = "\n".join(
        [
            x + (77 - len(x)) * " "
            for x in bzzzzzzz(
                b"""
eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m
9Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz
4QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY
jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc
q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5
jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317
8P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ
v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r
XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu
LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk
iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI
tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE
1R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI
GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN
Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A
QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG
8MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu
jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz
DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8
MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4
GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i
RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi
Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c
NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS
pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ
sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm
p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ
krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/
nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt
mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p
7f2zLkGNv8b191cD/3vs9Q833z8t"""
            ).splitlines()
        ]
    )

    def easteregged(
        environ: "WSGIEnvironment", start_response: "StartResponse"
    ) -> t.Iterable[bytes]:
        def injecting_start_response(
            status: str, headers: t.List[t.Tuple[str, str]], exc_info: t.Any = None
        ) -> t.Callable[[bytes], t.Any]:
            # Appends to the caller's header list in place before delegating.
            headers.append(("X-Powered-By", "Werkzeug"))
            return start_response(status, headers, exc_info)

        # Normal path: delegate to the wrapped application with the
        # header-injecting ``start_response``.
        if app is not None and environ.get("QUERY_STRING") != "macgybarchakku":
            return app(environ, injecting_start_response)
        # No wrapped app, or the magic query string was sent: serve the
        # built-in page with the decoded text inside the ``<pre>`` element.
        injecting_start_response("200 OK", [("Content-Type", "text/html")])
        return [
            f"""\
<!DOCTYPE html>
<html>
<head>
<title>About Werkzeug</title>
<style type="text/css">
  body {{ font: 15px Georgia, serif; text-align: center; }}
  a {{ color: #333; text-decoration: none; }}
  h1 {{ font-size: 30px; margin: 20px 0 10px 0; }}
  p {{ margin: 0 0 30px 0; }}
  pre {{ font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }}
</style>
</head>
<body>
<h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1>
<p>the Swiss Army knife of Python web development.</p>
<pre>{gyver}\n\n\n</pre>
</body>
</html>""".encode(
                "latin1"
            )
        ]

    return easteregged
627