1import re
2from typing import TYPE_CHECKING, Awaitable, Callable, Tuple, Type, TypeVar
3
4from .web_exceptions import HTTPPermanentRedirect, _HTTPMove
5from .web_request import Request
6from .web_response import StreamResponse
7from .web_urldispatcher import SystemRoute
8
9__all__ = (
10    "middleware",
11    "normalize_path_middleware",
12)
13
14if TYPE_CHECKING:  # pragma: no cover
15    from .web_app import Application
16
17_Func = TypeVar("_Func")
18
19
20async def _check_request_resolves(request: Request, path: str) -> Tuple[bool, Request]:
21    alt_request = request.clone(rel_url=path)
22
23    match_info = await request.app.router.resolve(alt_request)
24    alt_request._match_info = match_info  # type: ignore
25
26    if match_info.http_exception is None:
27        return True, alt_request
28
29    return False, request
30
31
32def middleware(f: _Func) -> _Func:
33    f.__middleware_version__ = 1  # type: ignore
34    return f
35
36
37_Handler = Callable[[Request], Awaitable[StreamResponse]]
38_Middleware = Callable[[Request, _Handler], Awaitable[StreamResponse]]
39
40
41def normalize_path_middleware(
42    *,
43    append_slash: bool = True,
44    remove_slash: bool = False,
45    merge_slashes: bool = True,
46    redirect_class: Type[_HTTPMove] = HTTPPermanentRedirect
47) -> _Middleware:
48    """
49    Middleware factory which produces a middleware that normalizes
50    the path of a request. By normalizing it means:
51
52        - Add or remove a trailing slash to the path.
53        - Double slashes are replaced by one.
54
55    The middleware returns as soon as it finds a path that resolves
56    correctly. The order if both merge and append/remove are enabled is
57        1) merge slashes
58        2) append/remove slash
59        3) both merge slashes and append/remove slash.
60    If the path resolves with at least one of those conditions, it will
61    redirect to the new path.
62
63    Only one of `append_slash` and `remove_slash` can be enabled. If both
64    are `True` the factory will raise an assertion error
65
66    If `append_slash` is `True` the middleware will append a slash when
67    needed. If a resource is defined with trailing slash and the request
68    comes without it, it will append it automatically.
69
70    If `remove_slash` is `True`, `append_slash` must be `False`. When enabled
71    the middleware will remove trailing slashes and redirect if the resource
72    is defined
73
74    If merge_slashes is True, merge multiple consecutive slashes in the
75    path into one.
76    """
77
78    correct_configuration = not (append_slash and remove_slash)
79    assert correct_configuration, "Cannot both remove and append slash"
80
81    @middleware
82    async def impl(request: Request, handler: _Handler) -> StreamResponse:
83        if isinstance(request.match_info.route, SystemRoute):
84            paths_to_check = []
85            if "?" in request.raw_path:
86                path, query = request.raw_path.split("?", 1)
87                query = "?" + query
88            else:
89                query = ""
90                path = request.raw_path
91
92            if merge_slashes:
93                paths_to_check.append(re.sub("//+", "/", path))
94            if append_slash and not request.path.endswith("/"):
95                paths_to_check.append(path + "/")
96            if remove_slash and request.path.endswith("/"):
97                paths_to_check.append(path[:-1])
98            if merge_slashes and append_slash:
99                paths_to_check.append(re.sub("//+", "/", path + "/"))
100            if merge_slashes and remove_slash:
101                merged_slashes = re.sub("//+", "/", path)
102                paths_to_check.append(merged_slashes[:-1])
103
104            for path in paths_to_check:
105                path = re.sub("^//+", "/", path)  # SECURITY: GHSA-v6wp-4m6f-gcjg
106                resolves, request = await _check_request_resolves(request, path)
107                if resolves:
108                    raise redirect_class(request.raw_path + query)
109
110        return await handler(request)
111
112    return impl
113
114
115def _fix_request_current_app(app: "Application") -> _Middleware:
116    @middleware
117    async def impl(request: Request, handler: _Handler) -> StreamResponse:
118        with request.match_info.set_current_app(app):
119            return await handler(request)
120
121    return impl
122