1import abc
2import asyncio
3import base64
4import hashlib
5import inspect
6import keyword
7import os
8import re
9import warnings
10from contextlib import contextmanager
11from functools import wraps
12from pathlib import Path
13from types import MappingProxyType
14from typing import (
15    TYPE_CHECKING,
16    Any,
17    Awaitable,
18    Callable,
19    Container,
20    Dict,
21    Generator,
22    Iterable,
23    Iterator,
24    List,
25    Mapping,
26    Optional,
27    Pattern,
28    Set,
29    Sized,
30    Tuple,
31    Type,
32    Union,
33    cast,
34)
35
36from typing_extensions import TypedDict
37from yarl import URL, __version__ as yarl_version  # type: ignore
38
39from . import hdrs
40from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
41from .helpers import DEBUG
42from .http import HttpVersion11
43from .typedefs import PathLike
44from .web_exceptions import (
45    HTTPException,
46    HTTPExpectationFailed,
47    HTTPForbidden,
48    HTTPMethodNotAllowed,
49    HTTPNotFound,
50)
51from .web_fileresponse import FileResponse
52from .web_request import Request
53from .web_response import Response, StreamResponse
54from .web_routedef import AbstractRouteDef
55
# Public names exported by this module via ``from ... import *``.
__all__ = (
    "UrlDispatcher",
    "UrlMappingMatchInfo",
    "AbstractResource",
    "Resource",
    "PlainResource",
    "DynamicResource",
    "AbstractRoute",
    "ResourceRoute",
    "StaticResource",
    "View",
)
68
69
# ``Application`` is imported for type checkers only.  At runtime ``BaseDict``
# is the plain ``dict`` type; under type checking it is ``Dict[str, str]``.
if TYPE_CHECKING:  # pragma: no cover
    from .web_app import Application

    BaseDict = Dict[str, str]
else:
    BaseDict = dict

# First two dot-separated components of the installed yarl version, as ints.
YARL_VERSION = tuple(map(int, yarl_version.split(".")[:2]))

# Whole-string pattern of the characters accepted in an HTTP method name.
HTTP_METHOD_RE = re.compile(r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$")
# Captures one ``{...}`` placeholder that starts with a letter or underscore;
# a single level of nested ``{...}`` is permitted inside it.
ROUTE_RE = re.compile(r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})")
# Regex-escaped form of the path separator "/".
PATH_SEP = re.escape("/")


# Request handler: takes a Request and returns an awaitable StreamResponse.
_WebHandler = Callable[[Request], Awaitable[StreamResponse]]
# "Expect" header handler: takes a Request and returns an awaitable None.
_ExpectHandler = Callable[[Request], Awaitable[None]]
# Result of a resource ``resolve()``: (match info or None, set of method names).
_Resolve = Tuple[Optional[AbstractMatchInfo], Set[str]]
87
88
class _InfoDict(TypedDict, total=False):
    """Introspection dictionary returned by the ``get_info()`` methods.

    All keys are optional (``total=False``); each resource, route or rule
    fills in only the keys that apply to it.
    """

    # Returned by PlainResource.get_info()
    path: str

    # Returned by DynamicResource.get_info()
    formatter: str
    pattern: Pattern[str]

    # Returned by StaticResource.get_info() ("prefix" is also returned by
    # PrefixedSubAppResource.get_info())
    directory: Path
    prefix: str
    routes: Mapping[str, "AbstractRoute"]

    # Returned by PrefixedSubAppResource / MatchedSubAppResource.get_info()
    app: "Application"

    # Returned by Domain.get_info()
    domain: str

    # Returned by MatchedSubAppResource.get_info()
    rule: "AbstractRuleMatching"

    # Returned by SystemRoute.get_info()
    http_exception: HTTPException
106
107
class AbstractResource(Sized, Iterable["AbstractRoute"]):
    """Interface of a routable resource.

    A resource is a sized, iterable collection of routes that carries an
    optional name.  Concrete subclasses supply matching, URL building and
    introspection.
    """

    def __init__(self, *, name: Optional[str] = None) -> None:
        self._name = name

    @property
    def name(self) -> Optional[str]:
        """Name given at construction time, or ``None``."""
        return self._name

    @property
    @abc.abstractmethod
    def canonical(self) -> str:
        """Canonical path of the resource.

        Example: '/foo/bar/{name}'

        """

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, **kwargs: str) -> URL:
        """Build a URL for this resource from the given parameters."""

    @abc.abstractmethod  # pragma: no branch
    async def resolve(self, request: Request) -> _Resolve:
        """Try to resolve *request* against this resource.

        Returns a (UrlMappingMatchInfo, allowed_methods) pair."""

    @abc.abstractmethod
    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to the URLs handled by this resource.

        Needed to support sub-applications.

        """

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict of extra information for introspection."""

    def freeze(self) -> None:
        """Freeze the resource; the base implementation does nothing."""

    @abc.abstractmethod
    def raw_match(self, path: str) -> bool:
        """Tell whether *path* matches this resource in raw form."""
153
154
class AbstractRoute(abc.ABC):
    """Base class pairing an HTTP method with a request handler.

    The constructor validates the method name, checks the "Expect" handler
    and wraps plain (non-async) callables into a coroutine function.
    """

    def __init__(
        self,
        method: str,
        handler: Union[_WebHandler, Type[AbstractView]],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
        resource: Optional[AbstractResource] = None,
    ) -> None:
        # Use the module-level default when no Expect handler is supplied.
        if expect_handler is None:
            expect_handler = _default_expect_handler

        assert asyncio.iscoroutinefunction(
            expect_handler
        ), f"Coroutine is expected, got {expect_handler!r}"

        # Methods are stored upper-cased and must match HTTP_METHOD_RE.
        method = method.upper()
        if HTTP_METHOD_RE.match(method) is None:
            raise ValueError(f"{method} is not allowed HTTP method")

        assert callable(handler), handler
        if not asyncio.iscoroutinefunction(handler):
            if inspect.isgeneratorfunction(handler):
                warnings.warn(
                    "Bare generators are deprecated, use @coroutine wrapper",
                    DeprecationWarning,
                )
            elif not (isinstance(handler, type) and issubclass(handler, AbstractView)):
                # Neither a coroutine function, a generator function nor a
                # view class: warn and adapt the callable to the async API.
                warnings.warn(
                    "Bare functions are deprecated, use async ones",
                    DeprecationWarning,
                )
                plain_handler = handler

                @wraps(plain_handler)
                async def handler_wrapper(request: Request) -> StreamResponse:
                    outcome = plain_handler(request)
                    if asyncio.iscoroutine(outcome):
                        return await outcome
                    return outcome  # type: ignore

                handler = handler_wrapper

        self._method = method
        self._handler = handler
        self._expect_handler = expect_handler
        self._resource = resource

    @property
    def method(self) -> str:
        """Upper-cased HTTP method of the route."""
        return self._method

    @property
    def handler(self) -> _WebHandler:
        """Handler stored for the route (wrapped if it was a plain callable)."""
        return self._handler

    @property
    @abc.abstractmethod
    def name(self) -> Optional[str]:
        """Optional route's name, always equals to resource's name."""

    @property
    def resource(self) -> Optional[AbstractResource]:
        """Resource passed to the constructor, or ``None``."""
        return self._resource

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict of extra information for introspection."""

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Build a URL for this route from the given parameters."""

    async def handle_expect_header(self, request: Request) -> None:
        """Await the configured Expect handler for *request*."""
        await self._expect_handler(request)
233
234
class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
    """Match result: a dict of matched values plus the selected route.

    It also keeps the stack of applications the request passed through and
    the application that is currently active.
    """

    def __init__(self, match_dict: Dict[str, str], route: AbstractRoute):
        super().__init__(match_dict)
        self._route = route
        self._apps = []  # type: List[Application]
        self._current_app = None  # type: Optional[Application]
        self._frozen = False

    @property
    def handler(self) -> _WebHandler:
        """Handler of the matched route."""
        return self._route.handler

    @property
    def route(self) -> AbstractRoute:
        """The matched route."""
        return self._route

    @property
    def expect_handler(self) -> _ExpectHandler:
        """The matched route's ``handle_expect_header`` bound method."""
        return self._route.handle_expect_header

    @property
    def http_exception(self) -> Optional[HTTPException]:
        """Always ``None`` for a regular match."""
        return None

    def get_info(self) -> _InfoDict:  # type: ignore
        """Return the matched route's introspection dict."""
        return self._route.get_info()

    @property
    def apps(self) -> Tuple["Application", ...]:
        """Snapshot of the application stack as a tuple."""
        return tuple(self._apps)

    def add_app(self, app: "Application") -> None:
        """Put *app* at the front of the application stack.

        The first application added also becomes the current one.
        Raises RuntimeError once the match info has been frozen.
        """
        if self._frozen:
            raise RuntimeError("Cannot change apps stack after .freeze() call")
        if self._current_app is None:
            self._current_app = app
        self._apps.insert(0, app)

    @property
    def current_app(self) -> "Application":
        """Currently active application; asserts that one has been set."""
        assert self._current_app is not None
        return self._current_app

    @contextmanager
    def set_current_app(self, app: "Application") -> Generator[None, None, None]:
        """Make *app* the current application for the duration of the block.

        The previous value is restored on exit, even on error.  When DEBUG is
        enabled, *app* must already be part of the application stack.
        """
        if DEBUG and app not in self._apps:  # pragma: no cover
            raise RuntimeError(
                f"Expected one of the following apps {self._apps!r}, got {app!r}"
            )
        saved = self._current_app
        self._current_app = app
        try:
            yield
        finally:
            self._current_app = saved

    def freeze(self) -> None:
        """Forbid further ``add_app()`` calls."""
        self._frozen = True

    def __repr__(self) -> str:
        return "<MatchInfo {}: {}>".format(super().__repr__(), self._route)
300
301
class MatchInfoError(UrlMappingMatchInfo):
    """Match info for a failed resolution.

    It has an empty match dict and a SystemRoute built from the given
    HTTP exception.
    """

    def __init__(self, http_exception: HTTPException) -> None:
        self._exception = http_exception
        super().__init__({}, SystemRoute(http_exception))

    @property
    def http_exception(self) -> HTTPException:
        """The HTTP exception this match info was created with."""
        return self._exception

    def __repr__(self) -> str:
        exc = self._exception
        return f"<MatchInfoError {exc.status}: {exc.reason}>"
315
316
async def _default_expect_handler(request: Request) -> None:
    """Default handler for the Expect header.

    For HTTP/1.1 requests, writes "100 Continue" to the client when the
    header value is "100-continue" (case-insensitive) and raises
    HTTPExpectationFailed for any other value.  Requests of other protocol
    versions are left untouched.
    """
    expect = request.headers.get(hdrs.EXPECT, "")
    if request.version != HttpVersion11:
        return
    if expect.lower() != "100-continue":
        raise HTTPExpectationFailed(text=f"Unknown Expect: {expect}")
    await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
329
330
class Resource(AbstractResource):
    """Resource holding an ordered list of ResourceRoute objects.

    Subclasses implement ``_match()`` to decide whether a request path
    belongs to the resource.
    """

    def __init__(self, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        self._routes = []  # type: List[ResourceRoute]

    def add_route(
        self,
        method: str,
        handler: Union[Type[AbstractView], _WebHandler],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> "ResourceRoute":
        """Create a route for *method* / *handler* and register it.

        Raises RuntimeError when a route for the same method, or a route
        for ``hdrs.METH_ANY``, is already registered: the new route could
        never be selected.
        """
        # Routes store their method upper-cased, so compare the normalised
        # form; otherwise e.g. "get" would slip past an existing "GET" route.
        method = method.upper()

        for route_obj in self._routes:
            if route_obj.method == method or route_obj.method == hdrs.METH_ANY:
                raise RuntimeError(
                    "Added route will never be executed, "
                    "method {route.method} is already "
                    "registered".format(route=route_obj)
                )

        route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
        self.register_route(route_obj)
        return route_obj

    def register_route(self, route: "ResourceRoute") -> None:
        """Append an already constructed ResourceRoute to the resource."""
        assert isinstance(
            route, ResourceRoute
        ), f"Instance of Route class is required, got {route!r}"
        self._routes.append(route)

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve *request* against the registered routes.

        Returns ``(None, set())`` when the path does not match.  Otherwise
        returns the match info of the first route whose method equals the
        request method or ``hdrs.METH_ANY``, together with the methods seen
        up to and including that route.  When no route accepts the method,
        returns ``(None, methods)`` with the methods of all routes.
        """
        allowed_methods = set()  # type: Set[str]

        match_dict = self._match(request.rel_url.raw_path)
        if match_dict is None:
            return None, allowed_methods

        for route_obj in self._routes:
            route_method = route_obj.method
            allowed_methods.add(route_method)

            if route_method == request.method or route_method == hdrs.METH_ANY:
                return (UrlMappingMatchInfo(match_dict, route_obj), allowed_methods)

        return None, allowed_methods

    @abc.abstractmethod
    def _match(self, path: str) -> Optional[Dict[str, str]]:
        """Return the dict of matched values for *path*, or None."""
        pass  # pragma: no cover

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes)

    # TODO: implement all abstract methods
389
390
class PlainResource(Resource):
    """Resource matched by exact comparison with a fixed path."""

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        assert not path or path.startswith("/")
        self._path = path

    @property
    def canonical(self) -> str:
        """The resource path."""
        return self._path

    def freeze(self) -> None:
        """Replace an empty path with "/"."""
        if self._path:
            return
        self._path = "/"

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (leading "/", no trailing "/", longer than "/")."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        self._path = prefix + self._path

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        # string comparison is about 10 times faster than regexp matching
        return {} if self._path == path else None

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the resource path."""
        return self._path == path

    def get_info(self) -> _InfoDict:
        """Return ``{"path": <resource path>}``."""
        return {"path": self._path}

    def url_for(self) -> URL:  # type: ignore
        """Build a URL from the resource path, treated as already encoded."""
        return URL.build(path=self._path, encoded=True)

    def __repr__(self) -> str:
        if self.name is None:
            name = ""
        else:
            name = "'" + self.name + "' "
        return "<PlainResource {} {}>".format(name, self._path)
430
431
class DynamicResource(Resource):
    """Resource whose path contains ``{name}`` / ``{name:regex}`` parts.

    The path is compiled into a regular expression used for matching and a
    format string used for URL construction.
    """

    DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
    DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
    GOOD = r"[^{}/]+"

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        pattern_chunks = []
        formatter_chunks = []
        for part in ROUTE_RE.split(path):
            # "{var}": match anything accepted by GOOD.
            plain = self.DYN.fullmatch(part)
            if plain is not None:
                var = plain.group("var")
                pattern_chunks.append(f"(?P<{var}>{self.GOOD})")
                formatter_chunks.append("{" + var + "}")
                continue

            # "{var:regex}": match the user-supplied expression.
            custom = self.DYN_WITH_RE.fullmatch(part)
            if custom is not None:
                var = custom.group("var")
                regex = custom.group("re")
                pattern_chunks.append(f"(?P<{var}>{regex})")
                formatter_chunks.append("{" + var + "}")
                continue

            # Braces left in a literal part mean a malformed placeholder.
            if "{" in part or "}" in part:
                raise ValueError(f"Invalid path '{path}'['{part}']")

            literal = _requote_path(part)
            formatter_chunks.append(literal)
            pattern_chunks.append(re.escape(literal))

        pattern = "".join(pattern_chunks)
        formatter = "".join(formatter_chunks)

        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
        assert compiled.pattern.startswith(PATH_SEP)
        assert formatter.startswith("/")
        self._pattern = compiled
        self._formatter = formatter

    @property
    def canonical(self) -> str:
        """The format string of the resource."""
        return self._formatter

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to both the compiled pattern and the formatter."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        prefixed_pattern = re.escape(prefix) + self._pattern.pattern
        self._pattern = re.compile(prefixed_pattern)
        self._formatter = prefix + self._formatter

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        found = self._pattern.fullmatch(path)
        if found is None:
            return None
        return {
            group: _unquote_path(text) for group, text in found.groupdict().items()
        }

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the format string of the resource."""
        return self._formatter == path

    def get_info(self) -> _InfoDict:
        """Return the format string and the compiled pattern."""
        return {"formatter": self._formatter, "pattern": self._pattern}

    def url_for(self, **parts: str) -> URL:
        """Build a URL by substituting quoted *parts* into the formatter."""
        quoted_parts = {key: _quote_path(value) for key, value in parts.items()}
        url = self._formatter.format_map(quoted_parts)
        return URL.build(path=url, encoded=True)

    def __repr__(self) -> str:
        name = "'" + self.name + "' " if self.name is not None else ""
        return f"<DynamicResource {name} {self._formatter}>"
506
507
class PrefixResource(AbstractResource):
    """Base class for resources identified by a URL prefix."""

    def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
        # The prefix is empty or starts with "/"; apart from "" and "/"
        # it must not end with "/".
        assert not prefix or prefix.startswith("/"), prefix
        assert prefix in ("", "/") or not prefix.endswith("/"), prefix
        super().__init__(name=name)
        self._prefix = _requote_path(prefix)

    @property
    def canonical(self) -> str:
        """The stored prefix."""
        return self._prefix

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (leading "/", no trailing "/", longer than "/")."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        self._prefix = "".join((prefix, self._prefix))

    def raw_match(self, prefix: str) -> bool:
        """Prefix resources never match in raw form."""
        return False

    # TODO: impl missing abstract methods
529
530
class StaticResource(PrefixResource):
    """Resource serving files from a directory below a URL prefix.

    GET and HEAD routes are created at construction time; an OPTIONS route
    can be added later with ``set_options_route()``.
    """

    VERSION_KEY = "v"

    def __init__(
        self,
        prefix: str,
        directory: PathLike,
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
        chunk_size: int = 256 * 1024,
        show_index: bool = False,
        follow_symlinks: bool = False,
        append_version: bool = False,
    ) -> None:
        """Set up the resource.

        *directory* is expanded ("~") and resolved; ValueError is raised
        when it does not name an existing directory.
        """
        super().__init__(prefix, name=name)
        try:
            directory = Path(directory)
            if str(directory).startswith("~"):
                directory = Path(os.path.expanduser(str(directory)))
            directory = directory.resolve()
            if not directory.is_dir():
                raise ValueError("Not a directory")
        except (FileNotFoundError, ValueError) as error:
            raise ValueError(f"No directory exists at '{directory}'") from error
        self._directory = directory
        self._show_index = show_index
        self._chunk_size = chunk_size
        self._follow_symlinks = follow_symlinks
        self._expect_handler = expect_handler
        self._append_version = append_version

        self._routes = {
            "GET": ResourceRoute(
                "GET", self._handle, self, expect_handler=expect_handler
            ),
            "HEAD": ResourceRoute(
                "HEAD", self._handle, self, expect_handler=expect_handler
            ),
        }

    def _resolve_path(self, filename: Union[str, Path]) -> Path:
        """Map *filename* to a resolved path below the served directory.

        Raises ValueError when the path lies outside the served directory;
        errors raised by ``Path.resolve()`` propagate to the caller.
        """
        unresolved_path = self._directory.joinpath(filename)
        if self._follow_symlinks:
            # Symlinks may legitimately point outside the directory, so the
            # containment check is done on the lexically normalised path
            # *before* resolving.  Without it ".." segments could escape
            # the served directory.
            normalized_path = Path(os.path.normpath(str(unresolved_path)))
            normalized_path.relative_to(self._directory)
            return normalized_path.resolve()
        resolved_path = unresolved_path.resolve()
        resolved_path.relative_to(self._directory)
        return resolved_path

    def url_for(  # type: ignore
        self,
        *,
        filename: Union[str, Path],
        append_version: Optional[bool] = None,
    ) -> URL:
        """Build the URL of *filename* below the prefix.

        When *append_version* is true (default: the value given to the
        constructor) and the file exists inside the served directory, a
        ``v=<hash of file content>`` query parameter is added.  Otherwise
        the plain URL is returned.
        """
        if append_version is None:
            append_version = self._append_version
        if isinstance(filename, Path):
            filename = str(filename)
        filename = filename.lstrip("/")

        url = URL.build(path=self._prefix, encoded=True)
        # filename is not encoded
        if YARL_VERSION < (1, 6):
            url = url / filename.replace("%", "%25")
        else:
            url = url / filename

        if append_version:
            try:
                filepath = self._resolve_path(filename)
            except (ValueError, FileNotFoundError):
                # ValueError: the path points outside the served directory
                return url  # relatively safe
            if filepath.is_file():
                # TODO cache file content
                # with file watcher for cache invalidation
                with filepath.open("rb") as f:
                    file_bytes = f.read()
                h = self._get_file_hash(file_bytes)
                url = url.with_query({self.VERSION_KEY: h})
                return url
        return url

    @staticmethod
    def _get_file_hash(byte_array: bytes) -> str:
        """Return the URL-safe base64 SHA-256 digest of *byte_array*."""
        m = hashlib.sha256()  # todo sha256 can be configurable param
        m.update(byte_array)
        b64 = base64.urlsafe_b64encode(m.digest())
        return b64.decode("ascii")

    def get_info(self) -> _InfoDict:
        """Return the served directory, the prefix and the routes mapping."""
        return {
            "directory": self._directory,
            "prefix": self._prefix,
            "routes": self._routes,
        }

    def set_options_route(self, handler: _WebHandler) -> None:
        """Register *handler* for OPTIONS; RuntimeError if already set."""
        if "OPTIONS" in self._routes:
            raise RuntimeError("OPTIONS route was set already")
        self._routes["OPTIONS"] = ResourceRoute(
            "OPTIONS", handler, self, expect_handler=self._expect_handler
        )

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve *request* to one of the static routes.

        Returns ``(None, set())`` when the path is not below the prefix,
        ``(None, allowed_methods)`` when the method has no route, and
        otherwise the match info (with a "filename" entry) plus the
        allowed methods.
        """
        path = request.rel_url.raw_path
        method = request.method
        allowed_methods = set(self._routes)

        # Match on a path-segment boundary: the prefix "/static" must accept
        # "/static" and "/static/x" but not "/staticfoo/x".
        prefix = self._prefix
        prefix_slash = prefix if prefix.endswith("/") else prefix + "/"
        if path != prefix and not path.startswith(prefix_slash):
            return None, set()

        if method not in allowed_methods:
            return None, allowed_methods

        # Slicing past the end (path == prefix) yields an empty filename.
        match_dict = {"filename": _unquote_path(path[len(prefix_slash) :])}
        return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes.values())

    async def _handle(self, request: Request) -> StreamResponse:
        """Serve the file or directory index named by the matched filename.

        Raises HTTPForbidden for anchored (absolute) names and for
        directories that may not be listed, and HTTPNotFound when the path
        is missing, lies outside the served directory or cannot be
        resolved.
        """
        rel_url = request.match_info["filename"]
        try:
            filename = Path(rel_url)
            if filename.anchor:
                # rel_url is an absolute name like
                # /static/\\machine_name\c$ or /static/D:\path
                # where the static dir is totally different
                raise HTTPForbidden()
            filepath = self._resolve_path(filename)
        except (ValueError, FileNotFoundError) as error:
            # relatively safe
            raise HTTPNotFound() from error
        except HTTPForbidden:
            raise
        except Exception as error:
            # perm error or other kind!
            request.app.logger.exception(error)
            raise HTTPNotFound() from error

        # on opening a dir, load its contents if allowed
        if filepath.is_dir():
            if self._show_index:
                try:
                    return Response(
                        text=self._directory_as_html(filepath), content_type="text/html"
                    )
                except PermissionError:
                    raise HTTPForbidden()
            else:
                raise HTTPForbidden()
        elif filepath.is_file():
            return FileResponse(filepath, chunk_size=self._chunk_size)
        else:
            raise HTTPNotFound

    def _directory_as_html(self, filepath: Path) -> str:
        """Return an HTML index page for the directory *filepath*.

        File names come from the file system and are untrusted: they are
        HTML-escaped in text and attribute positions and URL-quoted in
        links.
        """
        # Imported locally so the block does not depend on a module-level
        # import being added elsewhere.
        from html import escape as html_escape

        # sanity check
        assert filepath.is_dir()

        relative_path_to_dir = filepath.relative_to(self._directory).as_posix()
        index_of = f"Index of /{html_escape(relative_path_to_dir)}"
        h1 = f"<h1>{index_of}</h1>"

        index_list = []
        dir_index = filepath.iterdir()
        for _file in sorted(dir_index):
            # show file url as relative to static path; the prefix is
            # already quoted, so only the file part is quoted here
            rel_path = _file.relative_to(self._directory).as_posix()
            file_url = self._prefix + _quote_path("/" + rel_path)

            # if file is a directory, add '/' to the end of the name
            if _file.is_dir():
                file_name = f"{_file.name}/"
            else:
                file_name = _file.name

            index_list.append(
                '<li><a href="{url}">{name}</a></li>'.format(
                    url=html_escape(file_url), name=html_escape(file_name)
                )
            )
        ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
        body = f"<body>\n{h1}\n{ul}\n</body>"

        head_str = f"<head>\n<title>{index_of}</title>\n</head>"
        html = f"<html>\n{head_str}\n{body}\n</html>"

        return html

    def __repr__(self) -> str:
        name = "'" + self.name + "'" if self.name is not None else ""
        return "<StaticResource {name} {path} -> {directory!r}>".format(
            name=name, path=self._prefix, directory=self._directory
        )
729
730
class PrefixedSubAppResource(PrefixResource):
    """Resource that mounts a sub-application under a URL prefix."""

    def __init__(self, prefix: str, app: "Application") -> None:
        super().__init__(prefix)
        self._app = app
        # Every resource of the sub-application gets the mount prefix.
        for sub_resource in app.router.resources():
            sub_resource.add_prefix(prefix)

    def add_prefix(self, prefix: str) -> None:
        """Prefix this resource and every resource of the sub-application."""
        super().add_prefix(prefix)
        for sub_resource in self._app.router.resources():
            sub_resource.add_prefix(prefix)

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Not supported; always raises RuntimeError."""
        raise RuntimeError(".url_for() is not supported by sub-application root")

    def get_info(self) -> _InfoDict:
        """Return the sub-application and the prefix."""
        return {"app": self._app, "prefix": self._prefix}

    async def resolve(self, request: Request) -> _Resolve:
        """Delegate resolution to the sub-application's router.

        Returns ``(None, set())`` unless the request path equals the prefix
        or starts with the prefix followed by "/".  The allowed methods are
        taken from the match info only when it carries an
        HTTPMethodNotAllowed exception; otherwise the set is empty.
        """
        raw_path = request.url.raw_path
        if raw_path != self._prefix and not raw_path.startswith(self._prefix + "/"):
            return None, set()

        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)

        http_exc = match_info.http_exception
        if isinstance(http_exc, HTTPMethodNotAllowed):
            methods = http_exc.allowed_methods
        else:
            methods = set()
        return match_info, methods

    def __len__(self) -> int:
        return len(self._app.router.routes())

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._app.router.routes())

    def __repr__(self) -> str:
        return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>"
773
774
class AbstractRuleMatching(abc.ABC):
    """Interface of a rule that decides whether a request matches."""

    @abc.abstractmethod  # pragma: no branch
    async def match(self, request: Request) -> bool:
        """Tell whether *request* satisfies the rule."""

    @abc.abstractmethod  # pragma: no branch
    def get_info(self) -> _InfoDict:
        """Return a dict of extra information for introspection."""

    @property
    @abc.abstractmethod  # pragma: no branch
    def canonical(self) -> str:
        """String form of the rule."""
788
789
class Domain(AbstractRuleMatching):
    """Rule matching the request's Host header against a fixed domain."""

    re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__()
        self._domain = self.validation(domain)

    @property
    def canonical(self) -> str:
        """The validated, normalised domain."""
        return self._domain

    def validation(self, domain: str) -> str:
        """Validate *domain* and return its normalised form.

        Trailing dots are stripped and the value is lower-cased.  The host
        is returned alone when the port is 80, otherwise as "host:port".

        Raises TypeError for a non-string and ValueError for an empty
        value, a value containing "://" or a host with an invalid label.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        domain = domain.rstrip(".").lower()
        if not domain:
            raise ValueError("Domain cannot be empty")
        if "://" in domain:
            raise ValueError("Scheme not supported")

        url = URL("http://" + domain)
        host = url.raw_host
        assert host is not None
        # Each dot-separated label must match re_part in full.
        for label in host.split("."):
            if not self.re_part.fullmatch(label):
                raise ValueError("Domain not valid")

        return host if url.port == 80 else f"{host}:{url.port}"

    async def match(self, request: Request) -> bool:
        """Match the Host header; a missing or empty header never matches."""
        host = request.headers.get(hdrs.HOST)
        if host:
            return self.match_domain(host)
        return False

    def match_domain(self, host: str) -> bool:
        """Compare the lower-cased *host* with the stored domain."""
        return self._domain == host.lower()

    def get_info(self) -> _InfoDict:
        """Return ``{"domain": <stored domain>}``."""
        return {"domain": self._domain}
828
829
class MaskDomain(Domain):
    """Domain rule in which ``*`` acts as a wildcard.

    The validated domain is turned into a regular expression: every "."
    is escaped and every "*" becomes ".*".
    """

    re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__(domain)
        mask = self._domain.replace(".", r"\.").replace("*", ".*")
        self._mask = re.compile(mask)

    @property
    def canonical(self) -> str:
        """The regular expression pattern built from the mask."""
        return self._mask.pattern

    def match_domain(self, host: str) -> bool:
        """Tell whether *host* matches the mask in full.

        The host is lower-cased first, as in ``Domain.match_domain``: the
        mask is built from a lower-cased domain, so a mixed-case Host
        header would otherwise never match.
        """
        return self._mask.fullmatch(host.lower()) is not None
844
845
class MatchedSubAppResource(PrefixedSubAppResource):
    """Sub-application resource selected by a rule instead of a prefix."""

    def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
        # Only the AbstractResource initialiser runs; the prefix is set to
        # an empty string directly.
        AbstractResource.__init__(self)
        self._prefix = ""
        self._app = app
        self._rule = rule

    @property
    def canonical(self) -> str:
        """Canonical string of the underlying rule."""
        return self._rule.canonical

    def get_info(self) -> _InfoDict:
        """Return the sub-application and the rule."""
        return {"app": self._app, "rule": self._rule}

    async def resolve(self, request: Request) -> _Resolve:
        """Delegate to the sub-application's router when the rule matches.

        Returns ``(None, set())`` when the rule rejects the request.  The
        allowed methods are taken from the match info only when it carries
        an HTTPMethodNotAllowed exception; otherwise the set is empty.
        """
        matched = await self._rule.match(request)
        if not matched:
            return None, set()

        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)

        http_exc = match_info.http_exception
        if isinstance(http_exc, HTTPMethodNotAllowed):
            methods = http_exc.allowed_methods
        else:
            methods = set()
        return match_info, methods

    def __repr__(self) -> str:
        return f"<MatchedSubAppResource -> {self._app!r}>"
873
874
class ResourceRoute(AbstractRoute):
    """A route with resource"""

    def __init__(
        self,
        method: str,
        handler: Union[_WebHandler, Type[AbstractView]],
        resource: AbstractResource,
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> None:
        super().__init__(
            method, handler, resource=resource, expect_handler=expect_handler
        )

    def __repr__(self) -> str:
        return f"<ResourceRoute [{self.method}] {self._resource} -> {self.handler!r}"

    @property
    def name(self) -> Optional[str]:
        """Name of the owning resource, or ``None`` without a resource."""
        owner = self._resource
        return None if owner is None else owner.name

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Build a URL by delegating to the owning resource."""
        owner = self._resource
        assert owner is not None
        return owner.url_for(*args, **kwargs)

    def get_info(self) -> _InfoDict:
        """Return the owning resource's introspection dict."""
        owner = self._resource
        assert owner is not None
        return owner.get_info()
909
910
class SystemRoute(AbstractRoute):
    """Route for any method whose handler raises a stored HTTP exception."""

    def __init__(self, http_exception: HTTPException) -> None:
        super().__init__(hdrs.METH_ANY, self._handle)
        self._http_exception = http_exception

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Not supported; always raises RuntimeError."""
        raise RuntimeError(".url_for() is not allowed for SystemRoute")

    @property
    def name(self) -> Optional[str]:
        """System routes have no name."""
        return None

    def get_info(self) -> _InfoDict:
        """Return ``{"http_exception": <stored exception>}``."""
        return {"http_exception": self._http_exception}

    async def _handle(self, request: Request) -> StreamResponse:
        # The handler never produces a response; it raises the exception.
        raise self._http_exception

    @property
    def status(self) -> int:
        """Status of the stored exception."""
        return self._http_exception.status

    @property
    def reason(self) -> str:
        """Reason of the stored exception."""
        return self._http_exception.reason

    def __repr__(self) -> str:
        return f"<SystemRoute {self.status}: {self.reason}>"
939
940
class View(AbstractView):
    """Class-based view that dispatches to a method named after the HTTP verb.

    Awaiting the view looks up the lower-cased request method as an
    attribute on the instance and awaits it; anything else ends in
    HTTPMethodNotAllowed.
    """

    def __await__(self) -> Generator[Any, None, StreamResponse]:
        return self._iter().__await__()

    async def _iter(self) -> StreamResponse:
        verb = self.request.method
        # Reject verbs outside the known set before touching any attribute.
        if verb not in hdrs.METH_ALL:
            self._raise_allowed_methods()
        handler = getattr(self, verb.lower(), None)
        if handler is None:
            self._raise_allowed_methods()
        return await handler()

    def _raise_allowed_methods(self) -> None:
        """Raise HTTPMethodNotAllowed listing the verbs this view implements."""
        supported = set()
        for candidate in hdrs.METH_ALL:
            if hasattr(self, candidate.lower()):
                supported.add(candidate)
        raise HTTPMethodNotAllowed(self.request.method, supported)
957
958
class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
    """Sized, iterable, membership-testable view over a list of resources.

    The list passed to the constructor is stored by reference, not copied.
    """

    def __init__(self, resources: List[AbstractResource]) -> None:
        self._resources = resources

    def __contains__(self, resource: object) -> bool:
        return resource in self._resources

    def __iter__(self) -> Iterator[AbstractResource]:
        for item in self._resources:
            yield item

    def __len__(self) -> int:
        return len(self._resources)
971
972
class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
    """Sized, iterable, membership-testable view over routes.

    The routes of every given resource are collected into a flat list
    once, at construction time, in resource order.
    """

    def __init__(self, resources: List[AbstractResource]) -> None:
        self._routes: List[AbstractRoute] = [
            route for resource in resources for route in resource
        ]

    def __contains__(self, route: object) -> bool:
        return route in self._routes

    def __iter__(self) -> Iterator[AbstractRoute]:
        for item in self._routes:
            yield item

    def __len__(self) -> int:
        return len(self._routes)
988
989
class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
    """Router that stores registered resources and resolves requests on them.

    It also acts as a Mapping from resource name to resource; only
    resources registered with a name appear in that mapping.
    """

    # Separators allowed between the identifier parts of a resource name
    # (dot, colon, dash); register_resource() splits names on them.
    NAME_SPLIT_RE = re.compile(r"[.:-]")
993
994    def __init__(self) -> None:
995        super().__init__()
996        self._resources = []  # type: List[AbstractResource]
997        self._named_resources = {}  # type: Dict[str, AbstractResource]
998
999    async def resolve(self, request: Request) -> AbstractMatchInfo:
1000        method = request.method
1001        allowed_methods = set()  # type: Set[str]
1002
1003        for resource in self._resources:
1004            match_dict, allowed = await resource.resolve(request)
1005            if match_dict is not None:
1006                return match_dict
1007            else:
1008                allowed_methods |= allowed
1009        else:
1010            if allowed_methods:
1011                return MatchInfoError(HTTPMethodNotAllowed(method, allowed_methods))
1012            else:
1013                return MatchInfoError(HTTPNotFound())
1014
1015    def __iter__(self) -> Iterator[str]:
1016        return iter(self._named_resources)
1017
1018    def __len__(self) -> int:
1019        return len(self._named_resources)
1020
1021    def __contains__(self, resource: object) -> bool:
1022        return resource in self._named_resources
1023
1024    def __getitem__(self, name: str) -> AbstractResource:
1025        return self._named_resources[name]
1026
1027    def resources(self) -> ResourcesView:
1028        return ResourcesView(self._resources)
1029
1030    def routes(self) -> RoutesView:
1031        return RoutesView(self._resources)
1032
1033    def named_resources(self) -> Mapping[str, AbstractResource]:
1034        return MappingProxyType(self._named_resources)
1035
1036    def register_resource(self, resource: AbstractResource) -> None:
1037        assert isinstance(
1038            resource, AbstractResource
1039        ), f"Instance of AbstractResource class is required, got {resource!r}"
1040        if self.frozen:
1041            raise RuntimeError("Cannot register a resource into frozen router.")
1042
1043        name = resource.name
1044
1045        if name is not None:
1046            parts = self.NAME_SPLIT_RE.split(name)
1047            for part in parts:
1048                if keyword.iskeyword(part):
1049                    raise ValueError(
1050                        f"Incorrect route name {name!r}, "
1051                        "python keywords cannot be used "
1052                        "for route name"
1053                    )
1054                if not part.isidentifier():
1055                    raise ValueError(
1056                        "Incorrect route name {!r}, "
1057                        "the name should be a sequence of "
1058                        "python identifiers separated "
1059                        "by dash, dot or column".format(name)
1060                    )
1061            if name in self._named_resources:
1062                raise ValueError(
1063                    "Duplicate {!r}, "
1064                    "already handled by {!r}".format(name, self._named_resources[name])
1065                )
1066            self._named_resources[name] = resource
1067        self._resources.append(resource)
1068
1069    def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
1070        if path and not path.startswith("/"):
1071            raise ValueError("path should be started with / or be empty")
1072        # Reuse last added resource if path and name are the same
1073        if self._resources:
1074            resource = self._resources[-1]
1075            if resource.name == name and resource.raw_match(path):
1076                return cast(Resource, resource)
1077        if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
1078            resource = PlainResource(_requote_path(path), name=name)
1079            self.register_resource(resource)
1080            return resource
1081        resource = DynamicResource(path, name=name)
1082        self.register_resource(resource)
1083        return resource
1084
1085    def add_route(
1086        self,
1087        method: str,
1088        path: str,
1089        handler: Union[_WebHandler, Type[AbstractView]],
1090        *,
1091        name: Optional[str] = None,
1092        expect_handler: Optional[_ExpectHandler] = None,
1093    ) -> AbstractRoute:
1094        resource = self.add_resource(path, name=name)
1095        return resource.add_route(method, handler, expect_handler=expect_handler)
1096
1097    def add_static(
1098        self,
1099        prefix: str,
1100        path: PathLike,
1101        *,
1102        name: Optional[str] = None,
1103        expect_handler: Optional[_ExpectHandler] = None,
1104        chunk_size: int = 256 * 1024,
1105        show_index: bool = False,
1106        follow_symlinks: bool = False,
1107        append_version: bool = False,
1108    ) -> AbstractResource:
1109        """Add static files view.
1110
1111        prefix - url prefix
1112        path - folder with files
1113
1114        """
1115        assert prefix.startswith("/")
1116        if prefix.endswith("/"):
1117            prefix = prefix[:-1]
1118        resource = StaticResource(
1119            prefix,
1120            path,
1121            name=name,
1122            expect_handler=expect_handler,
1123            chunk_size=chunk_size,
1124            show_index=show_index,
1125            follow_symlinks=follow_symlinks,
1126            append_version=append_version,
1127        )
1128        self.register_resource(resource)
1129        return resource
1130
1131    def add_head(self, path: str, handler: _WebHandler, **kwargs: Any) -> AbstractRoute:
1132        """
1133        Shortcut for add_route with method HEAD
1134        """
1135        return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
1136
1137    def add_options(
1138        self, path: str, handler: _WebHandler, **kwargs: Any
1139    ) -> AbstractRoute:
1140        """
1141        Shortcut for add_route with method OPTIONS
1142        """
1143        return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
1144
1145    def add_get(
1146        self,
1147        path: str,
1148        handler: _WebHandler,
1149        *,
1150        name: Optional[str] = None,
1151        allow_head: bool = True,
1152        **kwargs: Any,
1153    ) -> AbstractRoute:
1154        """
1155        Shortcut for add_route with method GET, if allow_head is true another
1156        route is added allowing head requests to the same endpoint
1157        """
1158        resource = self.add_resource(path, name=name)
1159        if allow_head:
1160            resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
1161        return resource.add_route(hdrs.METH_GET, handler, **kwargs)
1162
1163    def add_post(self, path: str, handler: _WebHandler, **kwargs: Any) -> AbstractRoute:
1164        """
1165        Shortcut for add_route with method POST
1166        """
1167        return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
1168
1169    def add_put(self, path: str, handler: _WebHandler, **kwargs: Any) -> AbstractRoute:
1170        """
1171        Shortcut for add_route with method PUT
1172        """
1173        return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
1174
1175    def add_patch(
1176        self, path: str, handler: _WebHandler, **kwargs: Any
1177    ) -> AbstractRoute:
1178        """
1179        Shortcut for add_route with method PATCH
1180        """
1181        return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
1182
1183    def add_delete(
1184        self, path: str, handler: _WebHandler, **kwargs: Any
1185    ) -> AbstractRoute:
1186        """
1187        Shortcut for add_route with method DELETE
1188        """
1189        return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
1190
1191    def add_view(
1192        self, path: str, handler: Type[AbstractView], **kwargs: Any
1193    ) -> AbstractRoute:
1194        """
1195        Shortcut for add_route with ANY methods for a class-based view
1196        """
1197        return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
1198
1199    def freeze(self) -> None:
1200        super().freeze()
1201        for resource in self._resources:
1202            resource.freeze()
1203
1204    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
1205        """Append routes to route table.
1206
1207        Parameter should be a sequence of RouteDef objects.
1208
1209        Returns a list of registered AbstractRoute instances.
1210        """
1211        registered_routes = []
1212        for route_def in routes:
1213            registered_routes.extend(route_def.register(self))
1214        return registered_routes
1215
1216
def _quote_path(value: str) -> str:
    """Return *value* percent-encoded as the raw path of a URL.

    With yarl older than 1.6, every "%" is replaced by "%25" before the
    URL is built.
    """
    pre_escape_percent = YARL_VERSION < (1, 6)
    if pre_escape_percent:
        value = value.replace("%", "%25")
    url = URL.build(path=value, encoded=False)
    return url.raw_path
1221
1222
def _unquote_path(value: str) -> str:
    """Return the decoded path of a URL built from already-encoded *value*."""
    url = URL.build(path=value, encoded=True)
    return url.path
1225
1226
def _requote_path(value: str) -> str:
    """Quote *value* as a path while keeping its existing %-sequences."""
    # Quote non-ascii characters and other characters which must be quoted,
    # but preserve existing %-sequences.
    quoted = _quote_path(value)
    if "%" not in value:
        return quoted
    # Quoting turned each original "%" into "%25"; turn those back.
    return quoted.replace("%25", "%")
1234