1import inspect 2import logging 3import operator 4import re 5import string 6import sys 7import typing 8import typing as t 9from datetime import date 10from datetime import datetime 11from datetime import timezone 12from itertools import chain 13from weakref import WeakKeyDictionary 14 15if t.TYPE_CHECKING: 16 from _typeshed.wsgi import StartResponse 17 from _typeshed.wsgi import WSGIApplication 18 from _typeshed.wsgi import WSGIEnvironment 19 from .wrappers.request import Request # noqa: F401 20 21_logger: t.Optional[logging.Logger] = None 22_signature_cache = WeakKeyDictionary() # type: ignore 23_epoch_ord = date(1970, 1, 1).toordinal() 24_legal_cookie_chars = frozenset( 25 c.encode("ascii") 26 for c in f"{string.ascii_letters}{string.digits}/=!#$%&'*+-.^_`|~:" 27) 28 29_cookie_quoting_map = {b",": b"\\054", b";": b"\\073", b'"': b'\\"', b"\\": b"\\\\"} 30for _i in chain(range(32), range(127, 256)): 31 _cookie_quoting_map[_i.to_bytes(1, sys.byteorder)] = f"\\{_i:03o}".encode("latin1") 32 33_octal_re = re.compile(br"\\[0-3][0-7][0-7]") 34_quote_re = re.compile(br"[\\].") 35_legal_cookie_chars_re = br"[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]" 36_cookie_re = re.compile( 37 br""" 38 (?P<key>[^=;]+) 39 (?:\s*=\s* 40 (?P<val> 41 "(?:[^\\"]|\\.)*" | 42 (?:.*?) 43 ) 44 )? 45 \s*; 46""", 47 flags=re.VERBOSE, 48) 49 50 51class _Missing: 52 def __repr__(self) -> str: 53 return "no value" 54 55 def __reduce__(self) -> str: 56 return "_missing" 57 58 59_missing = _Missing() 60 61 62@typing.overload 63def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]: 64 ... 65 66 67@typing.overload 68def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]: 69 ... 70 71 72def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]: 73 """Create a function that will be called with a string argument. If 74 the reference is bytes, values will be encoded to bytes. 75 """ 76 if isinstance(reference, str): 77 return lambda x: x 78 79 return operator.methodcaller("encode", "latin1") 80 81 82def _check_str_tuple(value: t.Tuple[t.AnyStr, ...]) -> None: 83 """Ensure tuple items are all strings or all bytes.""" 84 if not value: 85 return 86 87 item_type = str if isinstance(value[0], str) else bytes 88 89 if any(not isinstance(item, item_type) for item in value): 90 raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})") 91 92 93_default_encoding = sys.getdefaultencoding() 94 95 96def _to_bytes( 97 x: t.Union[str, bytes], charset: str = _default_encoding, errors: str = "strict" 98) -> bytes: 99 if x is None or isinstance(x, bytes): 100 return x 101 102 if isinstance(x, (bytearray, memoryview)): 103 return bytes(x) 104 105 if isinstance(x, str): 106 return x.encode(charset, errors) 107 108 raise TypeError("Expected bytes") 109 110 111@typing.overload 112def _to_str( # type: ignore 113 x: None, 114 charset: t.Optional[str] = ..., 115 errors: str = ..., 116 allow_none_charset: bool = ..., 117) -> None: 118 ... 119 120 121@typing.overload 122def _to_str( 123 x: t.Any, 124 charset: t.Optional[str] = ..., 125 errors: str = ..., 126 allow_none_charset: bool = ..., 127) -> str: 128 ... 129 130 131def _to_str( 132 x: t.Optional[t.Any], 133 charset: t.Optional[str] = _default_encoding, 134 errors: str = "strict", 135 allow_none_charset: bool = False, 136) -> t.Optional[t.Union[str, bytes]]: 137 if x is None or isinstance(x, str): 138 return x 139 140 if not isinstance(x, (bytes, bytearray)): 141 return str(x) 142 143 if charset is None: 144 if allow_none_charset: 145 return x 146 147 return x.decode(charset, errors) # type: ignore 148 149 150def _wsgi_decoding_dance( 151 s: str, charset: str = "utf-8", errors: str = "replace" 152) -> str: 153 return s.encode("latin1").decode(charset, errors) 154 155 156def _wsgi_encoding_dance( 157 s: str, charset: str = "utf-8", errors: str = "replace" 158) -> str: 159 if isinstance(s, bytes): 160 return s.decode("latin1", errors) 161 162 return s.encode(charset).decode("latin1", errors) 163 164 165def _get_environ(obj: t.Union["WSGIEnvironment", "Request"]) -> "WSGIEnvironment": 166 env = getattr(obj, "environ", obj) 167 assert isinstance( 168 env, dict 169 ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)" 170 return env 171 172 173def _has_level_handler(logger: logging.Logger) -> bool: 174 """Check if there is a handler in the logging chain that will handle 175 the given logger's effective level. 176 """ 177 level = logger.getEffectiveLevel() 178 current = logger 179 180 while current: 181 if any(handler.level <= level for handler in current.handlers): 182 return True 183 184 if not current.propagate: 185 break 186 187 current = current.parent # type: ignore 188 189 return False 190 191 192class _ColorStreamHandler(logging.StreamHandler): 193 """On Windows, wrap stream with Colorama for ANSI style support.""" 194 195 def __init__(self) -> None: 196 try: 197 import colorama 198 except ImportError: 199 stream = None 200 else: 201 stream = colorama.AnsiToWin32(sys.stderr) 202 203 super().__init__(stream) 204 205 206def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None: 207 """Log a message to the 'werkzeug' logger. 208 209 The logger is created the first time it is needed. If there is no 210 level set, it is set to :data:`logging.INFO`. If there is no handler 211 for the logger's effective level, a :class:`logging.StreamHandler` 212 is added. 213 """ 214 global _logger 215 216 if _logger is None: 217 _logger = logging.getLogger("werkzeug") 218 219 if _logger.level == logging.NOTSET: 220 _logger.setLevel(logging.INFO) 221 222 if not _has_level_handler(_logger): 223 _logger.addHandler(_ColorStreamHandler()) 224 225 getattr(_logger, type)(message.rstrip(), *args, **kwargs) 226 227 228def _parse_signature(func): # type: ignore 229 """Return a signature object for the function. 230 231 .. deprecated:: 2.0 232 Will be removed in Werkzeug 2.1 along with ``utils.bind`` and 233 ``validate_arguments``. 234 """ 235 # if we have a cached validator for this function, return it 236 parse = _signature_cache.get(func) 237 if parse is not None: 238 return parse 239 240 # inspect the function signature and collect all the information 241 tup = inspect.getfullargspec(func) 242 positional, vararg_var, kwarg_var, defaults = tup[:4] 243 defaults = defaults or () 244 arg_count = len(positional) 245 arguments = [] 246 for idx, name in enumerate(positional): 247 if isinstance(name, list): 248 raise TypeError( 249 "cannot parse functions that unpack tuples in the function signature" 250 ) 251 try: 252 default = defaults[idx - arg_count] 253 except IndexError: 254 param = (name, False, None) 255 else: 256 param = (name, True, default) 257 arguments.append(param) 258 arguments = tuple(arguments) 259 260 def parse(args, kwargs): # type: ignore 261 new_args = [] 262 missing = [] 263 extra = {} 264 265 # consume as many arguments as positional as possible 266 for idx, (name, has_default, default) in enumerate(arguments): 267 try: 268 new_args.append(args[idx]) 269 except IndexError: 270 try: 271 new_args.append(kwargs.pop(name)) 272 except KeyError: 273 if has_default: 274 new_args.append(default) 275 else: 276 missing.append(name) 277 else: 278 if name in kwargs: 279 extra[name] = kwargs.pop(name) 280 281 # handle extra arguments 282 extra_positional = args[arg_count:] 283 if vararg_var is not None: 284 new_args.extend(extra_positional) 285 extra_positional = () 286 if kwargs and kwarg_var is None: 287 extra.update(kwargs) 288 kwargs = {} 289 290 return ( 291 new_args, 292 kwargs, 293 missing, 294 extra, 295 extra_positional, 296 arguments, 297 vararg_var, 298 kwarg_var, 299 ) 300 301 _signature_cache[func] = parse 302 return parse 303 304 305@typing.overload 306def _dt_as_utc(dt: None) -> None: 307 ... 308 309 310@typing.overload 311def _dt_as_utc(dt: datetime) -> datetime: 312 ... 313 314 315def _dt_as_utc(dt: t.Optional[datetime]) -> t.Optional[datetime]: 316 if dt is None: 317 return dt 318 319 if dt.tzinfo is None: 320 return dt.replace(tzinfo=timezone.utc) 321 elif dt.tzinfo != timezone.utc: 322 return dt.astimezone(timezone.utc) 323 324 return dt 325 326 327_TAccessorValue = t.TypeVar("_TAccessorValue") 328 329 330class _DictAccessorProperty(t.Generic[_TAccessorValue]): 331 """Baseclass for `environ_property` and `header_property`.""" 332 333 read_only = False 334 335 def __init__( 336 self, 337 name: str, 338 default: t.Optional[_TAccessorValue] = None, 339 load_func: t.Optional[t.Callable[[str], _TAccessorValue]] = None, 340 dump_func: t.Optional[t.Callable[[_TAccessorValue], str]] = None, 341 read_only: t.Optional[bool] = None, 342 doc: t.Optional[str] = None, 343 ) -> None: 344 self.name = name 345 self.default = default 346 self.load_func = load_func 347 self.dump_func = dump_func 348 if read_only is not None: 349 self.read_only = read_only 350 self.__doc__ = doc 351 352 def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]: 353 raise NotImplementedError 354 355 @typing.overload 356 def __get__( 357 self, instance: None, owner: type 358 ) -> "_DictAccessorProperty[_TAccessorValue]": 359 ... 360 361 @typing.overload 362 def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: 363 ... 364 365 def __get__( 366 self, instance: t.Optional[t.Any], owner: type 367 ) -> t.Union[_TAccessorValue, "_DictAccessorProperty[_TAccessorValue]"]: 368 if instance is None: 369 return self 370 371 storage = self.lookup(instance) 372 373 if self.name not in storage: 374 return self.default # type: ignore 375 376 value = storage[self.name] 377 378 if self.load_func is not None: 379 try: 380 return self.load_func(value) 381 except (ValueError, TypeError): 382 return self.default # type: ignore 383 384 return value # type: ignore 385 386 def __set__(self, instance: t.Any, value: _TAccessorValue) -> None: 387 if self.read_only: 388 raise AttributeError("read only property") 389 390 if self.dump_func is not None: 391 self.lookup(instance)[self.name] = self.dump_func(value) 392 else: 393 self.lookup(instance)[self.name] = value 394 395 def __delete__(self, instance: t.Any) -> None: 396 if self.read_only: 397 raise AttributeError("read only property") 398 399 self.lookup(instance).pop(self.name, None) 400 401 def __repr__(self) -> str: 402 return f"<{type(self).__name__} {self.name}>" 403 404 405def _cookie_quote(b: bytes) -> bytes: 406 buf = bytearray() 407 all_legal = True 408 _lookup = _cookie_quoting_map.get 409 _push = buf.extend 410 411 for char_int in b: 412 char = char_int.to_bytes(1, sys.byteorder) 413 if char not in _legal_cookie_chars: 414 all_legal = False 415 char = _lookup(char, char) 416 _push(char) 417 418 if all_legal: 419 return bytes(buf) 420 return bytes(b'"' + buf + b'"') 421 422 423def _cookie_unquote(b: bytes) -> bytes: 424 if len(b) < 2: 425 return b 426 if b[:1] != b'"' or b[-1:] != b'"': 427 return b 428 429 b = b[1:-1] 430 431 i = 0 432 n = len(b) 433 rv = bytearray() 434 _push = rv.extend 435 436 while 0 <= i < n: 437 o_match = _octal_re.search(b, i) 438 q_match = _quote_re.search(b, i) 439 if not o_match and not q_match: 440 rv.extend(b[i:]) 441 break 442 j = k = -1 443 if o_match: 444 j = o_match.start(0) 445 if q_match: 446 k = q_match.start(0) 447 if q_match and (not o_match or k < j): 448 _push(b[i:k]) 449 _push(b[k + 1 : k + 2]) 450 i = k + 2 451 else: 452 _push(b[i:j]) 453 rv.append(int(b[j + 1 : j + 4], 8)) 454 i = j + 4 455 456 return bytes(rv) 457 458 459def _cookie_parse_impl(b: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]: 460 """Lowlevel cookie parsing facility that operates on bytes.""" 461 i = 0 462 n = len(b) 463 464 while i < n: 465 match = _cookie_re.search(b + b";", i) 466 if not match: 467 break 468 469 key = match.group("key").strip() 470 value = match.group("val") or b"" 471 i = match.end(0) 472 473 yield key, _cookie_unquote(value) 474 475 476def _encode_idna(domain: str) -> bytes: 477 # If we're given bytes, make sure they fit into ASCII 478 if isinstance(domain, bytes): 479 domain.decode("ascii") 480 return domain 481 482 # Otherwise check if it's already ascii, then return 483 try: 484 return domain.encode("ascii") 485 except UnicodeError: 486 pass 487 488 # Otherwise encode each part separately 489 return b".".join(p.encode("idna") for p in domain.split(".")) 490 491 492def _decode_idna(domain: t.Union[str, bytes]) -> str: 493 # If the input is a string try to encode it to ascii to do the idna 494 # decoding. If that fails because of a unicode error, then we 495 # already have a decoded idna domain. 496 if isinstance(domain, str): 497 try: 498 domain = domain.encode("ascii") 499 except UnicodeError: 500 return domain # type: ignore 501 502 # Decode each part separately. If a part fails, try to decode it 503 # with ascii and silently ignore errors. This makes sense because 504 # the idna codec does not have error handling. 505 def decode_part(part: bytes) -> str: 506 try: 507 return part.decode("idna") 508 except UnicodeError: 509 return part.decode("ascii", "ignore") 510 511 return ".".join(decode_part(p) for p in domain.split(b".")) 512 513 514@typing.overload 515def _make_cookie_domain(domain: None) -> None: 516 ... 517 518 519@typing.overload 520def _make_cookie_domain(domain: str) -> bytes: 521 ... 522 523 524def _make_cookie_domain(domain: t.Optional[str]) -> t.Optional[bytes]: 525 if domain is None: 526 return None 527 domain = _encode_idna(domain) 528 if b":" in domain: 529 domain = domain.split(b":", 1)[0] 530 if b"." in domain: 531 return domain 532 raise ValueError( 533 "Setting 'domain' for a cookie on a server running locally (ex: " 534 "localhost) is not supported by complying browsers. You should " 535 "have something like: '127.0.0.1 localhost dev.localhost' on " 536 "your hosts file and then point your server to run on " 537 "'dev.localhost' and also set 'domain' for 'dev.localhost'" 538 ) 539 540 541def _easteregg(app: t.Optional["WSGIApplication"] = None) -> "WSGIApplication": 542 """Like the name says. But who knows how it works?""" 543 544 def bzzzzzzz(gyver: bytes) -> str: 545 import base64 546 import zlib 547 548 return zlib.decompress(base64.b64decode(gyver)).decode("ascii") 549 550 gyver = "\n".join( 551 [ 552 x + (77 - len(x)) * " " 553 for x in bzzzzzzz( 554 b""" 555eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m 5569Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz 5574QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY 558jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc 559q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5 560jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317 5618P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ 562v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r 563XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu 564LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk 565iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI 566tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE 5671R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI 568GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN 569Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A 570QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG 5718MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu 572jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz 573DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8 574MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4 575GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i 576RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi 577Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c 578NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS 579pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ 580sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm 581p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ 582krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/ 583nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt 584mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p 5857f2zLkGNv8b191cD/3vs9Q833z8t""" 586 ).splitlines() 587 ] 588 ) 589 590 def easteregged( 591 environ: "WSGIEnvironment", start_response: "StartResponse" 592 ) -> t.Iterable[bytes]: 593 def injecting_start_response( 594 status: str, headers: t.List[t.Tuple[str, str]], exc_info: t.Any = None 595 ) -> t.Callable[[bytes], t.Any]: 596 headers.append(("X-Powered-By", "Werkzeug")) 597 return start_response(status, headers, exc_info) 598 599 if app is not None and environ.get("QUERY_STRING") != "macgybarchakku": 600 return app(environ, injecting_start_response) 601 injecting_start_response("200 OK", [("Content-Type", "text/html")]) 602 return [ 603 f"""\ 604<!DOCTYPE html> 605<html> 606<head> 607<title>About Werkzeug</title> 608<style type="text/css"> 609 body {{ font: 15px Georgia, serif; text-align: center; }} 610 a {{ color: #333; text-decoration: none; }} 611 h1 {{ font-size: 30px; margin: 20px 0 10px 0; }} 612 p {{ margin: 0 0 30px 0; }} 613 pre {{ font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }} 614</style> 615</head> 616<body> 617<h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1> 618<p>the Swiss Army knife of Python web development.</p> 619<pre>{gyver}\n\n\n</pre> 620</body> 621</html>""".encode( 622 "latin1" 623 ) 624 ] 625 626 return easteregged 627