1"""Collection of useful `~fs.wrapfs.WrapFS` subclasses. 2 3Here's an example that opens a filesystem then makes it *read only*:: 4 5 >>> from fs import open_fs 6 >>> from fs.wrap import read_only 7 >>> projects_fs = open_fs('~/projects') 8 >>> read_only_projects_fs = read_only(projects_fs) 9 >>> read_only_projects_fs.remove('__init__.py') 10 Traceback (most recent call last): 11 ... 12 fs.errors.ResourceReadOnly: resource '__init__.py' is read only 13 14""" 15 16from __future__ import print_function 17from __future__ import unicode_literals 18 19import typing 20 21from .wrapfs import WrapFS 22from .path import abspath, normpath, split 23from .errors import ResourceReadOnly, ResourceNotFound 24from .info import Info 25from .mode import check_writable 26 27if typing.TYPE_CHECKING: 28 from datetime import datetime 29 from typing import ( 30 Any, 31 BinaryIO, 32 Collection, 33 Dict, 34 Iterator, 35 IO, 36 Optional, 37 Text, 38 Tuple, 39 ) 40 from .base import FS # noqa: F401 41 from .info import RawInfo 42 from .subfs import SubFS 43 from .permissions import Permissions 44 45 46_W = typing.TypeVar("_W", bound="WrapFS") 47_T = typing.TypeVar("_T", bound="FS") 48_F = typing.TypeVar("_F", bound="FS", covariant=True) 49 50 51def read_only(fs): 52 # type: (_T) -> WrapReadOnly[_T] 53 """Make a read-only filesystem. 54 55 Arguments: 56 fs (FS): A filesystem instance. 57 58 Returns: 59 FS: A read only version of ``fs`` 60 61 """ 62 return WrapReadOnly(fs) 63 64 65def cache_directory(fs): 66 # type: (_T) -> WrapCachedDir[_T] 67 """Make a filesystem that caches directory information. 68 69 Arguments: 70 fs (FS): A filesystem instance. 71 72 Returns: 73 FS: A filesystem that caches results of `~FS.scandir`, `~FS.isdir` 74 and other methods which read directory information. 75 76 """ 77 return WrapCachedDir(fs) 78 79 80class WrapCachedDir(WrapFS[_F], typing.Generic[_F]): 81 """Caches filesystem directory information. 82 83 This filesystem caches directory information retrieved from a 84 scandir call. This *may* speed up code that calls `~FS.isdir`, 85 `~FS.isfile`, or `~FS.gettype` too frequently. 86 87 Note: 88 Using this wrap will prevent changes to directory information 89 being visible to the filesystem object. Consequently it is best 90 used only in a fairly limited scope where you don't expected 91 anything on the filesystem to change. 92 93 """ 94 95 wrap_name = "cached-dir" 96 97 def __init__(self, wrap_fs): 98 # type: (_F) -> None 99 super(WrapCachedDir, self).__init__(wrap_fs) 100 self._cache = {} # type: Dict[Tuple[Text, frozenset], Dict[Text, Info]] 101 102 def scandir( 103 self, 104 path, # type: Text 105 namespaces=None, # type: Optional[Collection[Text]] 106 page=None, # type: Optional[Tuple[int, int]] 107 ): 108 # type: (...) -> Iterator[Info] 109 _path = abspath(normpath(path)) 110 cache_key = (_path, frozenset(namespaces or ())) 111 if cache_key not in self._cache: 112 _scan_result = self._wrap_fs.scandir(path, namespaces=namespaces, page=page) 113 _dir = {info.name: info for info in _scan_result} 114 self._cache[cache_key] = _dir 115 gen_scandir = iter(self._cache[cache_key].values()) 116 return gen_scandir 117 118 def getinfo(self, path, namespaces=None): 119 # type: (Text, Optional[Collection[Text]]) -> Info 120 _path = abspath(normpath(path)) 121 if _path == "/": 122 return Info({"basic": {"name": "", "is_dir": True}}) 123 dir_path, resource_name = split(_path) 124 cache_key = (dir_path, frozenset(namespaces or ())) 125 126 if cache_key not in self._cache: 127 self.scandir(dir_path, namespaces=namespaces) 128 129 _dir = self._cache[cache_key] 130 try: 131 info = _dir[resource_name] 132 except KeyError: 133 raise ResourceNotFound(path) 134 return info 135 136 def isdir(self, path): 137 # type: (Text) -> bool 138 # FIXME(@althonos): this raises an error on non-existing file ! 139 return self.getinfo(path).is_dir 140 141 def isfile(self, path): 142 # type: (Text) -> bool 143 # FIXME(@althonos): this raises an error on non-existing file ! 144 return not self.getinfo(path).is_dir 145 146 147class WrapReadOnly(WrapFS[_F], typing.Generic[_F]): 148 """Makes a Filesystem read-only. 149 150 Any call that would would write data or modify the filesystem in any way 151 will raise a `~fs.errors.ResourceReadOnly` exception. 152 153 """ 154 155 wrap_name = "read-only" 156 157 def appendbytes(self, path, data): 158 # type: (Text, bytes) -> None 159 self.check() 160 raise ResourceReadOnly(path) 161 162 def appendtext( 163 self, 164 path, # type: Text 165 text, # type: Text 166 encoding="utf-8", # type: Text 167 errors=None, # type: Optional[Text] 168 newline="", # type: Text 169 ): 170 # type: (...) -> None 171 self.check() 172 raise ResourceReadOnly(path) 173 174 def makedir( 175 self, # type: _W 176 path, # type: Text 177 permissions=None, # type: Optional[Permissions] 178 recreate=False, # type: bool 179 ): 180 # type: (...) -> SubFS[_W] 181 self.check() 182 raise ResourceReadOnly(path) 183 184 def move(self, src_path, dst_path, overwrite=False): 185 # type: (Text, Text, bool) -> None 186 self.check() 187 raise ResourceReadOnly(dst_path) 188 189 def openbin(self, path, mode="r", buffering=-1, **options): 190 # type: (Text, Text, int, **Any) -> BinaryIO 191 self.check() 192 if check_writable(mode): 193 raise ResourceReadOnly(path) 194 return self._wrap_fs.openbin(path, mode=mode, buffering=-1, **options) 195 196 def remove(self, path): 197 # type: (Text) -> None 198 self.check() 199 raise ResourceReadOnly(path) 200 201 def removedir(self, path): 202 # type: (Text) -> None 203 self.check() 204 raise ResourceReadOnly(path) 205 206 def setinfo(self, path, info): 207 # type: (Text, RawInfo) -> None 208 self.check() 209 raise ResourceReadOnly(path) 210 211 def writetext( 212 self, 213 path, # type: Text 214 contents, # type: Text 215 encoding="utf-8", # type: Text 216 errors=None, # type: Optional[Text] 217 newline="", # type: Text 218 ): 219 # type: (...) -> None 220 self.check() 221 raise ResourceReadOnly(path) 222 223 def settimes(self, path, accessed=None, modified=None): 224 # type: (Text, Optional[datetime], Optional[datetime]) -> None 225 self.check() 226 raise ResourceReadOnly(path) 227 228 def copy(self, src_path, dst_path, overwrite=False): 229 # type: (Text, Text, bool) -> None 230 self.check() 231 raise ResourceReadOnly(dst_path) 232 233 def create(self, path, wipe=False): 234 # type: (Text, bool) -> bool 235 self.check() 236 raise ResourceReadOnly(path) 237 238 def makedirs( 239 self, # type: _W 240 path, # type: Text 241 permissions=None, # type: Optional[Permissions] 242 recreate=False, # type: bool 243 ): 244 # type: (...) -> SubFS[_W] 245 self.check() 246 raise ResourceReadOnly(path) 247 248 def open( 249 self, 250 path, # type: Text 251 mode="r", # type: Text 252 buffering=-1, # type: int 253 encoding=None, # type: Optional[Text] 254 errors=None, # type: Optional[Text] 255 newline="", # type: Text 256 line_buffering=False, # type: bool 257 **options # type: Any 258 ): 259 # type: (...) -> IO 260 self.check() 261 if check_writable(mode): 262 raise ResourceReadOnly(path) 263 return self._wrap_fs.open( 264 path, 265 mode=mode, 266 buffering=buffering, 267 encoding=encoding, 268 errors=errors, 269 newline=newline, 270 line_buffering=line_buffering, 271 **options 272 ) 273 274 def writebytes(self, path, contents): 275 # type: (Text, bytes) -> None 276 self.check() 277 raise ResourceReadOnly(path) 278 279 def upload(self, path, file, chunk_size=None, **options): 280 # type: (Text, BinaryIO, Optional[int], **Any) -> None 281 self.check() 282 raise ResourceReadOnly(path) 283 284 def writefile( 285 self, 286 path, # type: Text 287 file, # type: IO 288 encoding=None, # type: Optional[Text] 289 errors=None, # type: Optional[Text] 290 newline="", # type: Text 291 ): 292 # type: (...) -> None 293 self.check() 294 raise ResourceReadOnly(path) 295 296 def touch(self, path): 297 # type: (Text) -> None 298 self.check() 299 raise ResourceReadOnly(path) 300