1"""Base class for filesystem wrappers.
2"""
3
4from __future__ import unicode_literals
5
6import typing
7
8import six
9
10from . import errors
11from .base import FS
12from .copy import copy_file, copy_dir
13from .info import Info
14from .move import move_file, move_dir
15from .path import abspath, normpath
16from .error_tools import unwrap_errors
17
18if typing.TYPE_CHECKING:
19    from datetime import datetime
20    from threading import RLock
21    from typing import (
22        Any,
23        AnyStr,
24        BinaryIO,
25        Callable,
26        Collection,
27        Iterator,
28        Iterable,
29        IO,
30        List,
31        Mapping,
32        Optional,
33        Text,
34        Tuple,
35        Union,
36    )
37    from .enums import ResourceType
38    from .info import RawInfo
39    from .permissions import Permissions
40    from .subfs import SubFS
41    from .walk import BoundWalker
42
    # Type variable for a filesystem type (bounded by ``FS``); only defined
    # while type checking.
    _T = typing.TypeVar("_T", bound="FS")
    # Signature of the ``factory`` argument of ``opendir``: a callable taking
    # a filesystem and a path, and returning a ``SubFS`` of that filesystem.
    _OpendirFactory = Callable[[_T, Text], SubFS[_T]]
45
46
# Type of the filesystem being wrapped (the ``wrap_fs`` constructor argument).
_F = typing.TypeVar("_F", bound="FS", covariant=True)
# Type of the wrapper itself; used to annotate ``self`` in ``WrapFS.opendir``.
_W = typing.TypeVar("_W", bound="WrapFS[FS]")
49
50
51@six.python_2_unicode_compatible
52class WrapFS(FS, typing.Generic[_F]):
53    """A proxy for a filesystem object.
54
    This class exposes a filesystem interface, where the data is
    stored on one or more other filesystems, and is the basis for
    `~fs.subfs.SubFS` and other *virtual* filesystems.
58
59    """
60
    # Optional label for this wrapper. When it is not None, ``__str__``
    # lists it in parentheses after the string form of the innermost
    # wrapped filesystem.
    wrap_name = None  # type: Optional[Text]
62
    def __init__(self, wrap_fs):
        # type: (_F) -> None
        """Create a proxy around ``wrap_fs``.

        Arguments:
            wrap_fs (FS): The filesystem to be wrapped.

        """
        # Store the wrapped filesystem first, then run the base-class
        # initialisation.
        self._wrap_fs = wrap_fs
        super(WrapFS, self).__init__()
67
68    def __repr__(self):
69        # type: () -> Text
70        return "{}({!r})".format(self.__class__.__name__, self._wrap_fs)
71
72    def __str__(self):
73        # type: () -> Text
74        wraps = []
75        _fs = self  # type: Union[FS, WrapFS[FS]]
76        while hasattr(_fs, "_wrap_fs"):
77            wrap_name = getattr(_fs, "wrap_name", None)
78            if wrap_name is not None:
79                wraps.append(wrap_name)
80            _fs = _fs._wrap_fs  # type: ignore
81        if wraps:
82            _str = "{}({})".format(_fs, ", ".join(wraps[::-1]))
83        else:
84            _str = "{}".format(_fs)
85        return _str
86
87    def delegate_path(self, path):
88        # type: (Text) -> Tuple[_F, Text]
89        """Encode a path for proxied filesystem.
90
91        Arguments:
92            path (str): A path on the filesystem.
93
94        Returns:
95            (FS, str): a tuple of ``(<filesystem>, <new_path>)``
96
97        """
98        return self._wrap_fs, path
99
100    def delegate_fs(self):
101        # type: () -> _F
102        """Get the proxied filesystem.
103
104        This method should return a filesystem for methods not
105        associated with a path, e.g. `~fs.base.FS.getmeta`.
106
107        """
108        return self._wrap_fs
109
110    def appendbytes(self, path, data):
111        # type: (Text, bytes) -> None
112        self.check()
113        _fs, _path = self.delegate_path(path)
114        with unwrap_errors(path):
115            return _fs.appendbytes(_path, data)
116
117    def appendtext(
118        self,
119        path,  # type: Text
120        text,  # type: Text
121        encoding="utf-8",  # type: Text
122        errors=None,  # type: Optional[Text]
123        newline="",  # type: Text
124    ):
125        # type: (...) -> None
126        self.check()
127        _fs, _path = self.delegate_path(path)
128        with unwrap_errors(path):
129            return _fs.appendtext(
130                _path, text, encoding=encoding, errors=errors, newline=newline
131            )
132
133    def getinfo(self, path, namespaces=None):
134        # type: (Text, Optional[Collection[Text]]) -> Info
135        self.check()
136        _fs, _path = self.delegate_path(path)
137        with unwrap_errors(path):
138            raw_info = _fs.getinfo(_path, namespaces=namespaces).raw
139        if abspath(normpath(path)) == "/":
140            raw_info = dict(raw_info)
141            raw_info["basic"]["name"] = ""  # type: ignore
142        return Info(raw_info)
143
144    def listdir(self, path):
145        # type: (Text) -> List[Text]
146        self.check()
147        _fs, _path = self.delegate_path(path)
148        with unwrap_errors(path):
149            dir_list = _fs.listdir(_path)
150        return dir_list
151
152    def lock(self):
153        # type: () -> RLock
154        self.check()
155        _fs = self.delegate_fs()
156        return _fs.lock()
157
158    def makedir(
159        self,
160        path,  # type: Text
161        permissions=None,  # type: Optional[Permissions]
162        recreate=False,  # type: bool
163    ):
164        # type: (...) -> SubFS[FS]
165        self.check()
166        _fs, _path = self.delegate_path(path)
167        with unwrap_errors(path):
168            return _fs.makedir(_path, permissions=permissions, recreate=recreate)
169
170    def move(self, src_path, dst_path, overwrite=False):
171        # type: (Text, Text, bool) -> None
172        # A custom move permits a potentially optimized code path
173        src_fs, _src_path = self.delegate_path(src_path)
174        dst_fs, _dst_path = self.delegate_path(dst_path)
175        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
176            if not overwrite and dst_fs.exists(_dst_path):
177                raise errors.DestinationExists(_dst_path)
178            move_file(src_fs, _src_path, dst_fs, _dst_path)
179
180    def movedir(self, src_path, dst_path, create=False):
181        # type: (Text, Text, bool) -> None
182        src_fs, _src_path = self.delegate_path(src_path)
183        dst_fs, _dst_path = self.delegate_path(dst_path)
184        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
185            if not create and not dst_fs.exists(_dst_path):
186                raise errors.ResourceNotFound(dst_path)
187            if not src_fs.getinfo(_src_path).is_dir:
188                raise errors.DirectoryExpected(src_path)
189            move_dir(src_fs, _src_path, dst_fs, _dst_path)
190
191    def openbin(self, path, mode="r", buffering=-1, **options):
192        # type: (Text, Text, int, **Any) -> BinaryIO
193        self.check()
194        _fs, _path = self.delegate_path(path)
195        with unwrap_errors(path):
196            bin_file = _fs.openbin(_path, mode=mode, buffering=-1, **options)
197        return bin_file
198
199    def remove(self, path):
200        # type: (Text) -> None
201        self.check()
202        _fs, _path = self.delegate_path(path)
203        with unwrap_errors(path):
204            _fs.remove(_path)
205
206    def removedir(self, path):
207        # type: (Text) -> None
208        self.check()
209        _path = abspath(normpath(path))
210        if _path == "/":
211            raise errors.RemoveRootError()
212        _fs, _path = self.delegate_path(path)
213        with unwrap_errors(path):
214            _fs.removedir(_path)
215
216    def removetree(self, dir_path):
217        # type: (Text) -> None
218        self.check()
219        _path = abspath(normpath(dir_path))
220        if _path == "/":
221            raise errors.RemoveRootError()
222        _fs, _path = self.delegate_path(dir_path)
223        with unwrap_errors(dir_path):
224            _fs.removetree(_path)
225
226    def scandir(
227        self,
228        path,  # type: Text
229        namespaces=None,  # type: Optional[Collection[Text]]
230        page=None,  # type: Optional[Tuple[int, int]]
231    ):
232        # type: (...) -> Iterator[Info]
233        self.check()
234        _fs, _path = self.delegate_path(path)
235        with unwrap_errors(path):
236            for info in _fs.scandir(_path, namespaces=namespaces, page=page):
237                yield info
238
239    def setinfo(self, path, info):
240        # type: (Text, RawInfo) -> None
241        self.check()
242        _fs, _path = self.delegate_path(path)
243        return _fs.setinfo(_path, info)
244
245    def settimes(self, path, accessed=None, modified=None):
246        # type: (Text, Optional[datetime], Optional[datetime]) -> None
247        self.check()
248        _fs, _path = self.delegate_path(path)
249        with unwrap_errors(path):
250            _fs.settimes(_path, accessed=accessed, modified=modified)
251
252    def touch(self, path):
253        # type: (Text) -> None
254        self.check()
255        _fs, _path = self.delegate_path(path)
256        with unwrap_errors(path):
257            _fs.touch(_path)
258
259    def copy(self, src_path, dst_path, overwrite=False):
260        # type: (Text, Text, bool) -> None
261        src_fs, _src_path = self.delegate_path(src_path)
262        dst_fs, _dst_path = self.delegate_path(dst_path)
263        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
264            if not overwrite and dst_fs.exists(_dst_path):
265                raise errors.DestinationExists(_dst_path)
266            copy_file(src_fs, _src_path, dst_fs, _dst_path)
267
268    def copydir(self, src_path, dst_path, create=False):
269        # type: (Text, Text, bool) -> None
270        src_fs, _src_path = self.delegate_path(src_path)
271        dst_fs, _dst_path = self.delegate_path(dst_path)
272        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
273            if not create and not dst_fs.exists(_dst_path):
274                raise errors.ResourceNotFound(dst_path)
275            if not src_fs.getinfo(_src_path).is_dir:
276                raise errors.DirectoryExpected(src_path)
277            copy_dir(src_fs, _src_path, dst_fs, _dst_path)
278
279    def create(self, path, wipe=False):
280        # type: (Text, bool) -> bool
281        self.check()
282        _fs, _path = self.delegate_path(path)
283        with unwrap_errors(path):
284            return _fs.create(_path, wipe=wipe)
285
286    def desc(self, path):
287        # type: (Text) -> Text
288        self.check()
289        _fs, _path = self.delegate_path(path)
290        with unwrap_errors(path):
291            desc = _fs.desc(_path)
292        return desc
293
294    def download(self, path, file, chunk_size=None, **options):
295        # type: (Text, BinaryIO, Optional[int], **Any) -> None
296        self.check()
297        _fs, _path = self.delegate_path(path)
298        with unwrap_errors(path):
299            _fs.download(_path, file, chunk_size=chunk_size, **options)
300
301    def exists(self, path):
302        # type: (Text) -> bool
303        self.check()
304        _fs, _path = self.delegate_path(path)
305        with unwrap_errors(path):
306            exists = _fs.exists(_path)
307        return exists
308
309    def filterdir(
310        self,
311        path,  # type: Text
312        files=None,  # type: Optional[Iterable[Text]]
313        dirs=None,  # type: Optional[Iterable[Text]]
314        exclude_dirs=None,  # type: Optional[Iterable[Text]]
315        exclude_files=None,  # type: Optional[Iterable[Text]]
316        namespaces=None,  # type: Optional[Collection[Text]]
317        page=None,  # type: Optional[Tuple[int, int]]
318    ):
319        # type: (...) -> Iterator[Info]
320        self.check()
321        _fs, _path = self.delegate_path(path)
322        iter_files = iter(
323            _fs.filterdir(
324                _path,
325                exclude_dirs=exclude_dirs,
326                exclude_files=exclude_files,
327                files=files,
328                dirs=dirs,
329                namespaces=namespaces,
330                page=page,
331            )
332        )
333        with unwrap_errors(path):
334            for info in iter_files:
335                yield info
336
337    def readbytes(self, path):
338        # type: (Text) -> bytes
339        self.check()
340        _fs, _path = self.delegate_path(path)
341        with unwrap_errors(path):
342            _bytes = _fs.readbytes(_path)
343        return _bytes
344
345    def readtext(
346        self,
347        path,  # type: Text
348        encoding=None,  # type: Optional[Text]
349        errors=None,  # type: Optional[Text]
350        newline="",  # type: Text
351    ):
352        # type: (...) -> Text
353        self.check()
354        _fs, _path = self.delegate_path(path)
355        with unwrap_errors(path):
356            _text = _fs.readtext(
357                _path, encoding=encoding, errors=errors, newline=newline
358            )
359        return _text
360
361    def getmeta(self, namespace="standard"):
362        # type: (Text) -> Mapping[Text, object]
363        self.check()
364        meta = self.delegate_fs().getmeta(namespace=namespace)
365        return meta
366
367    def getsize(self, path):
368        # type: (Text) -> int
369        self.check()
370        _fs, _path = self.delegate_path(path)
371        with unwrap_errors(path):
372            size = _fs.getsize(_path)
373        return size
374
375    def getsyspath(self, path):
376        # type: (Text) -> Text
377        self.check()
378        _fs, _path = self.delegate_path(path)
379        with unwrap_errors(path):
380            sys_path = _fs.getsyspath(_path)
381        return sys_path
382
383    def gettype(self, path):
384        # type: (Text) -> ResourceType
385        self.check()
386        _fs, _path = self.delegate_path(path)
387        with unwrap_errors(path):
388            _type = _fs.gettype(_path)
389        return _type
390
391    def geturl(self, path, purpose="download"):
392        # type: (Text, Text) -> Text
393        self.check()
394        _fs, _path = self.delegate_path(path)
395        with unwrap_errors(path):
396            return _fs.geturl(_path, purpose=purpose)
397
398    def hassyspath(self, path):
399        # type: (Text) -> bool
400        self.check()
401        _fs, _path = self.delegate_path(path)
402        with unwrap_errors(path):
403            has_sys_path = _fs.hassyspath(_path)
404        return has_sys_path
405
406    def hasurl(self, path, purpose="download"):
407        # type: (Text, Text) -> bool
408        self.check()
409        _fs, _path = self.delegate_path(path)
410        with unwrap_errors(path):
411            has_url = _fs.hasurl(_path, purpose=purpose)
412        return has_url
413
414    def isdir(self, path):
415        # type: (Text) -> bool
416        self.check()
417        _fs, _path = self.delegate_path(path)
418        with unwrap_errors(path):
419            _isdir = _fs.isdir(_path)
420        return _isdir
421
422    def isfile(self, path):
423        # type: (Text) -> bool
424        self.check()
425        _fs, _path = self.delegate_path(path)
426        with unwrap_errors(path):
427            _isfile = _fs.isfile(_path)
428        return _isfile
429
430    def islink(self, path):
431        # type: (Text) -> bool
432        self.check()
433        _fs, _path = self.delegate_path(path)
434        with unwrap_errors(path):
435            _islink = _fs.islink(_path)
436        return _islink
437
438    def makedirs(
439        self,
440        path,  # type: Text
441        permissions=None,  # type: Optional[Permissions]
442        recreate=False,  # type: bool
443    ):
444        # type: (...) -> SubFS[FS]
445        self.check()
446        _fs, _path = self.delegate_path(path)
447        return _fs.makedirs(_path, permissions=permissions, recreate=recreate)
448
449    # FIXME(@althonos): line_buffering is not a FS.open declared argument
450    def open(
451        self,
452        path,  # type: Text
453        mode="r",  # type: Text
454        buffering=-1,  # type: int
455        encoding=None,  # type: Optional[Text]
456        errors=None,  # type: Optional[Text]
457        newline="",  # type: Text
458        line_buffering=False,  # type: bool
459        **options  # type: Any
460    ):
461        # type: (...) -> IO[AnyStr]
462        self.check()
463        _fs, _path = self.delegate_path(path)
464        with unwrap_errors(path):
465            open_file = _fs.open(
466                _path,
467                mode=mode,
468                buffering=buffering,
469                encoding=encoding,
470                errors=errors,
471                newline=newline,
472                line_buffering=line_buffering,
473                **options
474            )
475        return open_file
476
477    def opendir(
478        self,  # type: _W
479        path,  # type: Text
480        factory=None,  # type: Optional[_OpendirFactory]
481    ):
482        # type: (...) -> SubFS[_W]
483        from .subfs import SubFS
484
485        factory = factory or SubFS
486        if not self.getinfo(path).is_dir:
487            raise errors.DirectoryExpected(path=path)
488        with unwrap_errors(path):
489            return factory(self, path)
490
491    def writebytes(self, path, contents):
492        # type: (Text, bytes) -> None
493        self.check()
494        _fs, _path = self.delegate_path(path)
495        with unwrap_errors(path):
496            _fs.writebytes(_path, contents)
497
498    def upload(self, path, file, chunk_size=None, **options):
499        # type: (Text, BinaryIO, Optional[int], **Any) -> None
500        self.check()
501        _fs, _path = self.delegate_path(path)
502        with unwrap_errors(path):
503            _fs.upload(_path, file, chunk_size=chunk_size, **options)
504
505    def writefile(
506        self,
507        path,  # type: Text
508        file,  # type: IO[AnyStr]
509        encoding=None,  # type: Optional[Text]
510        errors=None,  # type: Optional[Text]
511        newline="",  # type: Text
512    ):
513        # type: (...) -> None
514        self.check()
515        _fs, _path = self.delegate_path(path)
516        with unwrap_errors(path):
517            _fs.writefile(
518                _path, file, encoding=encoding, errors=errors, newline=newline
519            )
520
521    def validatepath(self, path):
522        # type: (Text) -> Text
523        self.check()
524        _fs, _path = self.delegate_path(path)
525        with unwrap_errors(path):
526            _fs.validatepath(_path)
527        path = abspath(normpath(path))
528        return path
529
530    def hash(self, path, name):
531        # type: (Text, Text) -> Text
532        self.check()
533        _fs, _path = self.delegate_path(path)
534        with unwrap_errors(path):
535            return _fs.hash(_path, name)
536
537    @property
538    def walk(self):
539        # type: () -> BoundWalker
540        return self._wrap_fs.walker_class.bind(self)
541