1"""
2Application Profiler
3====================
4
5This module provides a middleware that profiles each request with the
6:mod:`cProfile` module. This can help identify bottlenecks in your code
7that may be slowing down your application.
8
9.. autoclass:: ProfilerMiddleware
10
11:copyright: 2007 Pallets
12:license: BSD-3-Clause
13"""
14import os.path
15import sys
16import time
17import typing as t
18from pstats import Stats
19
20try:
21    from cProfile import Profile
22except ImportError:
23    from profile import Profile  # type: ignore
24
25if t.TYPE_CHECKING:
26    from _typeshed.wsgi import StartResponse
27    from _typeshed.wsgi import WSGIApplication
28    from _typeshed.wsgi import WSGIEnvironment
29
30
class ProfilerMiddleware:
    """Wrap a WSGI application and profile the execution of each
    request. Responses are buffered so that timings are more exact.

    If ``stream`` is given, :class:`pstats.Stats` are written to it
    after each request. If ``profile_dir`` is given, :mod:`cProfile`
    data files are saved to that directory, one file per request.

    The filename can be customized by passing ``filename_format``. If
    it is a string, it will be formatted using :meth:`str.format` with
    the following fields available:

    -   ``{method}`` - The request method; GET, POST, etc.
    -   ``{path}`` - The request path with slashes replaced by dots,
        or 'root' if the path is empty or missing.
    -   ``{elapsed}`` - The elapsed time of the request, in
        milliseconds.
    -   ``{time}`` - The time of the request, as seconds since the
        epoch.

    If it is a callable, it will be called with the WSGI ``environ``
    dict and should return a filename.

    :param app: The WSGI application to wrap.
    :param stream: Write stats to this stream. Disable with ``None``.
    :param sort_by: A tuple of columns to sort stats by. See
        :meth:`pstats.Stats.sort_stats`.
    :param restrictions: A tuple of restrictions to filter stats by. See
        :meth:`pstats.Stats.print_stats`.
    :param profile_dir: Save profile data files to this directory.
    :param filename_format: Format string for profile data file names,
        or a callable returning a name. See explanation above.

    .. code-block:: python

        from werkzeug.middleware.profiler import ProfilerMiddleware
        app = ProfilerMiddleware(app)

    .. versionchanged:: 0.15
        Stats are written even if ``profile_dir`` is given, and can be
        disabled by passing ``stream=None``.

    .. versionadded:: 0.15
        Added ``filename_format``.

    .. versionadded:: 0.9
        Added ``restrictions`` and ``profile_dir``.
    """

    def __init__(
        self,
        app: "WSGIApplication",
        stream: t.Optional[t.IO[str]] = sys.stdout,
        sort_by: t.Iterable[str] = ("time", "calls"),
        restrictions: t.Iterable[t.Union[str, int, float]] = (),
        profile_dir: t.Optional[str] = None,
        filename_format: t.Union[
            str, t.Callable[["WSGIEnvironment"], str]
        ] = "{method}.{path}.{elapsed:.0f}ms.{time:.0f}.prof",
    ) -> None:
        self._app = app
        self._stream = stream
        # Both values are unpacked on every request. Materialize them so
        # that a one-shot iterator is not exhausted by the first request.
        self._sort_by = tuple(sort_by)
        self._restrictions = tuple(restrictions)
        self._profile_dir = profile_dir
        self._filename_format = filename_format

    def __call__(
        self, environ: "WSGIEnvironment", start_response: "StartResponse"
    ) -> t.Iterable[bytes]:
        """Run the wrapped application under the profiler.

        The whole response is consumed while profiling and returned as a
        single buffered chunk.

        :param environ: The WSGI environment of the request.
        :param start_response: The server's ``start_response`` callable.
        :return: A one-item list containing the complete response body.
        """
        response_body: t.List[bytes] = []

        def catching_start_response(status, headers, exc_info=None):  # type: ignore
            start_response(status, headers, exc_info)
            # Data passed to the legacy ``write()`` callable is buffered
            # together with the iterated body.
            return response_body.append

        def runapp() -> None:
            app_iter = self._app(
                environ, t.cast("StartResponse", catching_start_response)
            )

            try:
                response_body.extend(app_iter)
            finally:
                # The iterable must be closed even if iterating it
                # raised, otherwise the application's cleanup is skipped.
                if hasattr(app_iter, "close"):
                    app_iter.close()  # type: ignore

        profile = Profile()
        start = time.time()
        profile.runcall(runapp)
        body = b"".join(response_body)
        elapsed = time.time() - start

        # PATH_INFO may be absent from the environ; treat it as empty.
        path_info = environ.get("PATH_INFO", "")

        if self._profile_dir is not None:
            if callable(self._filename_format):
                filename = self._filename_format(environ)
            else:
                filename = self._filename_format.format(
                    method=environ["REQUEST_METHOD"],
                    path=path_info.strip("/").replace("/", ".") or "root",
                    elapsed=elapsed * 1000.0,
                    time=time.time(),
                )
            filename = os.path.join(self._profile_dir, filename)
            profile.dump_stats(filename)

        if self._stream is not None:
            stats = Stats(profile, stream=self._stream)
            stats.sort_stats(*self._sort_by)
            print("-" * 80, file=self._stream)
            print(f"PATH: {path_info!r}", file=self._stream)
            stats.print_stats(*self._restrictions)
            print(f"{'-' * 80}\n", file=self._stream)

        return [body]
140