"""Interface for accessing the file system with automatic caching.

The idea is to cache the results of any file system state reads during
a single transaction. This has two main benefits:

* This avoids redundant syscalls, as we won't perform the same OS
  operations multiple times.

* This makes it easier to reason about concurrent FS updates, as different
  operations targeting the same paths can't report different state during
  a transaction.

Note that this only deals with reading state, not writing.

Properties maintained by the API:

* The contents of the file are always from the same or later time compared
  to the reported mtime of the file, even if mtime is queried after reading
  a file.

* Repeating an operation produces the same result as the first one during
  a transaction.

* Call flush() to start a new transaction (flush the caches).

The API is a bit limited. It's easy to add new cached operations, however.
You should perform all file system reads through the API to actually take
advantage of the benefits.
"""

import os
import stat
import sys
from typing import Dict, List, Set
from mypy.util import hash_digest
from mypy_extensions import mypyc_attr


@mypyc_attr(allow_interpreted_subclasses=True)  # for tests
class FileSystemCache:
    """Cache of file system reads that is valid for one transaction.

    Results (and errors) of stat(), listdir() and read() are remembered
    until flush() is called. The remaining query methods are implemented
    on top of those cached primitives.
    """

    def __init__(self) -> None:
        # The package root is not flushed with the caches.
        # It is set by set_package_root() below.
        self.package_root = []  # type: List[str]
        self.flush()

    def set_package_root(self, package_root: List[str]) -> None:
        """Set the package roots consulted by init_under_package_root().

        The list is stored as is (not copied) and survives flush().
        """
        self.package_root = package_root

    def flush(self) -> None:
        """Start another transaction and empty all caches."""
        self.stat_cache = {}  # type: Dict[str, os.stat_result]
        self.stat_error_cache = {}  # type: Dict[str, OSError]
        self.listdir_cache = {}  # type: Dict[str, List[str]]
        self.listdir_error_cache = {}  # type: Dict[str, OSError]
        self.isfile_case_cache = {}  # type: Dict[str, bool]
        self.exists_case_cache = {}  # type: Dict[str, bool]
        self.read_cache = {}  # type: Dict[str, bytes]
        # Only OSError instances are ever stored here (see read()).
        self.read_error_cache = {}  # type: Dict[str, OSError]
        self.hash_cache = {}  # type: Dict[str, str]
        # Normalized directory names that got a fake __init__.py.
        self.fake_package_cache = set()  # type: Set[str]

    def stat(self, path: str) -> os.stat_result:
        """Return the (cached) os.stat() result for path.

        If os.stat() fails and init_under_package_root() accepts the path,
        a fake stat result for an empty __init__.py is returned instead.

        Raises OSError if the path cannot be stat'ed. The error is cached
        as well, and a fresh copy of it is raised on later calls.
        """
        if path in self.stat_cache:
            return self.stat_cache[path]
        if path in self.stat_error_cache:
            raise copy_os_error(self.stat_error_cache[path])
        try:
            st = os.stat(path)
        except OSError as err:
            if self.init_under_package_root(path):
                try:
                    return self._fake_init(path)
                except OSError:
                    # Could not fake it; fall through and report the
                    # original stat error.
                    pass
            # Take a copy to get rid of associated traceback and frame objects.
            # Just assigning to __traceback__ doesn't free them.
            self.stat_error_cache[path] = copy_os_error(err)
            raise err
        self.stat_cache[path] = st
        return st

    def init_under_package_root(self, path: str) -> bool:
        """Is this path an __init__.py under a package root?

        This is used to detect packages that don't contain __init__.py
        files, which is needed to support Bazel.  The function should
        only be called for non-existing files.

        It will return True if it refers to a __init__.py file that
        Bazel would create, so that at runtime Python would think the
        directory containing it is a package.  For this to work you
        must pass one or more package roots using the --package-root
        flag.

        As an exceptional case, any directory that is a package root
        itself will not be considered to contain a __init__.py file.
        This is different from the rules Bazel itself applies, but is
        necessary for mypy to properly distinguish packages from other
        directories.

        See https://docs.bazel.build/versions/master/be/python.html,
        where this behavior is described under legacy_create_init.
        """
        if not self.package_root:
            return False
        dirname, basename = os.path.split(path)
        if basename != '__init__.py':
            return False
        try:
            st = self.stat(dirname)
        except OSError:
            return False
        else:
            if not stat.S_ISDIR(st.st_mode):
                return False
        ok = False
        _, path = os.path.splitdrive(path)  # Ignore Windows drive name
        if os.path.isabs(path):
            path = os.path.relpath(path)
        path = os.path.normpath(path)
        for root in self.package_root:
            if path.startswith(root):
                if path == root + basename:
                    # A package root itself is never a package.
                    ok = False
                    break
                else:
                    ok = True
        return ok

    def _fake_init(self, path: str) -> os.stat_result:
        """Prime the cache with a fake __init__.py file.

        This makes code that looks for path believe an empty file by
        that name exists.  Should only be called after
        init_under_package_root() returns True.
        """
        dirname, basename = os.path.split(path)
        assert basename == '__init__.py', path
        assert not os.path.exists(path), path  # Not cached!
        dirname = os.path.normpath(dirname)
        st = self.stat(dirname)  # May raise OSError
        # Get stat result as a sequence so we can modify it.
        # (Alas, typeshed's os.stat_result is not a sequence yet.)
        tpl = tuple(st)  # type: ignore[arg-type, var-annotated]
        seq = list(tpl)  # type: List[float]
        # Start from the directory's stat result and turn it into that of
        # an empty, read-only regular file.
        seq[stat.ST_MODE] = stat.S_IFREG | 0o444
        seq[stat.ST_INO] = 1
        seq[stat.ST_NLINK] = 1
        seq[stat.ST_SIZE] = 0
        tpl = tuple(seq)
        st = os.stat_result(tpl)
        self.stat_cache[path] = st
        # Make listdir() and read() also pretend this file exists.
        self.fake_package_cache.add(dirname)
        return st

    def listdir(self, path: str) -> List[str]:
        """Return the (cached) os.listdir() result for the normalized path.

        '__init__.py' is added to the result if the directory received a
        fake __init__.py through _fake_init(). The returned list is the
        cached object itself, so callers must not modify it.

        Raises OSError if the directory cannot be listed. The error is
        cached, and a fresh copy of it is raised on later calls.
        """
        path = os.path.normpath(path)
        if path in self.listdir_cache:
            res = self.listdir_cache[path]
            # Check the fake cache.
            if path in self.fake_package_cache and '__init__.py' not in res:
                res.append('__init__.py')  # Updates the result as well as the cache
            return res
        if path in self.listdir_error_cache:
            raise copy_os_error(self.listdir_error_cache[path])
        try:
            results = os.listdir(path)
        except OSError as err:
            # Like above, take a copy to reduce memory use.
            self.listdir_error_cache[path] = copy_os_error(err)
            raise err
        self.listdir_cache[path] = results
        # Check the fake cache.
        if path in self.fake_package_cache and '__init__.py' not in results:
            results.append('__init__.py')
        return results

    def isfile(self, path: str) -> bool:
        """Return whether path can be stat'ed and is a regular file."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISREG(st.st_mode)

    def isfile_case(self, path: str, prefix: str) -> bool:
        """Return whether path exists and is a file.

        On case-insensitive filesystems (like Mac or Windows) this returns
        False if the case of path's last component does not exactly match
        the case found in the filesystem.

        We check also the case of other path components up to prefix.
        For example, if path is 'user-stubs/pack/mod.pyi' and prefix is 'user-stubs',
        we check that the case of 'pack' and 'mod.pyi' matches exactly, 'user-stubs' will be
        case insensitive on case insensitive filesystems.

        The caller must ensure that prefix is a valid file system prefix of path.
        """
        if sys.platform == "linux":
            # Assume that the file system on Linux is case sensitive
            return self.isfile(path)
        if not self.isfile(path):
            # Fast path
            return False
        if path in self.isfile_case_cache:
            return self.isfile_case_cache[path]
        head, tail = os.path.split(path)
        if not tail:
            self.isfile_case_cache[path] = False
            return False
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check the other path components in case sensitive way.
            res = self._exists_case(head, prefix)
        self.isfile_case_cache[path] = res
        return res

    def _exists_case(self, path: str, prefix: str) -> bool:
        """Helper to check path components in case sensitive fashion, up to prefix."""
        if path in self.exists_case_cache:
            return self.exists_case_cache[path]
        head, tail = os.path.split(path)
        if not head.startswith(prefix) or not tail:
            # Only perform the check for paths under prefix.
            self.exists_case_cache[path] = True
            return True
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check other path components.
            res = self._exists_case(head, prefix)
        self.exists_case_cache[path] = res
        return res

    def isdir(self, path: str) -> bool:
        """Return whether path can be stat'ed and is a directory."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISDIR(st.st_mode)

    def exists(self, path: str) -> bool:
        """Return whether path can be stat'ed.

        Any OSError from stat() (not only FileNotFoundError, but also e.g.
        NotADirectoryError or PermissionError) is reported as False, which
        is consistent with isfile() and isdir() above.
        """
        try:
            self.stat(path)
        except OSError:
            return False
        return True

    def read(self, path: str) -> bytes:
        """Return the (cached) contents of the file at path.

        For a fake __init__.py created by _fake_init() the contents are
        empty. As a side effect the digest of the data is stored for
        hash_digest().

        Raises OSError if the file cannot be stat'ed or read. A read error
        is cached, and a fresh copy of it is raised on later calls.
        """
        if path in self.read_cache:
            return self.read_cache[path]
        if path in self.read_error_cache:
            # Raise a copy so that the cached error never accumulates
            # traceback entries (same approach as in stat() and listdir()).
            raise copy_os_error(self.read_error_cache[path])

        # Need to stat first so that the contents of file are from no
        # earlier instant than the mtime reported by self.stat().
        self.stat(path)

        dirname, basename = os.path.split(path)
        dirname = os.path.normpath(dirname)
        # Check the fake cache.
        if basename == '__init__.py' and dirname in self.fake_package_cache:
            data = b''
        else:
            try:
                with open(path, 'rb') as f:
                    data = f.read()
            except OSError as err:
                # Take a copy to get rid of associated traceback and frame
                # objects instead of keeping them alive in the cache.
                self.read_error_cache[path] = copy_os_error(err)
                raise

        self.read_cache[path] = data
        self.hash_cache[path] = hash_digest(data)
        return data

    def hash_digest(self, path: str) -> str:
        """Return the digest of the file contents, as computed by read().

        Reads (and caches) the file first if that has not happened yet, so
        this may raise OSError just like read().
        """
        if path not in self.hash_cache:
            self.read(path)
        return self.hash_cache[path]

    def samefile(self, f1: str, f2: str) -> bool:
        """Return whether the (cached) stat results of f1 and f2 match.

        The comparison is done by os.path.samestat(). Raises OSError if
        either path cannot be stat'ed.
        """
        s1 = self.stat(f1)
        s2 = self.stat(f2)
        return os.path.samestat(s1, s2)


def copy_os_error(e: OSError) -> OSError:
    """Return a new OSError with the same args, errno, strerror and filename(s).

    The copy does not carry over the traceback of the original, which
    makes it suitable for caching and for raising repeatedly.
    """
    new = OSError(*e.args)
    new.errno = e.errno
    new.strerror = e.strerror
    new.filename = e.filename
    if e.filename2:
        new.filename2 = e.filename2
    return new