1"""The optional bytecode cache system. This is useful if you have very
2complex template situations and the compilation of all those templates
3slows down your application too much.
4
5Situations where this is useful are often forking web applications that
6are initialized on the first request.
7"""
8import errno
9import fnmatch
10import marshal
11import os
12import pickle
13import stat
14import sys
15import tempfile
16import typing as t
17from hashlib import sha1
18from io import BytesIO
19from types import CodeType
20
if t.TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only;
    # none of it is imported or defined at runtime.
    import typing_extensions as te
    from .environment import Environment

    class _MemcachedClient(te.Protocol):
        """Structural type of the client accepted by the memcached cache.

        Any object providing compatible ``get`` and ``set`` methods
        satisfies this protocol.
        """

        def get(self, key: str) -> bytes:
            """Return the bytes stored under ``key``."""

        def set(self, key: str, value: bytes, timeout: t.Optional[int] = None) -> None:
            """Store ``value`` under ``key``, optionally with a timeout."""
31
32
bc_version = 5
# Magic bytes to identify Jinja bytecode cache files. Contains the
# Python major and minor version to avoid loading incompatible bytecode
# if a project upgrades its Python version.
bc_magic = (
    b"j2"
    + pickle.dumps(bc_version, 2)
    + pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1], 2)
)


class Bucket:
    """Buckets are used to store the bytecode for one template.  It's created
    and initialized by the bytecode cache and passed to the loading functions.

    The buckets get an internal checksum from the cache assigned and use this
    to automatically reject outdated cache material.  Individual bytecode
    cache subclasses don't have to care about cache invalidation.

    :param environment: The environment the bucket belongs to.
    :param key: The cache key identifying the template.
    :param checksum: Checksum of the template source; cached bytecode
        stored with a different checksum is rejected on load.
    """

    def __init__(self, environment: "Environment", key: str, checksum: str) -> None:
        self.environment = environment
        self.key = key
        self.checksum = checksum
        self.reset()

    def reset(self) -> None:
        """Resets the bucket (unloads the bytecode)."""
        self.code: t.Optional[CodeType] = None

    def load_bytecode(self, f: t.BinaryIO) -> None:
        """Loads bytecode from a file or file like object.

        The stream must contain the magic header, the pickled source
        checksum and the marshalled code object, in that order.  If any
        part is missing, outdated, truncated or otherwise unreadable the
        bucket is reset (``code`` becomes ``None``) instead of raising,
        so a damaged cache entry behaves like a cache miss.

        :param f: Binary stream positioned at the start of a cache entry.
        """
        # make sure the magic header is correct
        magic = f.read(len(bc_magic))
        if magic != bc_magic:
            self.reset()
            return

        # NOTE(security): unpickling executes arbitrary code if the data
        # is attacker controlled.  The cache storage must be trusted.
        #
        # A truncated or corrupted entry must not crash template loading.
        # The pickle documentation lists several unrelated exception types
        # that unpickling may raise, hence the broad handler.
        try:
            checksum = pickle.load(f)
        except Exception:
            self.reset()
            return

        # the source code of the file changed, we need to reload
        if self.checksum != checksum:
            self.reset()
            return

        # if marshal_load fails then we need to reload
        try:
            code = marshal.load(f)
        except (EOFError, ValueError, TypeError):
            self.reset()
            return

        # marshal can decode any marshallable value; only a code object
        # is usable as template bytecode.
        if not isinstance(code, CodeType):
            self.reset()
            return

        self.code = code

    def write_bytecode(self, f: t.BinaryIO) -> None:
        """Dump the bytecode into the file or file like object passed.

        :param f: Binary stream to write the cache entry to.
        :raises TypeError: If the bucket holds no bytecode.
        """
        if self.code is None:
            raise TypeError("can't write empty bucket")
        f.write(bc_magic)
        pickle.dump(self.checksum, f, 2)
        marshal.dump(self.code, f)

    def bytecode_from_string(self, string: bytes) -> None:
        """Load bytecode from bytes."""
        self.load_bytecode(BytesIO(string))

    def bytecode_to_string(self) -> bytes:
        """Return the bytecode as bytes."""
        out = BytesIO()
        self.write_bytecode(out)
        return out.getvalue()
99
100
class BytecodeCache:
    """Base class for bytecode caches.

    A custom cache is implemented by subclassing this class and
    overriding :meth:`load_bytecode` and :meth:`dump_bytecode`.  Each of
    the two methods receives a :class:`~jinja2.bccache.Bucket`.

    A very basic bytecode cache that saves the bytecode on the file system::

        from os import path

        class MyCache(BytecodeCache):

            def __init__(self, directory):
                self.directory = directory

            def load_bytecode(self, bucket):
                filename = path.join(self.directory, bucket.key)
                if path.exists(filename):
                    with open(filename, 'rb') as f:
                        bucket.load_bytecode(f)

            def dump_bytecode(self, bucket):
                filename = path.join(self.directory, bucket.key)
                with open(filename, 'wb') as f:
                    bucket.write_bytecode(f)

    A more advanced version of a filesystem based bytecode cache is part of
    Jinja.
    """

    def load_bytecode(self, bucket: Bucket) -> None:
        """Load cached bytecode into ``bucket``.

        Must be overridden by subclasses.  An implementation that finds
        nothing in the cache for the bucket must not do anything.
        """
        raise NotImplementedError()

    def dump_bytecode(self, bucket: Bucket) -> None:
        """Write the bytecode held by ``bucket`` back to the cache.

        Must be overridden by subclasses.  An implementation that cannot
        store the bytecode must raise an exception rather than fail
        silently.
        """
        raise NotImplementedError()

    def clear(self) -> None:
        """Clear the cache.

        Jinja itself never calls this method; it exists so applications
        can clear the bytecode cache used by a particular environment.
        The base implementation does nothing.
        """

    def get_cache_key(
        self, name: str, filename: t.Optional[t.Union[str]] = None
    ) -> str:
        """Return the unique hash key for a template.

        The key is the SHA-1 hex digest of the UTF-8 encoded template
        name, followed by ``|`` and the filename when one is given.
        """
        payload = name.encode("utf-8")

        if filename is not None:
            payload += f"|{filename}".encode("utf-8")

        return sha1(payload).hexdigest()

    def get_source_checksum(self, source: str) -> str:
        """Return the SHA-1 hex digest of the UTF-8 encoded source."""
        encoded = source.encode("utf-8")
        return sha1(encoded).hexdigest()

    def get_bucket(
        self,
        environment: "Environment",
        name: str,
        filename: t.Optional[str],
        source: str,
    ) -> Bucket:
        """Create the bucket for a template and try to fill it from the
        cache.  Every argument is required, but ``filename`` may be
        ``None``.
        """
        bucket = Bucket(
            environment,
            self.get_cache_key(name, filename),
            self.get_source_checksum(source),
        )
        self.load_bytecode(bucket)
        return bucket

    def set_bucket(self, bucket: Bucket) -> None:
        """Store the bucket in the cache via :meth:`dump_bytecode`."""
        self.dump_bytecode(bucket)
185
class FileSystemBytecodeCache(BytecodeCache):
    """A bytecode cache that stores bytecode on the filesystem.  It accepts
    two arguments: The directory where the cache items are stored and a
    pattern string that is used to build the filename.

    If no directory is specified a default cache directory is selected.  On
    Windows the user's temp directory is used, on UNIX systems a directory
    is created for the user in the system temp directory.

    The pattern can be used to have multiple separate caches operate on the
    same directory.  The default pattern is ``'__jinja2_%s.cache'``.  ``%s``
    is replaced with the cache key.

    >>> bcc = FileSystemBytecodeCache('/tmp/jinja_cache', '%s.cache')

    This bytecode cache supports clearing of the cache using the clear method.
    """

    def __init__(
        self, directory: t.Optional[str] = None, pattern: str = "__jinja2_%s.cache"
    ) -> None:
        if directory is None:
            directory = self._get_default_cache_dir()
        self.directory = directory
        self.pattern = pattern

    def _get_default_cache_dir(self) -> str:
        """Return the default cache directory, creating it if necessary.

        On Windows this is the temp directory itself.  Elsewhere it is a
        per-user directory inside the temp directory which must be a real
        directory, owned by the current user, with mode ``0o700``.

        :raises RuntimeError: If no safe directory can be determined.
        """

        def _unsafe_dir() -> "te.NoReturn":
            raise RuntimeError(
                "Cannot determine safe temp directory.  You "
                "need to explicitly provide one."
            )

        tmpdir = tempfile.gettempdir()

        # On windows the temporary directory is user specific unless
        # explicitly forced otherwise.  We can just use that.
        if os.name == "nt":
            return tmpdir
        if not hasattr(os, "getuid"):
            _unsafe_dir()

        dirname = f"_jinja2-cache-{os.getuid()}"
        actual_dir = os.path.join(tmpdir, dirname)

        def _verify_dir() -> None:
            # lstat (not stat) so that a symlink planted at this path is
            # seen as a symlink and rejected by the S_ISDIR test.
            dir_stat = os.lstat(actual_dir)
            if (
                dir_stat.st_uid != os.getuid()
                or not stat.S_ISDIR(dir_stat.st_mode)
                or stat.S_IMODE(dir_stat.st_mode) != stat.S_IRWXU
            ):
                _unsafe_dir()

        try:
            os.mkdir(actual_dir, stat.S_IRWXU)
        except OSError as e:
            # An already existing directory is fine; it is verified below.
            if e.errno != errno.EEXIST:
                raise
        try:
            os.chmod(actual_dir, stat.S_IRWXU)
            _verify_dir()
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        # Verify once more outside the handler above so the result never
        # depends on an exception having been swallowed there.
        _verify_dir()

        return actual_dir

    def _get_cache_filename(self, bucket: Bucket) -> str:
        """Return the path of the cache file for ``bucket``."""
        return os.path.join(self.directory, self.pattern % (bucket.key,))

    def load_bytecode(self, bucket: Bucket) -> None:
        """Load the bytecode for ``bucket`` from its cache file.

        A missing or unopenable file is treated as a cache miss and
        leaves the bucket untouched.
        """
        filename = self._get_cache_filename(bucket)

        # Don't test for existence before opening the file, since the
        # file could disappear between the test and the open (for
        # example when another process calls clear()).
        try:
            f = open(filename, "rb")
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # PermissionError can occur on Windows while another
            # operation on the file is in progress.
            return

        with f:
            bucket.load_bytecode(f)

    def dump_bytecode(self, bucket: Bucket) -> None:
        """Write the bytecode of ``bucket`` to its cache file.

        The data is written to a temporary file in the same directory
        and then moved over the final name, so readers never observe a
        partially written cache file.

        :raises TypeError: If the bucket holds no bytecode.
        :raises OSError: If the temporary file cannot be created or
            written.
        """
        name = self._get_cache_filename(bucket)
        # NOTE(review): temporary files are created private to the
        # current user, so the resulting cache file may have stricter
        # permissions than one created with a plain open() - confirm this
        # is acceptable for cache directories shared between users.
        f = tempfile.NamedTemporaryFile(
            mode="wb",
            dir=os.path.dirname(name),
            prefix=os.path.basename(name),
            suffix=".tmp",
            delete=False,
        )

        def remove_silent() -> None:
            try:
                os.remove(f.name)
            except OSError:
                # Another process may have called clear().  On Windows,
                # another program may be holding the file open.
                pass

        try:
            with f:
                bucket.write_bytecode(f)
        except BaseException:
            remove_silent()
            raise

        try:
            os.replace(f.name, name)
        except OSError:
            # Another process may have called clear().  On Windows,
            # another program may be holding the target open.  The entry
            # is simply not cached this time.
            remove_silent()
        except BaseException:
            remove_silent()
            raise

    def clear(self) -> None:
        """Remove every file in the cache directory matching the pattern.

        Files that cannot be removed are skipped.
        """
        # imported lazily here because google app-engine doesn't support
        # write access on the file system and the function does not exist
        # normally.
        from os import remove

        files = fnmatch.filter(os.listdir(self.directory), self.pattern % ("*",))
        for filename in files:
            try:
                remove(os.path.join(self.directory, filename))
            except OSError:
                # Best effort: the file may already be gone or be in use.
                pass
286
class MemcachedBytecodeCache(BytecodeCache):
    """This class implements a bytecode cache that uses a memcache cache for
    storing the information.  It does not enforce a specific memcache library
    (tummy's memcache or cmemcache) but will accept any class that provides
    the minimal interface required.

    Libraries compatible with this class:

    -   `cachelib <https://github.com/pallets/cachelib>`_
    -   `python-memcached <https://pypi.org/project/python-memcached/>`_

    (Unfortunately the django cache interface is not compatible because it
    does not support storing binary data, only text. You can however pass
    the underlying cache client to the bytecode cache which is available
    as `django.core.cache.cache._client`.)

    The minimal interface for the client passed to the constructor is this:

    .. class:: MinimalClientInterface

        .. method:: set(key, value[, timeout])

            Stores the bytecode in the cache.  `value` is a bytes object
            and `timeout` the timeout of the key.  If timeout is not
            provided a default timeout or no timeout should be assumed,
            if it's provided it's an integer with the number of seconds
            the cache item should exist.

        .. method:: get(key)

            Returns the value for the cache key.  If the item does not
            exist in the cache the return value must be `None`.

    The other arguments to the constructor are the prefix for all keys that
    is added before the actual cache key and the timeout for the bytecode in
    the cache system.  We recommend a high (or no) timeout.

    This bytecode cache does not support clearing of used items in the cache.
    The clear method is a no-operation function.

    .. versionadded:: 2.7
       Added support for ignoring memcache errors through the
       `ignore_memcache_errors` parameter.
    """

    def __init__(
        self,
        client: "_MemcachedClient",
        prefix: str = "jinja2/bytecode/",
        timeout: t.Optional[int] = None,
        ignore_memcache_errors: bool = True,
    ) -> None:
        self.client = client
        self.prefix = prefix
        self.timeout = timeout
        self.ignore_memcache_errors = ignore_memcache_errors

    def load_bytecode(self, bucket: Bucket) -> None:
        """Load the bytecode for ``bucket`` from the memcache client.

        A cache miss (the client returning ``None``) leaves the bucket
        untouched.  Errors raised by the client are swallowed when
        ``ignore_memcache_errors`` is true and re-raised otherwise.
        """
        try:
            code = self.client.get(self.prefix + bucket.key)
        except Exception:
            if not self.ignore_memcache_errors:
                raise
            return

        # The client returns None when the key is not cached.  Handle
        # that explicitly instead of handing a non-bytes value to the
        # bucket.
        if code is None:
            return

        bucket.bytecode_from_string(code)

    def dump_bytecode(self, bucket: Bucket) -> None:
        """Store the bytecode of ``bucket`` through the memcache client.

        The timeout is only passed to the client when one was configured.
        Errors raised by the client are swallowed when
        ``ignore_memcache_errors`` is true and re-raised otherwise.

        :raises TypeError: If the bucket holds no bytecode.
        """
        key = self.prefix + bucket.key
        value = bucket.bytecode_to_string()

        try:
            if self.timeout is not None:
                self.client.set(key, value, self.timeout)
            else:
                self.client.set(key, value)
        except Exception:
            if not self.ignore_memcache_errors:
                raise
365