1"""Base Tornado handlers for the Jupyter server."""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import datetime
5import functools
6import ipaddress
7import json
8import mimetypes
9import os
10import re
11import traceback
12import types
13import warnings
14from http.client import responses
15from http.cookies import Morsel
16from urllib.parse import urlparse
17
18import prometheus_client
19from ipython_genutils.path import filefind
20from jinja2 import TemplateNotFound
21from jupyter_core.paths import is_hidden
22from tornado import escape
23from tornado import httputil
24from tornado import web
25from tornado.log import app_log
26from traitlets.config import Application
27
28import jupyter_server
29from jupyter_server._sysinfo import get_sys_info
30from jupyter_server._tz import utcnow
31from jupyter_server.i18n import combine_translations
32from jupyter_server.services.security import csp_report_uri
33from jupyter_server.utils import ensure_async
34from jupyter_server.utils import url_escape
35from jupyter_server.utils import url_is_absolute
36from jupyter_server.utils import url_path_join
37from jupyter_server.utils import urldecode_unix_socket_path
38
39# -----------------------------------------------------------------------------
40# Top-level handlers
41# -----------------------------------------------------------------------------
42non_alphanum = re.compile(r"[^A-Za-z0-9]")
43
44_sys_info_cache = None
45
46
47def json_sys_info():
48    global _sys_info_cache
49    if _sys_info_cache is None:
50        _sys_info_cache = json.dumps(get_sys_info())
51    return _sys_info_cache
52
53
54def log():
55    if Application.initialized():
56        return Application.instance().log
57    else:
58        return app_log
59
60
61class AuthenticatedHandler(web.RequestHandler):
62    """A RequestHandler with an authenticated user."""
63
64    @property
65    def content_security_policy(self):
66        """The default Content-Security-Policy header
67
68        Can be overridden by defining Content-Security-Policy in settings['headers']
69        """
70        if "Content-Security-Policy" in self.settings.get("headers", {}):
71            # user-specified, don't override
72            return self.settings["headers"]["Content-Security-Policy"]
73
74        return "; ".join(
75            [
76                "frame-ancestors 'self'",
77                # Make sure the report-uri is relative to the base_url
78                "report-uri "
79                + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
80            ]
81        )
82
83    def set_default_headers(self):
84        headers = {}
85        headers["X-Content-Type-Options"] = "nosniff"
86        headers.update(self.settings.get("headers", {}))
87
88        headers["Content-Security-Policy"] = self.content_security_policy
89
90        # Allow for overriding headers
91        for header_name, value in headers.items():
92            try:
93                self.set_header(header_name, value)
94            except Exception as e:
95                # tornado raise Exception (not a subclass)
96                # if method is unsupported (websocket and Access-Control-Allow-Origin
97                # for example, so just ignore)
98                self.log.debug(e)
99
100    def force_clear_cookie(self, name, path="/", domain=None):
101        """Deletes the cookie with the given name.
102
103        Tornado's cookie handling currently (Jan 2018) stores cookies in a dict
104        keyed by name, so it can only modify one cookie with a given name per
105        response. The browser can store multiple cookies with the same name
106        but different domains and/or paths. This method lets us clear multiple
107        cookies with the same name.
108
109        Due to limitations of the cookie protocol, you must pass the same
110        path and domain to clear a cookie as were used when that cookie
111        was set (but there is no way to find out on the server side
112        which values were used for a given cookie).
113        """
114        name = escape.native_str(name)
115        expires = datetime.datetime.utcnow() - datetime.timedelta(days=365)
116
117        morsel = Morsel()
118        morsel.set(name, "", '""')
119        morsel["expires"] = httputil.format_timestamp(expires)
120        morsel["path"] = path
121        if domain:
122            morsel["domain"] = domain
123        self.add_header("Set-Cookie", morsel.OutputString())
124
125    def clear_login_cookie(self):
126        cookie_options = self.settings.get("cookie_options", {})
127        path = cookie_options.setdefault("path", self.base_url)
128        self.clear_cookie(self.cookie_name, path=path)
129        if path and path != "/":
130            # also clear cookie on / to ensure old cookies are cleared
131            # after the change in path behavior.
132            # N.B. This bypasses the normal cookie handling, which can't update
133            # two cookies with the same name. See the method above.
134            self.force_clear_cookie(self.cookie_name)
135
136    def get_current_user(self):
137        if self.login_handler is None:
138            return "anonymous"
139        return self.login_handler.get_user(self)
140
141    def skip_check_origin(self):
142        """Ask my login_handler if I should skip the origin_check
143
144        For example: in the default LoginHandler, if a request is token-authenticated,
145        origin checking should be skipped.
146        """
147        if self.request.method == "OPTIONS":
148            # no origin-check on options requests, which are used to check origins!
149            return True
150        if self.login_handler is None or not hasattr(self.login_handler, "should_check_origin"):
151            return False
152        return not self.login_handler.should_check_origin(self)
153
154    @property
155    def token_authenticated(self):
156        """Have I been authenticated with a token?"""
157        if self.login_handler is None or not hasattr(self.login_handler, "is_token_authenticated"):
158            return False
159        return self.login_handler.is_token_authenticated(self)
160
161    @property
162    def cookie_name(self):
163        default_cookie_name = non_alphanum.sub("-", "username-{}".format(self.request.host))
164        return self.settings.get("cookie_name", default_cookie_name)
165
166    @property
167    def logged_in(self):
168        """Is a user currently logged in?"""
169        user = self.get_current_user()
170        return user and not user == "anonymous"
171
172    @property
173    def login_handler(self):
174        """Return the login handler for this application, if any."""
175        return self.settings.get("login_handler_class", None)
176
177    @property
178    def token(self):
179        """Return the login token for this application, if any."""
180        return self.settings.get("token", None)
181
182    @property
183    def login_available(self):
184        """May a user proceed to log in?
185
186        This returns True if login capability is available, irrespective of
187        whether the user is already logged in or not.
188
189        """
190        if self.login_handler is None:
191            return False
192        return bool(self.login_handler.get_login_available(self.settings))
193
194
195class JupyterHandler(AuthenticatedHandler):
196    """Jupyter-specific extensions to authenticated handling
197
198    Mostly property shortcuts to Jupyter-specific settings.
199    """
200
201    @property
202    def config(self):
203        return self.settings.get("config", None)
204
205    @property
206    def log(self):
207        """use the Jupyter log by default, falling back on tornado's logger"""
208        return log()
209
210    @property
211    def jinja_template_vars(self):
212        """User-supplied values to supply to jinja templates."""
213        return self.settings.get("jinja_template_vars", {})
214
215    @property
216    def serverapp(self):
217        return self.settings["serverapp"]
218
219    # ---------------------------------------------------------------
220    # URLs
221    # ---------------------------------------------------------------
222
223    @property
224    def version_hash(self):
225        """The version hash to use for cache hints for static files"""
226        return self.settings.get("version_hash", "")
227
228    @property
229    def mathjax_url(self):
230        url = self.settings.get("mathjax_url", "")
231        if not url or url_is_absolute(url):
232            return url
233        return url_path_join(self.base_url, url)
234
235    @property
236    def mathjax_config(self):
237        return self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe")
238
239    @property
240    def base_url(self):
241        return self.settings.get("base_url", "/")
242
243    @property
244    def default_url(self):
245        return self.settings.get("default_url", "")
246
247    @property
248    def ws_url(self):
249        return self.settings.get("websocket_url", "")
250
251    @property
252    def contents_js_source(self):
253        self.log.debug(
254            "Using contents: %s", self.settings.get("contents_js_source", "services/contents")
255        )
256        return self.settings.get("contents_js_source", "services/contents")
257
258    # ---------------------------------------------------------------
259    # Manager objects
260    # ---------------------------------------------------------------
261
262    @property
263    def kernel_manager(self):
264        return self.settings["kernel_manager"]
265
266    @property
267    def contents_manager(self):
268        return self.settings["contents_manager"]
269
270    @property
271    def session_manager(self):
272        return self.settings["session_manager"]
273
274    @property
275    def terminal_manager(self):
276        return self.settings["terminal_manager"]
277
278    @property
279    def kernel_spec_manager(self):
280        return self.settings["kernel_spec_manager"]
281
282    @property
283    def config_manager(self):
284        return self.settings["config_manager"]
285
286    # ---------------------------------------------------------------
287    # CORS
288    # ---------------------------------------------------------------
289
290    @property
291    def allow_origin(self):
292        """Normal Access-Control-Allow-Origin"""
293        return self.settings.get("allow_origin", "")
294
295    @property
296    def allow_origin_pat(self):
297        """Regular expression version of allow_origin"""
298        return self.settings.get("allow_origin_pat", None)
299
300    @property
301    def allow_credentials(self):
302        """Whether to set Access-Control-Allow-Credentials"""
303        return self.settings.get("allow_credentials", False)
304
305    def set_default_headers(self):
306        """Add CORS headers, if defined"""
307        super(JupyterHandler, self).set_default_headers()
308        if self.allow_origin:
309            self.set_header("Access-Control-Allow-Origin", self.allow_origin)
310        elif self.allow_origin_pat:
311            origin = self.get_origin()
312            if origin and re.match(self.allow_origin_pat, origin):
313                self.set_header("Access-Control-Allow-Origin", origin)
314        elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
315            "headers", {}
316        ):
317            # allow token-authenticated requests cross-origin by default.
318            # only apply this exception if allow-origin has not been specified.
319            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
320
321        if self.allow_credentials:
322            self.set_header("Access-Control-Allow-Credentials", "true")
323
324    def set_attachment_header(self, filename):
325        """Set Content-Disposition: attachment header
326
327        As a method to ensure handling of filename encoding
328        """
329        escaped_filename = url_escape(filename)
330        self.set_header(
331            "Content-Disposition",
332            "attachment;"
333            " filename*=utf-8''{utf8}".format(
334                utf8=escaped_filename,
335            ),
336        )
337
338    def get_origin(self):
339        # Handle WebSocket Origin naming convention differences
340        # The difference between version 8 and 13 is that in 8 the
341        # client sends a "Sec-Websocket-Origin" header and in 13 it's
342        # simply "Origin".
343        if "Origin" in self.request.headers:
344            origin = self.request.headers.get("Origin")
345        else:
346            origin = self.request.headers.get("Sec-Websocket-Origin", None)
347        return origin
348
349    # origin_to_satisfy_tornado is present because tornado requires
350    # check_origin to take an origin argument, but we don't use it
351    def check_origin(self, origin_to_satisfy_tornado=""):
352        """Check Origin for cross-site API requests, including websockets
353
354        Copied from WebSocket with changes:
355
356        - allow unspecified host/origin (e.g. scripts)
357        - allow token-authenticated requests
358        """
359        if self.allow_origin == "*" or self.skip_check_origin():
360            return True
361
362        host = self.request.headers.get("Host")
363        origin = self.request.headers.get("Origin")
364
365        # If no header is provided, let the request through.
366        # Origin can be None for:
367        # - same-origin (IE, Firefox)
368        # - Cross-site POST form (IE, Firefox)
369        # - Scripts
370        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
371        if origin is None or host is None:
372            return True
373
374        origin = origin.lower()
375        origin_host = urlparse(origin).netloc
376
377        # OK if origin matches host
378        if origin_host == host:
379            return True
380
381        # Check CORS headers
382        if self.allow_origin:
383            allow = self.allow_origin == origin
384        elif self.allow_origin_pat:
385            allow = bool(re.match(self.allow_origin_pat, origin))
386        else:
387            # No CORS headers deny the request
388            allow = False
389        if not allow:
390            self.log.warning(
391                "Blocking Cross Origin API request for %s.  Origin: %s, Host: %s",
392                self.request.path,
393                origin,
394                host,
395            )
396        return allow
397
398    def check_referer(self):
399        """Check Referer for cross-site requests.
400        Disables requests to certain endpoints with
401        external or missing Referer.
402        If set, allow_origin settings are applied to the Referer
403        to whitelist specific cross-origin sites.
404        Used on GET for api endpoints and /files/
405        to block cross-site inclusion (XSSI).
406        """
407        if self.allow_origin == "*" or self.skip_check_origin():
408            return True
409
410        host = self.request.headers.get("Host")
411        referer = self.request.headers.get("Referer")
412
413        if not host:
414            self.log.warning("Blocking request with no host")
415            return False
416        if not referer:
417            self.log.warning("Blocking request with no referer")
418            return False
419
420        referer_url = urlparse(referer)
421        referer_host = referer_url.netloc
422        if referer_host == host:
423            return True
424
425        # apply cross-origin checks to Referer:
426        origin = "{}://{}".format(referer_url.scheme, referer_url.netloc)
427        if self.allow_origin:
428            allow = self.allow_origin == origin
429        elif self.allow_origin_pat:
430            allow = bool(re.match(self.allow_origin_pat, origin))
431        else:
432            # No CORS settings, deny the request
433            allow = False
434
435        if not allow:
436            self.log.warning(
437                "Blocking Cross Origin request for %s.  Referer: %s, Host: %s",
438                self.request.path,
439                origin,
440                host,
441            )
442        return allow
443
444    def check_xsrf_cookie(self):
445        """Bypass xsrf cookie checks when token-authenticated"""
446        if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
447            # Token-authenticated requests do not need additional XSRF-check
448            # Servers without authentication are vulnerable to XSRF
449            return
450        try:
451            return super(JupyterHandler, self).check_xsrf_cookie()
452        except web.HTTPError as e:
453            if self.request.method in {"GET", "HEAD"}:
454                # Consider Referer a sufficient cross-origin check for GET requests
455                if not self.check_referer():
456                    referer = self.request.headers.get("Referer")
457                    if referer:
458                        msg = "Blocking Cross Origin request from {}.".format(referer)
459                    else:
460                        msg = "Blocking request from unknown origin"
461                    raise web.HTTPError(403, msg)
462            else:
463                raise
464
465    def check_host(self):
466        """Check the host header if remote access disallowed.
467
468        Returns True if the request should continue, False otherwise.
469        """
470        if self.settings.get("allow_remote_access", False):
471            return True
472
473        # Remove port (e.g. ':8888') from host
474        host = re.match(r"^(.*?)(:\d+)?$", self.request.host).group(1)
475
476        # Browsers format IPv6 addresses like [::1]; we need to remove the []
477        if host.startswith("[") and host.endswith("]"):
478            host = host[1:-1]
479
480        # UNIX socket handling
481        check_host = urldecode_unix_socket_path(host)
482        if check_host.startswith("/") and os.path.exists(check_host):
483            allow = True
484        else:
485            try:
486                addr = ipaddress.ip_address(host)
487            except ValueError:
488                # Not an IP address: check against hostnames
489                allow = host in self.settings.get("local_hostnames", ["localhost"])
490            else:
491                allow = addr.is_loopback
492
493        if not allow:
494            self.log.warning(
495                (
496                    "Blocking request with non-local 'Host' %s (%s). "
497                    "If the server should be accessible at that name, "
498                    "set ServerApp.allow_remote_access to disable the check."
499                ),
500                host,
501                self.request.host,
502            )
503        return allow
504
505    def prepare(self):
506        if not self.check_host():
507            raise web.HTTPError(403)
508        return super(JupyterHandler, self).prepare()
509
510    # ---------------------------------------------------------------
511    # template rendering
512    # ---------------------------------------------------------------
513
514    def get_template(self, name):
515        """Return the jinja template object for a given name"""
516        return self.settings["jinja2_env"].get_template(name)
517
518    def render_template(self, name, **ns):
519        ns.update(self.template_namespace)
520        template = self.get_template(name)
521        return template.render(**ns)
522
523    @property
524    def template_namespace(self):
525        return dict(
526            base_url=self.base_url,
527            default_url=self.default_url,
528            ws_url=self.ws_url,
529            logged_in=self.logged_in,
530            allow_password_change=self.settings.get("allow_password_change"),
531            login_available=self.login_available,
532            token_available=bool(self.token),
533            static_url=self.static_url,
534            sys_info=json_sys_info(),
535            contents_js_source=self.contents_js_source,
536            version_hash=self.version_hash,
537            xsrf_form_html=self.xsrf_form_html,
538            token=self.token,
539            xsrf_token=self.xsrf_token.decode("utf8"),
540            nbjs_translations=json.dumps(
541                combine_translations(self.request.headers.get("Accept-Language", ""))
542            ),
543            **self.jinja_template_vars
544        )
545
546    def get_json_body(self):
547        """Return the body of the request as JSON data."""
548        if not self.request.body:
549            return None
550        # Do we need to call body.decode('utf-8') here?
551        body = self.request.body.strip().decode(u"utf-8")
552        try:
553            model = json.loads(body)
554        except Exception as e:
555            self.log.debug("Bad JSON: %r", body)
556            self.log.error("Couldn't parse JSON", exc_info=True)
557            raise web.HTTPError(400, u"Invalid JSON in body of request") from e
558        return model
559
560    def write_error(self, status_code, **kwargs):
561        """render custom error pages"""
562        exc_info = kwargs.get("exc_info")
563        message = ""
564        status_message = responses.get(status_code, "Unknown HTTP Error")
565        exception = "(unknown)"
566        if exc_info:
567            exception = exc_info[1]
568            # get the custom message, if defined
569            try:
570                message = exception.log_message % exception.args
571            except Exception:
572                pass
573
574            # construct the custom reason, if defined
575            reason = getattr(exception, "reason", "")
576            if reason:
577                status_message = reason
578
579        # build template namespace
580        ns = dict(
581            status_code=status_code,
582            status_message=status_message,
583            message=message,
584            exception=exception,
585        )
586
587        self.set_header("Content-Type", "text/html")
588        # render the template
589        try:
590            html = self.render_template("%s.html" % status_code, **ns)
591        except TemplateNotFound:
592            html = self.render_template("error.html", **ns)
593
594        self.write(html)
595
596
597class APIHandler(JupyterHandler):
598    """Base class for API handlers"""
599
600    def prepare(self):
601        if not self.check_origin():
602            raise web.HTTPError(404)
603        return super(APIHandler, self).prepare()
604
605    def write_error(self, status_code, **kwargs):
606        """APIHandler errors are JSON, not human pages"""
607        self.set_header("Content-Type", "application/json")
608        message = responses.get(status_code, "Unknown HTTP Error")
609        reply = {
610            "message": message,
611        }
612        exc_info = kwargs.get("exc_info")
613        if exc_info:
614            e = exc_info[1]
615            if isinstance(e, HTTPError):
616                reply["message"] = e.log_message or message
617                reply["reason"] = e.reason
618            else:
619                reply["message"] = "Unhandled error"
620                reply["reason"] = None
621                reply["traceback"] = "".join(traceback.format_exception(*exc_info))
622        self.log.warning(reply["message"])
623        self.finish(json.dumps(reply))
624
625    def get_current_user(self):
626        """Raise 403 on API handlers instead of redirecting to human login page"""
627        # preserve _user_cache so we don't raise more than once
628        if hasattr(self, "_user_cache"):
629            return self._user_cache
630        self._user_cache = user = super(APIHandler, self).get_current_user()
631        return user
632
633    def get_login_url(self):
634        # if get_login_url is invoked in an API handler,
635        # that means @web.authenticated is trying to trigger a redirect.
636        # instead of redirecting, raise 403 instead.
637        if not self.current_user:
638            raise web.HTTPError(403)
639        return super(APIHandler, self).get_login_url()
640
641    @property
642    def content_security_policy(self):
643        csp = "; ".join(
644            [
645                super(APIHandler, self).content_security_policy,
646                "default-src 'none'",
647            ]
648        )
649        return csp
650
651    # set _track_activity = False on API handlers that shouldn't track activity
652    _track_activity = True
653
654    def update_api_activity(self):
655        """Update last_activity of API requests"""
656        # record activity of authenticated requests
657        if (
658            self._track_activity
659            and getattr(self, "_user_cache", None)
660            and self.get_argument("no_track_activity", None) is None
661        ):
662            self.settings["api_last_activity"] = utcnow()
663
664    def finish(self, *args, **kwargs):
665        self.update_api_activity()
666        self.set_header("Content-Type", "application/json")
667        return super(APIHandler, self).finish(*args, **kwargs)
668
669    def options(self, *args, **kwargs):
670        if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
671            self.set_header(
672                "Access-Control-Allow-Headers",
673                self.settings["headers"]["Access-Control-Allow-Headers"],
674            )
675        else:
676            self.set_header(
677                "Access-Control-Allow-Headers", "accept, content-type, authorization, x-xsrftoken"
678            )
679        self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")
680
681        # if authorization header is requested,
682        # that means the request is token-authenticated.
683        # avoid browser-side rejection of the preflight request.
684        # only allow this exception if allow_origin has not been specified
685        # and Jupyter server authentication is enabled.
686        # If the token is not valid, the 'real' request will still be rejected.
687        requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
688            ","
689        )
690        if (
691            requested_headers
692            and any(h.strip().lower() == "authorization" for h in requested_headers)
693            and (
694                # FIXME: it would be even better to check specifically for token-auth,
695                # but there is currently no API for this.
696                self.login_available
697            )
698            and (
699                self.allow_origin
700                or self.allow_origin_pat
701                or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
702            )
703        ):
704            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
705
706
707class Template404(JupyterHandler):
708    """Render our 404 template"""
709
710    def prepare(self):
711        raise web.HTTPError(404)
712
713
714class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler):
715    """static files should only be accessible when logged in"""
716
717    @property
718    def content_security_policy(self):
719        # In case we're serving HTML/SVG, confine any Javascript to a unique
720        # origin so it can't interact with the Jupyter server.
721        return (
722            super(AuthenticatedFileHandler, self).content_security_policy
723            + "; sandbox allow-scripts"
724        )
725
726    @web.authenticated
727    def head(self, path):
728        self.check_xsrf_cookie()
729        return super(AuthenticatedFileHandler, self).head(path)
730
731    @web.authenticated
732    def get(self, path):
733        if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", False):
734            name = path.rsplit("/", 1)[-1]
735            self.set_attachment_header(name)
736
737        return web.StaticFileHandler.get(self, path)
738
739    def get_content_type(self):
740        path = self.absolute_path.strip("/")
741        if "/" in path:
742            _, name = path.rsplit("/", 1)
743        else:
744            name = path
745        if name.endswith(".ipynb"):
746            return "application/x-ipynb+json"
747        else:
748            cur_mime = mimetypes.guess_type(name)[0]
749            if cur_mime == "text/plain":
750                return "text/plain; charset=UTF-8"
751            else:
752                return super(AuthenticatedFileHandler, self).get_content_type()
753
754    def set_headers(self):
755        super(AuthenticatedFileHandler, self).set_headers()
756        # disable browser caching, rely on 304 replies for savings
757        if "v" not in self.request.arguments:
758            self.add_header("Cache-Control", "no-cache")
759
760    def compute_etag(self):
761        return None
762
763    def validate_absolute_path(self, root, absolute_path):
764        """Validate and return the absolute path.
765
766        Requires tornado 3.1
767
768        Adding to tornado's own handling, forbids the serving of hidden files.
769        """
770        abs_path = super(AuthenticatedFileHandler, self).validate_absolute_path(root, absolute_path)
771        abs_root = os.path.abspath(root)
772        if is_hidden(abs_path, abs_root) and not self.contents_manager.allow_hidden:
773            self.log.info(
774                "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable"
775            )
776            raise web.HTTPError(404)
777        return abs_path
778
779
780def json_errors(method):
781    """Decorate methods with this to return GitHub style JSON errors.
782
783    This should be used on any JSON API on any handler method that can raise HTTPErrors.
784
785    This will grab the latest HTTPError exception using sys.exc_info
786    and then:
787
788    1. Set the HTTP status code based on the HTTPError
789    2. Create and return a JSON body with a message field describing
790       the error in a human readable form.
791    """
792    warnings.warn(
793        "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.",
794        DeprecationWarning,
795        stacklevel=2,
796    )
797
798    @functools.wraps(method)
799    def wrapper(self, *args, **kwargs):
800        self.write_error = types.MethodType(APIHandler.write_error, self)
801        return method(self, *args, **kwargs)
802
803    return wrapper
804
805
806# -----------------------------------------------------------------------------
807# File handler
808# -----------------------------------------------------------------------------
809
810# to minimize subclass changes:
811HTTPError = web.HTTPError
812
813
814class FileFindHandler(JupyterHandler, web.StaticFileHandler):
815    """subclass of StaticFileHandler for serving files from a search path"""
816
817    # cache search results, don't search for files more than once
818    _static_paths = {}
819
820    def set_headers(self):
821        super(FileFindHandler, self).set_headers()
822        # disable browser caching, rely on 304 replies for savings
823        if "v" not in self.request.arguments or any(
824            self.request.path.startswith(path) for path in self.no_cache_paths
825        ):
826            self.set_header("Cache-Control", "no-cache")
827
828    def initialize(self, path, default_filename=None, no_cache_paths=None):
829        self.no_cache_paths = no_cache_paths or []
830
831        if isinstance(path, str):
832            path = [path]
833
834        self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path)
835        self.default_filename = default_filename
836
837    def compute_etag(self):
838        return None
839
840    @classmethod
841    def get_absolute_path(cls, roots, path):
842        """locate a file to serve on our static file search path"""
843        with cls._lock:
844            if path in cls._static_paths:
845                return cls._static_paths[path]
846            try:
847                abspath = os.path.abspath(filefind(path, roots))
848            except IOError:
849                # IOError means not found
850                return ""
851
852            cls._static_paths[path] = abspath
853
854            log().debug("Path %s served from %s" % (path, abspath))
855            return abspath
856
857    def validate_absolute_path(self, root, absolute_path):
858        """check if the file should be served (raises 404, 403, etc.)"""
859        if absolute_path == "":
860            raise web.HTTPError(404)
861
862        for root in self.root:
863            if (absolute_path + os.sep).startswith(root):
864                break
865
866        return super(FileFindHandler, self).validate_absolute_path(root, absolute_path)
867
868
869class APIVersionHandler(APIHandler):
870    def get(self):
871        # not authenticated, so give as few info as possible
872        self.finish(json.dumps({"version": jupyter_server.__version__}))
873
874
875class TrailingSlashHandler(web.RequestHandler):
876    """Simple redirect handler that strips trailing slashes
877
878    This should be the first, highest priority handler.
879    """
880
881    def get(self):
882        path, *rest = self.request.uri.partition("?")
883        # trim trailing *and* leading /
884        # to avoid misinterpreting repeated '//'
885        path = "/" + path.strip("/")
886        new_uri = "".join([path, *rest])
887        self.redirect(new_uri)
888
889    post = put = get
890
891
892class MainHandler(JupyterHandler):
893    """Simple handler for base_url."""
894
895    def get(self):
896        html = self.render_template("main.html")
897        self.write(html)
898
899    post = put = get
900
901
902class FilesRedirectHandler(JupyterHandler):
903    """Handler for redirecting relative URLs to the /files/ handler"""
904
905    @staticmethod
906    async def redirect_to_files(self, path):
907        """make redirect logic a reusable static method
908
909        so it can be called from other handlers.
910        """
911        cm = self.contents_manager
912        if await ensure_async(cm.dir_exists(path)):
913            # it's a *directory*, redirect to /tree
914            url = url_path_join(self.base_url, "tree", url_escape(path))
915        else:
916            orig_path = path
917            # otherwise, redirect to /files
918            parts = path.split("/")
919
920            if not await ensure_async(cm.file_exists(path=path)) and "files" in parts:
921                # redirect without files/ iff it would 404
922                # this preserves pre-2.0-style 'files/' links
923                self.log.warning("Deprecated files/ URL: %s", orig_path)
924                parts.remove("files")
925                path = "/".join(parts)
926
927            if not await ensure_async(cm.file_exists(path=path)):
928                raise web.HTTPError(404)
929
930            url = url_path_join(self.base_url, "files", url_escape(path))
931        self.log.debug("Redirecting %s to %s", self.request.path, url)
932        self.redirect(url)
933
934    def get(self, path=""):
935        return self.redirect_to_files(self, path)
936
937
938class RedirectWithParams(web.RequestHandler):
939    """Sam as web.RedirectHandler, but preserves URL parameters"""
940
941    def initialize(self, url, permanent=True):
942        self._url = url
943        self._permanent = permanent
944
945    def get(self):
946        sep = "&" if "?" in self._url else "?"
947        url = sep.join([self._url, self.request.query])
948        self.redirect(url, permanent=self._permanent)
949
950
951class PrometheusMetricsHandler(JupyterHandler):
952    """
953    Return prometheus metrics for this notebook server
954    """
955
956    def get(self):
957        if self.settings["authenticate_prometheus"] and not self.logged_in:
958            raise web.HTTPError(403)
959
960        self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
961        self.write(prometheus_client.generate_latest(prometheus_client.REGISTRY))
962
963
964# -----------------------------------------------------------------------------
965# URL pattern fragments for re-use
966# -----------------------------------------------------------------------------
967
968# path matches any number of `/foo[/bar...]` or just `/` or ''
969path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"
970
971# -----------------------------------------------------------------------------
972# URL to handler mappings
973# -----------------------------------------------------------------------------
974
975
976default_handlers = [
977    (r".*/", TrailingSlashHandler),
978    (r"api", APIVersionHandler),
979    (r"/(robots\.txt|favicon\.ico)", web.StaticFileHandler),
980    (r"/metrics", PrometheusMetricsHandler),
981]
982