1import getpass
2import hashlib
3import json
4import mimetypes
5import os
6import pkgutil
7import re
8import sys
9import time
10import typing as t
11import uuid
12from itertools import chain
13from os.path import basename
14from os.path import join
15
16from .._internal import _log
17from ..http import parse_cookie
18from ..security import gen_salt
19from ..wrappers.request import Request
20from ..wrappers.response import Response
21from .console import Console
22from .tbtools import Frame
23from .tbtools import get_current_traceback
24from .tbtools import render_console_html
25from .tbtools import Traceback
26
27if t.TYPE_CHECKING:
28    from _typeshed.wsgi import StartResponse
29    from _typeshed.wsgi import WSGIApplication
30    from _typeshed.wsgi import WSGIEnvironment
31
32# A week
33PIN_TIME = 60 * 60 * 24 * 7
34
35
36def hash_pin(pin: str) -> str:
37    return hashlib.sha1(f"{pin} added salt".encode("utf-8", "replace")).hexdigest()[:12]
38
39
40_machine_id: t.Optional[t.Union[str, bytes]] = None
41
42
43def get_machine_id() -> t.Optional[t.Union[str, bytes]]:
44    global _machine_id
45
46    if _machine_id is not None:
47        return _machine_id
48
49    def _generate() -> t.Optional[t.Union[str, bytes]]:
50        linux = b""
51
52        # machine-id is stable across boots, boot_id is not.
53        for filename in "/etc/machine-id", "/proc/sys/kernel/random/boot_id":
54            try:
55                with open(filename, "rb") as f:
56                    value = f.readline().strip()
57            except OSError:
58                continue
59
60            if value:
61                linux += value
62                break
63
64        # Containers share the same machine id, add some cgroup
65        # information. This is used outside containers too but should be
66        # relatively stable across boots.
67        try:
68            with open("/proc/self/cgroup", "rb") as f:
69                linux += f.readline().strip().rpartition(b"/")[2]
70        except OSError:
71            pass
72
73        if linux:
74            return linux
75
76        # On OS X, use ioreg to get the computer's serial number.
77        try:
78            # subprocess may not be available, e.g. Google App Engine
79            # https://github.com/pallets/werkzeug/issues/925
80            from subprocess import Popen, PIPE
81
82            dump = Popen(
83                ["ioreg", "-c", "IOPlatformExpertDevice", "-d", "2"], stdout=PIPE
84            ).communicate()[0]
85            match = re.search(b'"serial-number" = <([^>]+)', dump)
86
87            if match is not None:
88                return match.group(1)
89        except (OSError, ImportError):
90            pass
91
92        # On Windows, use winreg to get the machine guid.
93        try:
94            import winreg
95        except ImportError:
96            pass
97        else:
98            try:
99                with winreg.OpenKey(
100                    winreg.HKEY_LOCAL_MACHINE,
101                    "SOFTWARE\\Microsoft\\Cryptography",
102                    0,
103                    winreg.KEY_READ | winreg.KEY_WOW64_64KEY,
104                ) as rk:
105                    guid: t.Union[str, bytes]
106                    guid_type: int
107                    guid, guid_type = winreg.QueryValueEx(rk, "MachineGuid")
108
109                    if guid_type == winreg.REG_SZ:
110                        return guid.encode("utf-8")  # type: ignore
111
112                    return guid
113            except OSError:
114                pass
115
116        return None
117
118    _machine_id = _generate()
119    return _machine_id
120
121
122class _ConsoleFrame:
123    """Helper class so that we can reuse the frame console code for the
124    standalone console.
125    """
126
127    def __init__(self, namespace: t.Dict[str, t.Any]):
128        self.console = Console(namespace)
129        self.id = 0
130
131
132def get_pin_and_cookie_name(
133    app: "WSGIApplication",
134) -> t.Union[t.Tuple[str, str], t.Tuple[None, None]]:
135    """Given an application object this returns a semi-stable 9 digit pin
136    code and a random key.  The hope is that this is stable between
137    restarts to not make debugging particularly frustrating.  If the pin
138    was forcefully disabled this returns `None`.
139
140    Second item in the resulting tuple is the cookie name for remembering.
141    """
142    pin = os.environ.get("WERKZEUG_DEBUG_PIN")
143    rv = None
144    num = None
145
146    # Pin was explicitly disabled
147    if pin == "off":
148        return None, None
149
150    # Pin was provided explicitly
151    if pin is not None and pin.replace("-", "").isdigit():
152        # If there are separators in the pin, return it directly
153        if "-" in pin:
154            rv = pin
155        else:
156            num = pin
157
158    modname = getattr(app, "__module__", t.cast(object, app).__class__.__module__)
159    username: t.Optional[str]
160
161    try:
162        # getuser imports the pwd module, which does not exist in Google
163        # App Engine. It may also raise a KeyError if the UID does not
164        # have a username, such as in Docker.
165        username = getpass.getuser()
166    except (ImportError, KeyError):
167        username = None
168
169    mod = sys.modules.get(modname)
170
171    # This information only exists to make the cookie unique on the
172    # computer, not as a security feature.
173    probably_public_bits = [
174        username,
175        modname,
176        getattr(app, "__name__", type(app).__name__),
177        getattr(mod, "__file__", None),
178    ]
179
180    # This information is here to make it harder for an attacker to
181    # guess the cookie name.  They are unlikely to be contained anywhere
182    # within the unauthenticated debug page.
183    private_bits = [str(uuid.getnode()), get_machine_id()]
184
185    h = hashlib.sha1()
186    for bit in chain(probably_public_bits, private_bits):
187        if not bit:
188            continue
189        if isinstance(bit, str):
190            bit = bit.encode("utf-8")
191        h.update(bit)
192    h.update(b"cookiesalt")
193
194    cookie_name = f"__wzd{h.hexdigest()[:20]}"
195
196    # If we need to generate a pin we salt it a bit more so that we don't
197    # end up with the same value and generate out 9 digits
198    if num is None:
199        h.update(b"pinsalt")
200        num = f"{int(h.hexdigest(), 16):09d}"[:9]
201
202    # Format the pincode in groups of digits for easier remembering if
203    # we don't have a result yet.
204    if rv is None:
205        for group_size in 5, 4, 3:
206            if len(num) % group_size == 0:
207                rv = "-".join(
208                    num[x : x + group_size].rjust(group_size, "0")
209                    for x in range(0, len(num), group_size)
210                )
211                break
212        else:
213            rv = num
214
215    return rv, cookie_name
216
217
218class DebuggedApplication:
219    """Enables debugging support for a given application::
220
221        from werkzeug.debug import DebuggedApplication
222        from myapp import app
223        app = DebuggedApplication(app, evalex=True)
224
225    The `evalex` keyword argument allows evaluating expressions in a
226    traceback's frame context.
227
228    :param app: the WSGI application to run debugged.
229    :param evalex: enable exception evaluation feature (interactive
230                   debugging).  This requires a non-forking server.
231    :param request_key: The key that points to the request object in ths
232                        environment.  This parameter is ignored in current
233                        versions.
234    :param console_path: the URL for a general purpose console.
235    :param console_init_func: the function that is executed before starting
236                              the general purpose console.  The return value
237                              is used as initial namespace.
238    :param show_hidden_frames: by default hidden traceback frames are skipped.
239                               You can show them by setting this parameter
240                               to `True`.
241    :param pin_security: can be used to disable the pin based security system.
242    :param pin_logging: enables the logging of the pin system.
243    """
244
245    _pin: str
246    _pin_cookie: str
247
248    def __init__(
249        self,
250        app: "WSGIApplication",
251        evalex: bool = False,
252        request_key: str = "werkzeug.request",
253        console_path: str = "/console",
254        console_init_func: t.Optional[t.Callable[[], t.Dict[str, t.Any]]] = None,
255        show_hidden_frames: bool = False,
256        pin_security: bool = True,
257        pin_logging: bool = True,
258    ) -> None:
259        if not console_init_func:
260            console_init_func = None
261        self.app = app
262        self.evalex = evalex
263        self.frames: t.Dict[int, t.Union[Frame, _ConsoleFrame]] = {}
264        self.tracebacks: t.Dict[int, Traceback] = {}
265        self.request_key = request_key
266        self.console_path = console_path
267        self.console_init_func = console_init_func
268        self.show_hidden_frames = show_hidden_frames
269        self.secret = gen_salt(20)
270        self._failed_pin_auth = 0
271
272        self.pin_logging = pin_logging
273        if pin_security:
274            # Print out the pin for the debugger on standard out.
275            if os.environ.get("WERKZEUG_RUN_MAIN") == "true" and pin_logging:
276                _log("warning", " * Debugger is active!")
277                if self.pin is None:
278                    _log("warning", " * Debugger PIN disabled. DEBUGGER UNSECURED!")
279                else:
280                    _log("info", " * Debugger PIN: %s", self.pin)
281        else:
282            self.pin = None
283
284    @property
285    def pin(self) -> t.Optional[str]:
286        if not hasattr(self, "_pin"):
287            pin_cookie = get_pin_and_cookie_name(self.app)
288            self._pin, self._pin_cookie = pin_cookie  # type: ignore
289        return self._pin
290
291    @pin.setter
292    def pin(self, value: str) -> None:
293        self._pin = value
294
295    @property
296    def pin_cookie_name(self) -> str:
297        """The name of the pin cookie."""
298        if not hasattr(self, "_pin_cookie"):
299            pin_cookie = get_pin_and_cookie_name(self.app)
300            self._pin, self._pin_cookie = pin_cookie  # type: ignore
301        return self._pin_cookie
302
303    def debug_application(
304        self, environ: "WSGIEnvironment", start_response: "StartResponse"
305    ) -> t.Iterator[bytes]:
306        """Run the application and conserve the traceback frames."""
307        app_iter = None
308        try:
309            app_iter = self.app(environ, start_response)
310            yield from app_iter
311            if hasattr(app_iter, "close"):
312                app_iter.close()  # type: ignore
313        except Exception:
314            if hasattr(app_iter, "close"):
315                app_iter.close()  # type: ignore
316            traceback = get_current_traceback(
317                skip=1,
318                show_hidden_frames=self.show_hidden_frames,
319                ignore_system_exceptions=True,
320            )
321            for frame in traceback.frames:
322                self.frames[frame.id] = frame
323            self.tracebacks[traceback.id] = traceback
324
325            try:
326                start_response(
327                    "500 INTERNAL SERVER ERROR",
328                    [
329                        ("Content-Type", "text/html; charset=utf-8"),
330                        # Disable Chrome's XSS protection, the debug
331                        # output can cause false-positives.
332                        ("X-XSS-Protection", "0"),
333                    ],
334                )
335            except Exception:
336                # if we end up here there has been output but an error
337                # occurred.  in that situation we can do nothing fancy any
338                # more, better log something into the error log and fall
339                # back gracefully.
340                environ["wsgi.errors"].write(
341                    "Debugging middleware caught exception in streamed "
342                    "response at a point where response headers were already "
343                    "sent.\n"
344                )
345            else:
346                is_trusted = bool(self.check_pin_trust(environ))
347                yield traceback.render_full(
348                    evalex=self.evalex, evalex_trusted=is_trusted, secret=self.secret
349                ).encode("utf-8", "replace")
350
351            traceback.log(environ["wsgi.errors"])
352
353    def execute_command(
354        self, request: Request, command: str, frame: t.Union[Frame, _ConsoleFrame]
355    ) -> Response:
356        """Execute a command in a console."""
357        return Response(frame.console.eval(command), mimetype="text/html")
358
359    def display_console(self, request: Request) -> Response:
360        """Display a standalone shell."""
361        if 0 not in self.frames:
362            if self.console_init_func is None:
363                ns = {}
364            else:
365                ns = dict(self.console_init_func())
366            ns.setdefault("app", self.app)
367            self.frames[0] = _ConsoleFrame(ns)
368        is_trusted = bool(self.check_pin_trust(request.environ))
369        return Response(
370            render_console_html(secret=self.secret, evalex_trusted=is_trusted),
371            mimetype="text/html",
372        )
373
374    def get_resource(self, request: Request, filename: str) -> Response:
375        """Return a static resource from the shared folder."""
376        filename = join("shared", basename(filename))
377        try:
378            data = pkgutil.get_data(__package__, filename)
379        except OSError:
380            data = None
381        if data is not None:
382            mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream"
383            return Response(data, mimetype=mimetype)
384        return Response("Not Found", status=404)
385
386    def check_pin_trust(self, environ: "WSGIEnvironment") -> t.Optional[bool]:
387        """Checks if the request passed the pin test.  This returns `True` if the
388        request is trusted on a pin/cookie basis and returns `False` if not.
389        Additionally if the cookie's stored pin hash is wrong it will return
390        `None` so that appropriate action can be taken.
391        """
392        if self.pin is None:
393            return True
394        val = parse_cookie(environ).get(self.pin_cookie_name)
395        if not val or "|" not in val:
396            return False
397        ts, pin_hash = val.split("|", 1)
398        if not ts.isdigit():
399            return False
400        if pin_hash != hash_pin(self.pin):
401            return None
402        return (time.time() - PIN_TIME) < int(ts)
403
404    def _fail_pin_auth(self) -> None:
405        time.sleep(5.0 if self._failed_pin_auth > 5 else 0.5)
406        self._failed_pin_auth += 1
407
408    def pin_auth(self, request: Request) -> Response:
409        """Authenticates with the pin."""
410        exhausted = False
411        auth = False
412        trust = self.check_pin_trust(request.environ)
413        pin = t.cast(str, self.pin)
414
415        # If the trust return value is `None` it means that the cookie is
416        # set but the stored pin hash value is bad.  This means that the
417        # pin was changed.  In this case we count a bad auth and unset the
418        # cookie.  This way it becomes harder to guess the cookie name
419        # instead of the pin as we still count up failures.
420        bad_cookie = False
421        if trust is None:
422            self._fail_pin_auth()
423            bad_cookie = True
424
425        # If we're trusted, we're authenticated.
426        elif trust:
427            auth = True
428
429        # If we failed too many times, then we're locked out.
430        elif self._failed_pin_auth > 10:
431            exhausted = True
432
433        # Otherwise go through pin based authentication
434        else:
435            entered_pin = request.args["pin"]
436
437            if entered_pin.strip().replace("-", "") == pin.replace("-", ""):
438                self._failed_pin_auth = 0
439                auth = True
440            else:
441                self._fail_pin_auth()
442
443        rv = Response(
444            json.dumps({"auth": auth, "exhausted": exhausted}),
445            mimetype="application/json",
446        )
447        if auth:
448            rv.set_cookie(
449                self.pin_cookie_name,
450                f"{int(time.time())}|{hash_pin(pin)}",
451                httponly=True,
452                samesite="Strict",
453                secure=request.is_secure,
454            )
455        elif bad_cookie:
456            rv.delete_cookie(self.pin_cookie_name)
457        return rv
458
459    def log_pin_request(self) -> Response:
460        """Log the pin if needed."""
461        if self.pin_logging and self.pin is not None:
462            _log(
463                "info", " * To enable the debugger you need to enter the security pin:"
464            )
465            _log("info", " * Debugger pin code: %s", self.pin)
466        return Response("")
467
468    def __call__(
469        self, environ: "WSGIEnvironment", start_response: "StartResponse"
470    ) -> t.Iterable[bytes]:
471        """Dispatch the requests."""
472        # important: don't ever access a function here that reads the incoming
473        # form data!  Otherwise the application won't have access to that data
474        # any more!
475        request = Request(environ)
476        response = self.debug_application
477        if request.args.get("__debugger__") == "yes":
478            cmd = request.args.get("cmd")
479            arg = request.args.get("f")
480            secret = request.args.get("s")
481            frame = self.frames.get(request.args.get("frm", type=int))  # type: ignore
482            if cmd == "resource" and arg:
483                response = self.get_resource(request, arg)  # type: ignore
484            elif cmd == "pinauth" and secret == self.secret:
485                response = self.pin_auth(request)  # type: ignore
486            elif cmd == "printpin" and secret == self.secret:
487                response = self.log_pin_request()  # type: ignore
488            elif (
489                self.evalex
490                and cmd is not None
491                and frame is not None
492                and self.secret == secret
493                and self.check_pin_trust(environ)
494            ):
495                response = self.execute_command(request, cmd, frame)  # type: ignore
496        elif (
497            self.evalex
498            and self.console_path is not None
499            and request.path == self.console_path
500        ):
501            response = self.display_console(request)  # type: ignore
502        return response(environ, start_response)
503