1"""Interface for accessing the file system with automatic caching.
2
3The idea is to cache the results of any file system state reads during
4a single transaction. This has two main benefits:
5
6* This avoids redundant syscalls, as we won't perform the same OS
7  operations multiple times.
8
9* This makes it easier to reason about concurrent FS updates, as different
10  operations targeting the same paths can't report different state during
11  a transaction.
12
13Note that this only deals with reading state, not writing.
14
15Properties maintained by the API:
16
17* The contents of the file are always from the same or later time compared
18  to the reported mtime of the file, even if mtime is queried after reading
19  a file.
20
21* Repeating an operation produces the same result as the first one during
22  a transaction.
23
24* Call flush() to start a new transaction (flush the caches).
25
26The API is a bit limited. It's easy to add new cached operations, however.
27You should perform all file system reads through the API to actually take
28advantage of the benefits.
29"""
30
31import os
32import stat
33import sys
34from typing import Dict, List, Set
35from mypy.util import hash_digest
36from mypy_extensions import mypyc_attr
37
38
@mypyc_attr(allow_interpreted_subclasses=True)  # for tests
class FileSystemCache:
    """Cache of file system reads that is valid for one transaction.

    Results (and errors) of stat(), listdir() and read() are remembered
    per path until flush() is called, so repeating an operation within a
    transaction reports the same state as the first time.
    """

    def __init__(self) -> None:
        # The package root is not flushed with the caches.
        # It is set by set_package_root() below.
        self.package_root = []  # type: List[str]
        self.flush()

    def set_package_root(self, package_root: List[str]) -> None:
        """Set the package roots consulted by init_under_package_root().

        The list is stored as is (not copied).  Matching against the roots
        is done with plain string prefixes, so the roots are presumably
        expected in normalized form ending with a path separator (or empty
        for the current directory) -- verify against the caller.
        """
        self.package_root = package_root

    def flush(self) -> None:
        """Start another transaction and empty all caches."""
        self.stat_cache = {}  # type: Dict[str, os.stat_result]
        self.stat_error_cache = {}  # type: Dict[str, OSError]
        self.listdir_cache = {}  # type: Dict[str, List[str]]
        self.listdir_error_cache = {}  # type: Dict[str, OSError]
        self.isfile_case_cache = {}  # type: Dict[str, bool]
        self.exists_case_cache = {}  # type: Dict[str, bool]
        self.read_cache = {}  # type: Dict[str, bytes]
        self.read_error_cache = {}  # type: Dict[str, Exception]
        self.hash_cache = {}  # type: Dict[str, str]
        # Normalized directory names for which _fake_init() invented
        # an __init__.py file.
        self.fake_package_cache = set()  # type: Set[str]

    def stat(self, path: str) -> os.stat_result:
        """Return the (cached) os.stat() result for path.

        Both successful results and errors are cached.  If the real stat
        fails and path is an __init__.py under a package root, a fake
        stat result is produced instead (see _fake_init()).

        Raises OSError if the path can't be stat'ed; on repeated calls a
        fresh copy of the cached error is raised.
        """
        if path in self.stat_cache:
            return self.stat_cache[path]
        if path in self.stat_error_cache:
            raise copy_os_error(self.stat_error_cache[path])
        try:
            st = os.stat(path)
        except OSError as err:
            if self.init_under_package_root(path):
                try:
                    return self._fake_init(path)
                except OSError:
                    # Faking failed; fall through and report the original error.
                    pass
            # Take a copy to get rid of associated traceback and frame objects.
            # Just assigning to __traceback__ doesn't free them.
            self.stat_error_cache[path] = copy_os_error(err)
            raise err
        self.stat_cache[path] = st
        return st

    def init_under_package_root(self, path: str) -> bool:
        """Is this path an __init__.py under a package root?

        This is used to detect packages that don't contain __init__.py
        files, which is needed to support Bazel.  The function should
        only be called for non-existing files.

        It will return True if it refers to an __init__.py file that
        Bazel would create, so that at runtime Python would think the
        directory containing it is a package.  For this to work you
        must pass one or more package roots using the --package-root
        flag.

        As an exceptional case, any directory that is a package root
        itself will not be considered to contain an __init__.py file.
        This is different from the rules Bazel itself applies, but is
        necessary for mypy to properly distinguish packages from other
        directories.

        See https://docs.bazel.build/versions/master/be/python.html,
        where this behavior is described under legacy_create_init.
        """
        if not self.package_root:
            return False
        dirname, basename = os.path.split(path)
        if basename != '__init__.py':
            return False
        # The containing directory must exist and really be a directory.
        try:
            st = self.stat(dirname)
        except OSError:
            return False
        else:
            if not stat.S_ISDIR(st.st_mode):
                return False
        ok = False
        _, path = os.path.splitdrive(path)  # Ignore Windows drive name
        if os.path.isabs(path):
            path = os.path.relpath(path)
        path = os.path.normpath(path)
        for root in self.package_root:
            if path.startswith(root):
                if path == root + basename:
                    # A package root itself is never a package.
                    ok = False
                    break
                else:
                    # Keep scanning: a later root may still be the
                    # directory itself, which overrides this match.
                    ok = True
        return ok

    def _fake_init(self, path: str) -> os.stat_result:
        """Prime the cache with a fake __init__.py file.

        This makes code that looks for path believe an empty file by
        that name exists.  Should only be called after
        init_under_package_root() returns True.

        The fake stat result is derived from the stat result of the
        containing directory.  Raises OSError if that directory can't
        be stat'ed.
        """
        dirname, basename = os.path.split(path)
        assert basename == '__init__.py', path
        assert not os.path.exists(path), path  # Not cached!
        dirname = os.path.normpath(dirname)
        st = self.stat(dirname)  # May raise OSError
        # Get stat result as a sequence so we can modify it.
        # (Alas, typeshed's os.stat_result is not a sequence yet.)
        tpl = tuple(st)  # type: ignore[arg-type, var-annotated]
        seq = list(tpl)  # type: List[float]
        # Pretend to be an empty, read-only regular file.
        seq[stat.ST_MODE] = stat.S_IFREG | 0o444
        seq[stat.ST_INO] = 1
        seq[stat.ST_NLINK] = 1
        seq[stat.ST_SIZE] = 0
        tpl = tuple(seq)
        st = os.stat_result(tpl)
        self.stat_cache[path] = st
        # Make listdir() and read() also pretend this file exists.
        self.fake_package_cache.add(dirname)
        return st

    def listdir(self, path: str) -> List[str]:
        """Return the (cached) os.listdir() result for path.

        The path is normalized before use.  If an __init__.py was faked
        for this directory, it is included in the result.  The returned
        list is the cached object itself, so callers should not mutate it.

        Raises OSError if the directory can't be listed; on repeated
        calls a fresh copy of the cached error is raised.
        """
        path = os.path.normpath(path)
        if path not in self.listdir_cache:
            if path in self.listdir_error_cache:
                raise copy_os_error(self.listdir_error_cache[path])
            try:
                results = os.listdir(path)
            except OSError as err:
                # Like in stat(), take a copy to reduce memory use.
                self.listdir_error_cache[path] = copy_os_error(err)
                raise err
            self.listdir_cache[path] = results
        res = self.listdir_cache[path]
        # Check the fake cache.  A fake __init__.py may have been registered
        # after the listing was cached, so this is checked on every call.
        if path in self.fake_package_cache and '__init__.py' not in res:
            res.append('__init__.py')  # Updates the result as well as the cache
        return res

    def isfile(self, path: str) -> bool:
        """Return whether path exists and is a regular file (per cached stat)."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISREG(st.st_mode)

    def isfile_case(self, path: str, prefix: str) -> bool:
        """Return whether path exists and is a file.

        On case-insensitive filesystems (like Mac or Windows) this returns
        False if the case of path's last component does not exactly match
        the case found in the filesystem.

        We check also the case of other path components up to prefix.
        For example, if path is 'user-stubs/pack/mod.pyi' and prefix is 'user-stubs',
        we check that the case of 'pack' and 'mod.pyi' matches exactly, 'user-stubs' will be
        case insensitive on case insensitive filesystems.

        The caller must ensure that prefix is a valid file system prefix of path.
        """
        if sys.platform == "linux":
            # Assume that the file system on Linux is case sensitive
            return self.isfile(path)
        if not self.isfile(path):
            # Fast path
            return False
        if path in self.isfile_case_cache:
            return self.isfile_case_cache[path]
        head, tail = os.path.split(path)
        if not tail:
            self.isfile_case_cache[path] = False
            return False
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check the other path components in case sensitive way.
            res = self._exists_case(head, prefix)
        self.isfile_case_cache[path] = res
        return res

    def _exists_case(self, path: str, prefix: str) -> bool:
        """Helper to check path components in case sensitive fashion, up to prefix.

        Each component of path below prefix must appear with exactly the
        same spelling in the listing of its parent directory.  Results
        are cached per path.
        """
        if path in self.exists_case_cache:
            return self.exists_case_cache[path]
        head, tail = os.path.split(path)
        if not head.startswith(prefix) or not tail:
            # Only perform the check for paths under prefix.
            self.exists_case_cache[path] = True
            return True
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check other path components.
            res = self._exists_case(head, prefix)
        self.exists_case_cache[path] = res
        return res

    def isdir(self, path: str) -> bool:
        """Return whether path exists and is a directory (per cached stat)."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISDIR(st.st_mode)

    def exists(self, path: str) -> bool:
        """Return whether path can be stat'ed (per cached stat).

        Any OSError from stat() (not only FileNotFoundError) means the
        path is reported as non-existing, consistently with isfile(),
        isdir() and os.path.exists().
        """
        try:
            self.stat(path)
        except OSError:
            return False
        return True

    def read(self, path: str) -> bytes:
        """Return the (cached) contents of the file at path as bytes.

        A faked __init__.py reads as an empty file.  A successful read
        also records the content hash used by hash_digest().

        Raises OSError if the path can't be stat'ed or read; on repeated
        calls a fresh copy of the cached read error is raised.
        """
        if path in self.read_cache:
            return self.read_cache[path]
        if path in self.read_error_cache:
            cached_err = self.read_error_cache[path]
            if isinstance(cached_err, OSError):
                # Raise a fresh copy so that tracebacks don't pile up on
                # (and frames aren't kept alive by) the cached error.
                raise copy_os_error(cached_err)
            raise cached_err

        # Need to stat first so that the contents of file are from no
        # earlier instant than the mtime reported by self.stat().
        self.stat(path)

        dirname, basename = os.path.split(path)
        dirname = os.path.normpath(dirname)
        # Check the fake cache.
        if basename == '__init__.py' and dirname in self.fake_package_cache:
            data = b''
        else:
            try:
                with open(path, 'rb') as f:
                    data = f.read()
            except OSError as err:
                # Like in stat(), cache a copy to get rid of the associated
                # traceback and frame objects.
                self.read_error_cache[path] = copy_os_error(err)
                raise

        self.read_cache[path] = data
        self.hash_cache[path] = hash_digest(data)
        return data

    def hash_digest(self, path: str) -> str:
        """Return the hash digest of the (cached) contents of path.

        Reads the file first if needed; raises OSError if that fails.
        """
        if path not in self.hash_cache:
            self.read(path)
        return self.hash_cache[path]

    def samefile(self, f1: str, f2: str) -> bool:
        """Return whether f1 and f2 have the same (cached) stat identity.

        Raises OSError if either path can't be stat'ed.
        """
        s1 = self.stat(f1)
        s2 = self.stat(f2)
        return os.path.samestat(s1, s2)
300
def copy_os_error(e: OSError) -> OSError:
    """Return a fresh OSError carrying the same details as e.

    The copy is built from e.args and then given e's errno, strerror and
    filename (plus filename2 when it is set).  Being a new object, it has
    no traceback attached.
    """
    clone = OSError(*e.args)
    for attr in ('errno', 'strerror', 'filename'):
        setattr(clone, attr, getattr(e, attr))
    if e.filename2:
        clone.filename2 = e.filename2
    return clone
309