1import getpass 2import hashlib 3import json 4import mimetypes 5import os 6import pkgutil 7import re 8import sys 9import time 10import typing as t 11import uuid 12from itertools import chain 13from os.path import basename 14from os.path import join 15 16from .._internal import _log 17from ..http import parse_cookie 18from ..security import gen_salt 19from ..wrappers.request import Request 20from ..wrappers.response import Response 21from .console import Console 22from .tbtools import Frame 23from .tbtools import get_current_traceback 24from .tbtools import render_console_html 25from .tbtools import Traceback 26 27if t.TYPE_CHECKING: 28 from _typeshed.wsgi import StartResponse 29 from _typeshed.wsgi import WSGIApplication 30 from _typeshed.wsgi import WSGIEnvironment 31 32# A week 33PIN_TIME = 60 * 60 * 24 * 7 34 35 36def hash_pin(pin: str) -> str: 37 return hashlib.sha1(f"{pin} added salt".encode("utf-8", "replace")).hexdigest()[:12] 38 39 40_machine_id: t.Optional[t.Union[str, bytes]] = None 41 42 43def get_machine_id() -> t.Optional[t.Union[str, bytes]]: 44 global _machine_id 45 46 if _machine_id is not None: 47 return _machine_id 48 49 def _generate() -> t.Optional[t.Union[str, bytes]]: 50 linux = b"" 51 52 # machine-id is stable across boots, boot_id is not. 53 for filename in "/etc/machine-id", "/proc/sys/kernel/random/boot_id": 54 try: 55 with open(filename, "rb") as f: 56 value = f.readline().strip() 57 except OSError: 58 continue 59 60 if value: 61 linux += value 62 break 63 64 # Containers share the same machine id, add some cgroup 65 # information. This is used outside containers too but should be 66 # relatively stable across boots. 67 try: 68 with open("/proc/self/cgroup", "rb") as f: 69 linux += f.readline().strip().rpartition(b"/")[2] 70 except OSError: 71 pass 72 73 if linux: 74 return linux 75 76 # On OS X, use ioreg to get the computer's serial number. 77 try: 78 # subprocess may not be available, e.g. Google App Engine 79 # https://github.com/pallets/werkzeug/issues/925 80 from subprocess import Popen, PIPE 81 82 dump = Popen( 83 ["ioreg", "-c", "IOPlatformExpertDevice", "-d", "2"], stdout=PIPE 84 ).communicate()[0] 85 match = re.search(b'"serial-number" = <([^>]+)', dump) 86 87 if match is not None: 88 return match.group(1) 89 except (OSError, ImportError): 90 pass 91 92 # On Windows, use winreg to get the machine guid. 93 try: 94 import winreg 95 except ImportError: 96 pass 97 else: 98 try: 99 with winreg.OpenKey( 100 winreg.HKEY_LOCAL_MACHINE, 101 "SOFTWARE\\Microsoft\\Cryptography", 102 0, 103 winreg.KEY_READ | winreg.KEY_WOW64_64KEY, 104 ) as rk: 105 guid: t.Union[str, bytes] 106 guid_type: int 107 guid, guid_type = winreg.QueryValueEx(rk, "MachineGuid") 108 109 if guid_type == winreg.REG_SZ: 110 return guid.encode("utf-8") # type: ignore 111 112 return guid 113 except OSError: 114 pass 115 116 return None 117 118 _machine_id = _generate() 119 return _machine_id 120 121 122class _ConsoleFrame: 123 """Helper class so that we can reuse the frame console code for the 124 standalone console. 125 """ 126 127 def __init__(self, namespace: t.Dict[str, t.Any]): 128 self.console = Console(namespace) 129 self.id = 0 130 131 132def get_pin_and_cookie_name( 133 app: "WSGIApplication", 134) -> t.Union[t.Tuple[str, str], t.Tuple[None, None]]: 135 """Given an application object this returns a semi-stable 9 digit pin 136 code and a random key. The hope is that this is stable between 137 restarts to not make debugging particularly frustrating. If the pin 138 was forcefully disabled this returns `None`. 139 140 Second item in the resulting tuple is the cookie name for remembering. 141 """ 142 pin = os.environ.get("WERKZEUG_DEBUG_PIN") 143 rv = None 144 num = None 145 146 # Pin was explicitly disabled 147 if pin == "off": 148 return None, None 149 150 # Pin was provided explicitly 151 if pin is not None and pin.replace("-", "").isdigit(): 152 # If there are separators in the pin, return it directly 153 if "-" in pin: 154 rv = pin 155 else: 156 num = pin 157 158 modname = getattr(app, "__module__", t.cast(object, app).__class__.__module__) 159 username: t.Optional[str] 160 161 try: 162 # getuser imports the pwd module, which does not exist in Google 163 # App Engine. It may also raise a KeyError if the UID does not 164 # have a username, such as in Docker. 165 username = getpass.getuser() 166 except (ImportError, KeyError): 167 username = None 168 169 mod = sys.modules.get(modname) 170 171 # This information only exists to make the cookie unique on the 172 # computer, not as a security feature. 173 probably_public_bits = [ 174 username, 175 modname, 176 getattr(app, "__name__", type(app).__name__), 177 getattr(mod, "__file__", None), 178 ] 179 180 # This information is here to make it harder for an attacker to 181 # guess the cookie name. They are unlikely to be contained anywhere 182 # within the unauthenticated debug page. 183 private_bits = [str(uuid.getnode()), get_machine_id()] 184 185 h = hashlib.sha1() 186 for bit in chain(probably_public_bits, private_bits): 187 if not bit: 188 continue 189 if isinstance(bit, str): 190 bit = bit.encode("utf-8") 191 h.update(bit) 192 h.update(b"cookiesalt") 193 194 cookie_name = f"__wzd{h.hexdigest()[:20]}" 195 196 # If we need to generate a pin we salt it a bit more so that we don't 197 # end up with the same value and generate out 9 digits 198 if num is None: 199 h.update(b"pinsalt") 200 num = f"{int(h.hexdigest(), 16):09d}"[:9] 201 202 # Format the pincode in groups of digits for easier remembering if 203 # we don't have a result yet. 204 if rv is None: 205 for group_size in 5, 4, 3: 206 if len(num) % group_size == 0: 207 rv = "-".join( 208 num[x : x + group_size].rjust(group_size, "0") 209 for x in range(0, len(num), group_size) 210 ) 211 break 212 else: 213 rv = num 214 215 return rv, cookie_name 216 217 218class DebuggedApplication: 219 """Enables debugging support for a given application:: 220 221 from werkzeug.debug import DebuggedApplication 222 from myapp import app 223 app = DebuggedApplication(app, evalex=True) 224 225 The `evalex` keyword argument allows evaluating expressions in a 226 traceback's frame context. 227 228 :param app: the WSGI application to run debugged. 229 :param evalex: enable exception evaluation feature (interactive 230 debugging). This requires a non-forking server. 231 :param request_key: The key that points to the request object in ths 232 environment. This parameter is ignored in current 233 versions. 234 :param console_path: the URL for a general purpose console. 235 :param console_init_func: the function that is executed before starting 236 the general purpose console. The return value 237 is used as initial namespace. 238 :param show_hidden_frames: by default hidden traceback frames are skipped. 239 You can show them by setting this parameter 240 to `True`. 241 :param pin_security: can be used to disable the pin based security system. 242 :param pin_logging: enables the logging of the pin system. 243 """ 244 245 _pin: str 246 _pin_cookie: str 247 248 def __init__( 249 self, 250 app: "WSGIApplication", 251 evalex: bool = False, 252 request_key: str = "werkzeug.request", 253 console_path: str = "/console", 254 console_init_func: t.Optional[t.Callable[[], t.Dict[str, t.Any]]] = None, 255 show_hidden_frames: bool = False, 256 pin_security: bool = True, 257 pin_logging: bool = True, 258 ) -> None: 259 if not console_init_func: 260 console_init_func = None 261 self.app = app 262 self.evalex = evalex 263 self.frames: t.Dict[int, t.Union[Frame, _ConsoleFrame]] = {} 264 self.tracebacks: t.Dict[int, Traceback] = {} 265 self.request_key = request_key 266 self.console_path = console_path 267 self.console_init_func = console_init_func 268 self.show_hidden_frames = show_hidden_frames 269 self.secret = gen_salt(20) 270 self._failed_pin_auth = 0 271 272 self.pin_logging = pin_logging 273 if pin_security: 274 # Print out the pin for the debugger on standard out. 275 if os.environ.get("WERKZEUG_RUN_MAIN") == "true" and pin_logging: 276 _log("warning", " * Debugger is active!") 277 if self.pin is None: 278 _log("warning", " * Debugger PIN disabled. DEBUGGER UNSECURED!") 279 else: 280 _log("info", " * Debugger PIN: %s", self.pin) 281 else: 282 self.pin = None 283 284 @property 285 def pin(self) -> t.Optional[str]: 286 if not hasattr(self, "_pin"): 287 pin_cookie = get_pin_and_cookie_name(self.app) 288 self._pin, self._pin_cookie = pin_cookie # type: ignore 289 return self._pin 290 291 @pin.setter 292 def pin(self, value: str) -> None: 293 self._pin = value 294 295 @property 296 def pin_cookie_name(self) -> str: 297 """The name of the pin cookie.""" 298 if not hasattr(self, "_pin_cookie"): 299 pin_cookie = get_pin_and_cookie_name(self.app) 300 self._pin, self._pin_cookie = pin_cookie # type: ignore 301 return self._pin_cookie 302 303 def debug_application( 304 self, environ: "WSGIEnvironment", start_response: "StartResponse" 305 ) -> t.Iterator[bytes]: 306 """Run the application and conserve the traceback frames.""" 307 app_iter = None 308 try: 309 app_iter = self.app(environ, start_response) 310 yield from app_iter 311 if hasattr(app_iter, "close"): 312 app_iter.close() # type: ignore 313 except Exception: 314 if hasattr(app_iter, "close"): 315 app_iter.close() # type: ignore 316 traceback = get_current_traceback( 317 skip=1, 318 show_hidden_frames=self.show_hidden_frames, 319 ignore_system_exceptions=True, 320 ) 321 for frame in traceback.frames: 322 self.frames[frame.id] = frame 323 self.tracebacks[traceback.id] = traceback 324 325 try: 326 start_response( 327 "500 INTERNAL SERVER ERROR", 328 [ 329 ("Content-Type", "text/html; charset=utf-8"), 330 # Disable Chrome's XSS protection, the debug 331 # output can cause false-positives. 332 ("X-XSS-Protection", "0"), 333 ], 334 ) 335 except Exception: 336 # if we end up here there has been output but an error 337 # occurred. in that situation we can do nothing fancy any 338 # more, better log something into the error log and fall 339 # back gracefully. 340 environ["wsgi.errors"].write( 341 "Debugging middleware caught exception in streamed " 342 "response at a point where response headers were already " 343 "sent.\n" 344 ) 345 else: 346 is_trusted = bool(self.check_pin_trust(environ)) 347 yield traceback.render_full( 348 evalex=self.evalex, evalex_trusted=is_trusted, secret=self.secret 349 ).encode("utf-8", "replace") 350 351 traceback.log(environ["wsgi.errors"]) 352 353 def execute_command( 354 self, request: Request, command: str, frame: t.Union[Frame, _ConsoleFrame] 355 ) -> Response: 356 """Execute a command in a console.""" 357 return Response(frame.console.eval(command), mimetype="text/html") 358 359 def display_console(self, request: Request) -> Response: 360 """Display a standalone shell.""" 361 if 0 not in self.frames: 362 if self.console_init_func is None: 363 ns = {} 364 else: 365 ns = dict(self.console_init_func()) 366 ns.setdefault("app", self.app) 367 self.frames[0] = _ConsoleFrame(ns) 368 is_trusted = bool(self.check_pin_trust(request.environ)) 369 return Response( 370 render_console_html(secret=self.secret, evalex_trusted=is_trusted), 371 mimetype="text/html", 372 ) 373 374 def get_resource(self, request: Request, filename: str) -> Response: 375 """Return a static resource from the shared folder.""" 376 filename = join("shared", basename(filename)) 377 try: 378 data = pkgutil.get_data(__package__, filename) 379 except OSError: 380 data = None 381 if data is not None: 382 mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream" 383 return Response(data, mimetype=mimetype) 384 return Response("Not Found", status=404) 385 386 def check_pin_trust(self, environ: "WSGIEnvironment") -> t.Optional[bool]: 387 """Checks if the request passed the pin test. This returns `True` if the 388 request is trusted on a pin/cookie basis and returns `False` if not. 389 Additionally if the cookie's stored pin hash is wrong it will return 390 `None` so that appropriate action can be taken. 391 """ 392 if self.pin is None: 393 return True 394 val = parse_cookie(environ).get(self.pin_cookie_name) 395 if not val or "|" not in val: 396 return False 397 ts, pin_hash = val.split("|", 1) 398 if not ts.isdigit(): 399 return False 400 if pin_hash != hash_pin(self.pin): 401 return None 402 return (time.time() - PIN_TIME) < int(ts) 403 404 def _fail_pin_auth(self) -> None: 405 time.sleep(5.0 if self._failed_pin_auth > 5 else 0.5) 406 self._failed_pin_auth += 1 407 408 def pin_auth(self, request: Request) -> Response: 409 """Authenticates with the pin.""" 410 exhausted = False 411 auth = False 412 trust = self.check_pin_trust(request.environ) 413 pin = t.cast(str, self.pin) 414 415 # If the trust return value is `None` it means that the cookie is 416 # set but the stored pin hash value is bad. This means that the 417 # pin was changed. In this case we count a bad auth and unset the 418 # cookie. This way it becomes harder to guess the cookie name 419 # instead of the pin as we still count up failures. 420 bad_cookie = False 421 if trust is None: 422 self._fail_pin_auth() 423 bad_cookie = True 424 425 # If we're trusted, we're authenticated. 426 elif trust: 427 auth = True 428 429 # If we failed too many times, then we're locked out. 430 elif self._failed_pin_auth > 10: 431 exhausted = True 432 433 # Otherwise go through pin based authentication 434 else: 435 entered_pin = request.args["pin"] 436 437 if entered_pin.strip().replace("-", "") == pin.replace("-", ""): 438 self._failed_pin_auth = 0 439 auth = True 440 else: 441 self._fail_pin_auth() 442 443 rv = Response( 444 json.dumps({"auth": auth, "exhausted": exhausted}), 445 mimetype="application/json", 446 ) 447 if auth: 448 rv.set_cookie( 449 self.pin_cookie_name, 450 f"{int(time.time())}|{hash_pin(pin)}", 451 httponly=True, 452 samesite="Strict", 453 secure=request.is_secure, 454 ) 455 elif bad_cookie: 456 rv.delete_cookie(self.pin_cookie_name) 457 return rv 458 459 def log_pin_request(self) -> Response: 460 """Log the pin if needed.""" 461 if self.pin_logging and self.pin is not None: 462 _log( 463 "info", " * To enable the debugger you need to enter the security pin:" 464 ) 465 _log("info", " * Debugger pin code: %s", self.pin) 466 return Response("") 467 468 def __call__( 469 self, environ: "WSGIEnvironment", start_response: "StartResponse" 470 ) -> t.Iterable[bytes]: 471 """Dispatch the requests.""" 472 # important: don't ever access a function here that reads the incoming 473 # form data! Otherwise the application won't have access to that data 474 # any more! 475 request = Request(environ) 476 response = self.debug_application 477 if request.args.get("__debugger__") == "yes": 478 cmd = request.args.get("cmd") 479 arg = request.args.get("f") 480 secret = request.args.get("s") 481 frame = self.frames.get(request.args.get("frm", type=int)) # type: ignore 482 if cmd == "resource" and arg: 483 response = self.get_resource(request, arg) # type: ignore 484 elif cmd == "pinauth" and secret == self.secret: 485 response = self.pin_auth(request) # type: ignore 486 elif cmd == "printpin" and secret == self.secret: 487 response = self.log_pin_request() # type: ignore 488 elif ( 489 self.evalex 490 and cmd is not None 491 and frame is not None 492 and self.secret == secret 493 and self.check_pin_trust(environ) 494 ): 495 response = self.execute_command(request, cmd, frame) # type: ignore 496 elif ( 497 self.evalex 498 and self.console_path is not None 499 and request.path == self.console_path 500 ): 501 response = self.display_console(request) # type: ignore 502 return response(environ, start_response) 503