1import io
2import logging
3import os
4import threading
5import warnings
6import weakref
7from distutils.version import LooseVersion
8from errno import ESPIPE
9from glob import has_magic
10from hashlib import sha256
11
12from .callbacks import _DEFAULT_CALLBACK
13from .config import apply_config, conf
14from .dircache import DirCache
15from .transaction import Transaction
16from .utils import (
17    _unstrip_protocol,
18    get_package_version_without_import,
19    other_paths,
20    read_block,
21    stringify_path,
22    tokenize,
23)
24
25logger = logging.getLogger("fsspec")
26
27
def make_instance(cls, args, kwargs):
    """Instantiate ``cls`` from stored args; module-level so it pickles.

    Used as the factory returned by ``AbstractFileSystem.__reduce__``.
    """
    instance = cls(*args, **kwargs)
    return instance
30
31
class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Note: we intentionally create a reference here, to avoid garbage
        # collecting instances when all other references are gone. To really
        # delete a FileSystem, the cache must be cleared.
        if conf.get("weakref_instance_cache"):  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        # remember the creating process, so a fork can be detected in __call__
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        # merge in any defaults registered for this class via fsspec.config
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
        )
        # NOTE: the token is computed *before* skip_instance_cache is popped
        # below, so that flag participates in the hash.
        token = tokenize(
            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
        )
        skip = kwargs.pop("skip_instance_cache", False)
        if os.getpid() != cls._pid:
            # we are in a forked child: parent's cached instances are discarded
            cls._cache.clear()
            cls._pid = os.getpid()
        if not skip and cls.cachable and token in cls._cache:
            return cls._cache[token]
        else:
            obj = super().__call__(*args, **kwargs)
            # Setting _fs_token here causes some static linters to complain.
            obj._fs_token_ = token
            obj.storage_args = args
            obj.storage_options = kwargs
            if obj.async_impl:
                from .asyn import mirror_sync_methods

                mirror_sync_methods(obj)

            if cls.cachable and not skip:
                cls._cache[token] = obj
            return obj
89
90
# Optional base class: when an old pyarrow (< 2.0) is importable, subclass its
# DaskFileSystem for interoperability; otherwise the base is plain ``object``.
# NOTE(review): ``distutils.version.LooseVersion`` is deprecated and removed
# in Python 3.12 — consider a stdlib-based version compare here. TODO confirm.
pa_version = get_package_version_without_import("pyarrow")
if pa_version and LooseVersion(pa_version) < LooseVersion("2.0"):
    try:
        import pyarrow as pa

        up = pa.filesystem.DaskFileSystem
    except ImportError:  # pragma: no cover
        # pyarrow exists but doesn't import for some reason
        up = object
else:  # pragma: no cover
    up = object
102
103
class AbstractFileSystem(up, metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    _cached = False  # set True by __init__; guards re-initialising cached instances
    blocksize = 2 ** 22  # default chunk size for streaming copies (4 MiB)
    sep = "/"  # path separator
    protocol = "abstract"  # protocol name(s) stripped from URLs; str or tuple of str
    async_impl = False  # True for async subclasses (sync methods get mirrored)
    root_marker = ""  # For some FSs, may require leading '/' or other character

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()
122
123    def __init__(self, *args, **storage_options):
124        """Create and configure file-system instance
125
126        Instances may be cachable, so if similar enough arguments are seen
127        a new instance is not required. The token attribute exists to allow
128        implementations to cache instances if they wish.
129
130        A reasonable default should be provided if there are no arguments.
131
132        Subclasses should call this method.
133
134        Parameters
135        ----------
136        use_listings_cache, listings_expiry_time, max_paths:
137            passed to ``DirCache``, if the implementation supports
138            directory listing caching. Pass use_listings_cache=False
139            to disable such caching.
140        skip_instance_cache: bool
141            If this is a cachable implementation, pass True here to force
142            creating a new instance even if a matching instance exists, and prevent
143            storing this instance.
144        asynchronous: bool
145        loop: asyncio-compatible IOLoop or None
146        """
147        if self._cached:
148            # reusing instance, don't change
149            return
150        self._cached = True
151        self._intrans = False
152        self._transaction = None
153        self._invalidated_caches_in_transaction = []
154        self.dircache = DirCache(**storage_options)
155
156        if storage_options.pop("add_docs", None):
157            warnings.warn("add_docs is no longer supported.", FutureWarning)
158
159        if storage_options.pop("add_aliases", None):
160            warnings.warn("add_aliases has been removed.", FutureWarning)
161        # This is set in _Cached
162        self._fs_token_ = None
163
    @property
    def _fs_token(self):
        """Instance token; assigned by the ``_Cached`` metaclass on creation."""
        return self._fs_token_
167
    def __dask_tokenize__(self):
        """Deterministic token for dask's tokenize protocol."""
        return self._fs_token
170
171    def __hash__(self):
172        return int(self._fs_token, 16)
173
174    def __eq__(self, other):
175        return isinstance(other, type(self)) and self._fs_token == other._fs_token
176
    def __reduce__(self):
        """Pickle via a (factory, args) pair; see ``make_instance`` above."""
        return make_instance, (type(self), self.storage_args, self.storage_options)
179
180    @classmethod
181    def _strip_protocol(cls, path):
182        """Turn path from fully-qualified to file-system-specific
183
184        May require FS-specific handling, e.g., for relative paths or links.
185        """
186        if isinstance(path, list):
187            return [cls._strip_protocol(p) for p in path]
188        path = stringify_path(path)
189        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
190        for protocol in protos:
191            if path.startswith(protocol + "://"):
192                path = path[len(protocol) + 3 :]
193            elif path.startswith(protocol + "::"):
194                path = path[len(protocol) + 2 :]
195        path = path.rstrip("/")
196        # use of root_marker to make minimum required path, e.g., "/"
197        return path or cls.root_marker
198
    @staticmethod
    def _get_kwargs_from_urls(path):
        """If kwargs can be encoded in the paths, extract them here

        This should happen before instantiation of the class; incoming paths
        then should be amended to strip the options in methods.

        Examples may look like an sftp path "sftp://user@host:/my/path", where
        the user and host should become kwargs and later get stripped.
        """
        # by default, no options are encoded in paths; subclasses may override
        return {}
211
212    @classmethod
213    def current(cls):
214        """Return the most recently created FileSystem
215
216        If no instance has been created, then create one with defaults
217        """
218        if not len(cls._cache):
219            return cls()
220        else:
221            return list(cls._cache.values())[-1]
222
223    @property
224    def transaction(self):
225        """A context within which files are committed together upon exit
226
227        Requires the file class to implement `.commit()` and `.discard()`
228        for the normal and exception cases.
229        """
230        if self._transaction is None:
231            self._transaction = Transaction(self)
232        return self._transaction
233
234    def start_transaction(self):
235        """Begin write transaction for deferring files, non-context version"""
236        self._intrans = True
237        self._transaction = Transaction(self)
238        return self.transaction
239
    def end_transaction(self):
        """Finish write transaction, non-context version"""
        self.transaction.complete()
        self._transaction = None
        # Caches invalidated *during* the transaction are only expired now,
        # after the transaction has completed.
        for path in self._invalidated_caches_in_transaction:
            self.invalidate_cache(path)
        self._invalidated_caches_in_transaction.clear()
248
    def invalidate_cache(self, path=None):
        """
        Discard any cached directory information

        Parameters
        ----------
        path: string or None
            If None, clear all listings cached else listings at or under given
            path.
        """
        # Not necessary to implement an invalidation mechanism; there may be
        # no cache. But if there is one, subclasses should call this parent
        # method from their override so that caches are expired correctly
        # after transactions. See the implementation of FTPFileSystem in ftp.py.
        if self._intrans:
            # inside a transaction: defer the invalidation to end_transaction
            self._invalidated_caches_in_transaction.append(path)
265
    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create directory entry at path

        For systems that don't have true directories, may create one for
        this instance only and not touch the real filesystem

        Parameters
        ----------
        path: str
            location
        create_parents: bool
            if True, this is equivalent to ``makedirs``
        kwargs:
            may be permissions, etc.
        """
        pass  # not necessary to implement, may not have directories
283
    def makedirs(self, path, exist_ok=False):
        """Recursively make directories

        Creates directory at path and any intervening required directories.
        Raises exception if, for instance, the path already exists but is a
        file.

        Parameters
        ----------
        path: str
            leaf directory name
        exist_ok: bool (False)
            If False, will error if the target already exists
        """
        # default: no-op; implementations with real directories must override
        # and honour ``exist_ok``
        pass  # not necessary to implement, may not have directories
299
    def rmdir(self, path):
        """Remove a directory, if empty

        Parameters
        ----------
        path: str
            location of the directory to remove
        """
        pass  # not necessary to implement, may not have directories
303
    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        This should include subdirectories and files at that location. The
        difference between a file and a directory must be clear when details
        are requested.

        The specific keys, or perhaps a FileInfo class, or similar, is TBD,
        but must be consistent across implementations.
        Must include:

        - full path to the entry (without protocol)
        - size of the entry, in bytes. If the value cannot be determined, will
          be ``None``.
        - type of entry, "file", "directory" or other

        Additional information
        may be present, appropriate to the file-system, e.g., generation,
        checksum, etc.

        May use refresh=True|False to allow use of self._ls_from_cache to
        check for a saved listing and avoid calling the backend. This would be
        common where listing may be expensive.

        Parameters
        ----------
        path: str
        detail: bool
            if True, gives a list of dictionaries, where each is the same as
            the result of ``info(path)``. If False, gives a list of paths
            (str).
        kwargs: may have additional backend-specific options, such as version
            information

        Returns
        -------
        List of strings if detail is False, or list of directory information
        dicts if detail is True.
        """
        raise NotImplementedError
344
345    def _ls_from_cache(self, path):
346        """Check cache for listing
347
348        Returns listing, if found (may me empty list for a directly that exists
349        but contains nothing), None if not in cache.
350        """
351        parent = self._parent(path)
352        if path.rstrip("/") in self.dircache:
353            return self.dircache[path.rstrip("/")]
354        try:
355            files = [
356                f
357                for f in self.dircache[parent]
358                if f["name"] == path
359                or (f["name"] == path.rstrip("/") and f["type"] == "directory")
360            ]
361            if len(files) == 0:
362                # parent dir was listed but did not contain this file
363                raise FileNotFoundError(path)
364            return files
365        except KeyError:
366            pass
367
    def walk(self, path, maxdepth=None, **kwargs):
        """Return all files below path

        List all files, recursing into subdirectories; output is iterator-style,
        like ``os.walk()``. For a simple list of files, ``find()`` is available.

        Note that the "files" outputted will include anything that is not
        a directory, such as links.

        Parameters
        ----------
        path: str
            Root to recurse into
        maxdepth: int
            Maximum recursion depth. None means limitless, but not recommended
            on link-based file-systems.
        kwargs: passed to ``ls``
        """
        path = self._strip_protocol(path)
        full_dirs = {}  # absolute dir path -> info; drives recursion below
        dirs = {}  # basename -> info for subdirectories
        files = {}  # basename -> info for non-directory entries

        detail = kwargs.pop("detail", False)
        try:
            listing = self.ls(path, detail=True, **kwargs)
        except (FileNotFoundError, IOError):
            # NOTE: a ``return <value>`` inside a generator only sets
            # ``StopIteration.value``; callers iterating this generator
            # receive no items at all for missing/unlistable paths.
            if detail:
                return path, {}, {}
            return path, [], []

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[pathname] = info
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                # reached the requested depth: do not recurse further
                return

        for d in full_dirs:
            yield from self.walk(d, maxdepth=maxdepth, detail=detail, **kwargs)
426
427    def find(self, path, maxdepth=None, withdirs=False, **kwargs):
428        """List all files below path.
429
430        Like posix ``find`` command without conditions
431
432        Parameters
433        ----------
434        path : str
435        maxdepth: int or None
436            If not None, the maximum number of levels to descend
437        withdirs: bool
438            Whether to include directory paths in the output. This is True
439            when used by glob, but users usually only want files.
440        kwargs are passed to ``ls``.
441        """
442        # TODO: allow equivalent of -name parameter
443        path = self._strip_protocol(path)
444        out = dict()
445        detail = kwargs.pop("detail", False)
446        for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
447            if withdirs:
448                files.update(dirs)
449            out.update({info["name"]: info for name, info in files.items()})
450        if not out and self.isfile(path):
451            # walk works on directories, but find should also return [path]
452            # when path happens to be a file
453            out[path] = {}
454        names = sorted(out)
455        if not detail:
456            return names
457        else:
458            return {name: out[name] for name in names}
459
460    def du(self, path, total=True, maxdepth=None, **kwargs):
461        """Space used by files within a path
462
463        Parameters
464        ----------
465        path: str
466        total: bool
467            whether to sum all the file sizes
468        maxdepth: int or None
469            maximum number of directory levels to descend, None for unlimited.
470        kwargs: passed to ``ls``
471
472        Returns
473        -------
474        Dict of {fn: size} if total=False, or int otherwise, where numbers
475        refer to bytes used.
476        """
477        sizes = {}
478        for f in self.find(path, maxdepth=maxdepth, **kwargs):
479            info = self.info(f)
480            sizes[info["name"]] = info["size"]
481        if total:
482            return sum(sizes.values())
483        else:
484            return sizes
485
    def glob(self, path, **kwargs):
        """
        Find files by glob-matching.

        If the path ends with '/' and does not contain "*", it is essentially
        the same as ``ls(path)``, returning only files.

        We support ``"**"``,
        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.

        Search path names that contain embedded characters special to this
        implementation of glob may not produce expected results;
        e.g., 'foo/bar/*starredfilename*'.

        kwargs are passed to ``ls``.
        """
        import re

        # remember a trailing "/" before _strip_protocol removes it
        ends = path.endswith("/")
        path = self._strip_protocol(path)
        # index of the first occurrence of each magic char (len(path) if absent)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        # position of the first magic character of any kind
        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            # no pattern characters: behaves like exists()/ls()
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif self.exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: self.info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            # list from the deepest concrete (non-magic) directory prefix
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            # pattern starts in the first path segment: search from the root
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        # "**" matches across "/" (-> ".*"); a lone "*" stays within one level
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)
573
574    def exists(self, path, **kwargs):
575        """Is there a file at the given path"""
576        try:
577            self.info(path, **kwargs)
578            return True
579        except:  # noqa: E722
580            # any exception allowed bar FileNotFoundError?
581            return False
582
583    def lexists(self, path, **kwargs):
584        """If there is a file at the given path (including
585        broken links)"""
586        return self.exists(path)
587
    def info(self, path, **kwargs):
        """Give details of entry at path

        Returns a single dictionary, with exactly the same information as ``ls``
        would with ``detail=True``.

        The default implementation calls ls and could be overridden by a
        shortcut. kwargs are passed on to ``ls()``.

        Some file systems might not be able to measure the file's size, in
        which case, the returned dict will include ``'size': None``.

        Returns
        -------
        dict with keys: name (full path in the FS), size (in bytes), type (file,
        directory, or something else) and other FS-specific keys.
        """
        path = self._strip_protocol(path)
        # first, look for the entry in a listing of its parent directory
        out = self.ls(self._parent(path), detail=True, **kwargs)
        out = [o for o in out if o["name"].rstrip("/") == path]
        if out:
            return out[0]
        # not in parent listing: list the path itself (it may be a directory)
        out = self.ls(path, detail=True, **kwargs)
        path = path.rstrip("/")
        out1 = [o for o in out if o["name"].rstrip("/") == path]
        if len(out1) == 1:
            # listing contained exactly the path itself
            if "size" not in out1[0]:
                out1[0]["size"] = None
            return out1[0]
        elif len(out1) > 1 or out:
            # listing succeeded with other entries: treat path as a directory
            return {"name": path, "size": 0, "type": "directory"}
        else:
            raise FileNotFoundError(path)
621
622    def checksum(self, path):
623        """Unique value for current version of file
624
625        If the checksum is the same from one moment to another, the contents
626        are guaranteed to be the same. If the checksum changes, the contents
627        *might* have changed.
628
629        This should normally be overridden; default will probably capture
630        creation/modification timestamp (which would be good) or maybe
631        access timestamp (which would be bad)
632        """
633        return int(tokenize(self.info(path)), 16)
634
635    def size(self, path):
636        """Size in bytes of file"""
637        return self.info(path).get("size", None)
638
639    def sizes(self, paths):
640        """Size in bytes of each file in a list of paths"""
641        return [self.size(p) for p in paths]
642
643    def isdir(self, path):
644        """Is this entry directory-like?"""
645        try:
646            return self.info(path)["type"] == "directory"
647        except IOError:
648            return False
649
    def isfile(self, path):
        """Is this entry file-like?"""
        try:
            return self.info(path)["type"] == "file"
        except:  # noqa: E722
            # the bare except also swallows a missing "type" key (KeyError),
            # not just backend lookup failures
            return False
656
657    def cat_file(self, path, start=None, end=None, **kwargs):
658        """Get the content of a file
659
660        Parameters
661        ----------
662        path: URL of file on this filesystems
663        start, end: int
664            Bytes limits of the read. If negative, backwards from end,
665            like usual python slices. Either can be None for start or
666            end of file, respectively
667        kwargs: passed to ``open()``.
668        """
669        # explicitly set buffering off?
670        with self.open(path, "rb", **kwargs) as f:
671            if start is not None:
672                if start >= 0:
673                    f.seek(start)
674                else:
675                    f.seek(max(0, f.size + start))
676            if end is not None:
677                if end < 0:
678                    end = f.size + end
679                return f.read(end - f.tell())
680            return f.read()
681
682    def pipe_file(self, path, value, **kwargs):
683        """Set the bytes of given file"""
684        with self.open(path, "wb") as f:
685            f.write(value)
686
687    def pipe(self, path, value=None, **kwargs):
688        """Put value into path
689
690        (counterpart to ``cat``)
691
692        Parameters
693        ----------
694        path: string or dict(str, bytes)
695            If a string, a single remote location to put ``value`` bytes; if a dict,
696            a mapping of {path: bytesvalue}.
697        value: bytes, optional
698            If using a single path, these are the bytes to put there. Ignored if
699            ``path`` is a dict
700        """
701        if isinstance(path, str):
702            self.pipe_file(self._strip_protocol(path), value, **kwargs)
703        elif isinstance(path, dict):
704            for k, v in path.items():
705                self.pipe_file(self._strip_protocol(k), v, **kwargs)
706        else:
707            raise ValueError("path must be str or dict")
708
709    def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
710        if max_gap is not None:
711            raise NotImplementedError
712        if not isinstance(paths, list):
713            raise TypeError
714        if not isinstance(starts, list):
715            starts = [starts] * len(paths)
716        if not isinstance(ends, list):
717            ends = [starts] * len(paths)
718        if len(starts) != len(paths) or len(ends) != len(paths):
719            raise ValueError
720        return [self.cat_file(p, s, e) for p, s, e in zip(paths, starts, ends)]
721
    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """Fetch (potentially multiple) paths' contents

        Parameters
        ----------
        recursive: bool
            If True, assume the path(s) are directories, and get all the
            contained files
        on_error : "raise", "omit", "return"
            If raise, an underlying exception will be raised (converted to KeyError
            if the type is in self.missing_exceptions); if omit, keys with exception
            will simply not be included in the output; if "return", all keys are
            included in the output, but the value will be bytes or an exception
            instance.
        kwargs: passed to cat_file

        Returns
        -------
        dict of {path: contents} if there are multiple paths
        or the path has been otherwise expanded
        """
        paths = self.expand_path(path, recursive=recursive)
        # return a dict when the request expanded to several paths (list input,
        # glob, or directory), otherwise bare bytes for the single file
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            out = {}
            for path in paths:
                try:
                    out[path] = self.cat_file(path, **kwargs)
                except Exception as e:
                    if on_error == "raise":
                        raise
                    if on_error == "return":
                        # store the exception instance in place of the data
                        out[path] = e
                    # on_error == "omit": the key is simply skipped
            return out
        else:
            return self.cat_file(paths[0], **kwargs)
761
762    def get_file(self, rpath, lpath, callback=_DEFAULT_CALLBACK, **kwargs):
763        """Copy single remote file to local"""
764        if self.isdir(rpath):
765            os.makedirs(lpath, exist_ok=True)
766            return None
767
768        with self.open(rpath, "rb", **kwargs) as f1:
769            callback.set_size(getattr(f1, "size", None))
770            with open(lpath, "wb") as f2:
771                data = True
772                while data:
773                    data = f1.read(self.blocksize)
774                    segment_len = f2.write(data)
775                    callback.relative_update(segment_len)
776
777    def get(self, rpath, lpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs):
778        """Copy file(s) to local.
779
780        Copies a specific file or tree of files (if recursive=True). If lpath
781        ends with a "/", it will be assumed to be a directory, and target files
782        will go within. Can submit a list of paths, which may be glob-patterns
783        and will be expanded.
784
785        Calls get_file for each source.
786        """
787        from .implementations.local import make_path_posix
788
789        if isinstance(lpath, str):
790            lpath = make_path_posix(lpath)
791        rpaths = self.expand_path(rpath, recursive=recursive)
792        lpaths = other_paths(rpaths, lpath)
793
794        callback.set_size(len(lpaths))
795        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
796            callback.branch(rpath, lpath, kwargs)
797            self.get_file(rpath, lpath, **kwargs)
798
799    def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs):
800        """Copy single file to remote"""
801        if os.path.isdir(lpath):
802            self.makedirs(rpath, exist_ok=True)
803            return None
804
805        with open(lpath, "rb") as f1:
806            callback.set_size(f1.seek(0, 2))
807            f1.seek(0)
808
809            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
810            with self.open(rpath, "wb", **kwargs) as f2:
811                data = True
812                while data:
813                    data = f1.read(self.blocksize)
814                    segment_len = f2.write(data)
815                    callback.relative_update(segment_len)
816
817    def put(self, lpath, rpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs):
818        """Copy file(s) from local.
819
820        Copies a specific file or tree of files (if recursive=True). If rpath
821        ends with a "/", it will be assumed to be a directory, and target files
822        will go within.
823
824        Calls put_file for each source.
825        """
826        from .implementations.local import LocalFileSystem, make_path_posix
827
828        rpath = (
829            self._strip_protocol(rpath)
830            if isinstance(rpath, str)
831            else [self._strip_protocol(p) for p in rpath]
832        )
833        if isinstance(lpath, str):
834            lpath = make_path_posix(lpath)
835        fs = LocalFileSystem()
836        lpaths = fs.expand_path(lpath, recursive=recursive)
837        rpaths = other_paths(
838            lpaths, rpath, exists=isinstance(rpath, str) and self.isdir(rpath)
839        )
840
841        callback.set_size(len(rpaths))
842        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
843            callback.branch(lpath, rpath, kwargs)
844            self.put_file(lpath, rpath, **kwargs)
845
846    def head(self, path, size=1024):
847        """Get the first ``size`` bytes from file"""
848        with self.open(path, "rb") as f:
849            return f.read(size)
850
851    def tail(self, path, size=1024):
852        """Get the last ``size`` bytes from file"""
853        with self.open(path, "rb") as f:
854            f.seek(max(-size, -f.size), 2)
855            return f.read()
856
    def cp_file(self, path1, path2, **kwargs):
        """Copy the file at path1 to path2 (single files; must be overridden)"""
        raise NotImplementedError
859
860    def copy(self, path1, path2, recursive=False, on_error=None, **kwargs):
861        """Copy within two locations in the filesystem
862
863        on_error : "raise", "ignore"
864            If raise, any not-found exceptions will be raised; if ignore any
865            not-found exceptions will cause the path to be skipped; defaults to
866            raise unless recursive is true, where the default is ignore
867        """
868        if on_error is None and recursive:
869            on_error = "ignore"
870        elif on_error is None:
871            on_error = "raise"
872
873        paths = self.expand_path(path1, recursive=recursive)
874        path2 = other_paths(paths, path2)
875        for p1, p2 in zip(paths, path2):
876            try:
877                self.cp_file(p1, p2, **kwargs)
878            except FileNotFoundError:
879                if on_error == "raise":
880                    raise
881
    def expand_path(self, path, recursive=False, maxdepth=None):
        """Turn one or more globs or directories into a list of all matching paths
        to files or directories.

        Parameters
        ----------
        path: str or list of str
            Path(s), possibly containing glob characters
        recursive: bool
            Whether to descend into matched directories
        maxdepth: int or None
            Recursion depth limit; decremented by one for each nested call

        Returns
        -------
        Sorted list of unique matching paths

        Raises
        ------
        FileNotFoundError
            If nothing matched at all
        """
        if isinstance(path, str):
            # normalise to the list-input case below
            out = self.expand_path([path], recursive, maxdepth)
        else:
            # reduce depth on each recursion level unless None or 0
            maxdepth = maxdepth if not maxdepth else maxdepth - 1
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:
                if has_magic(p):
                    # glob pattern: expand it, then (optionally) recurse into
                    # each match
                    bit = set(self.glob(p))
                    out |= bit
                    if recursive:
                        out |= set(
                            self.expand_path(
                                list(bit), recursive=recursive, maxdepth=maxdepth
                            )
                        )
                    continue
                elif recursive:
                    rec = set(self.find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or self.exists(p)):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return list(sorted(out))
912
913    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
914        """Move file(s) from one location to another"""
915        self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
916        self.rm(path1, recursive=recursive)
917
    def rm_file(self, path):
        """Delete a file"""
        # delegate to the legacy-named hook so older subclasses that only
        # implement _rm keep working
        self._rm(path)
921
    def _rm(self, path):
        """Delete one file (legacy hook; subclasses should override rm_file)"""
        # this is the old name for the method, prefer rm_file
        raise NotImplementedError
926
927    def rm(self, path, recursive=False, maxdepth=None):
928        """Delete files.
929
930        Parameters
931        ----------
932        path: str or list of str
933            File(s) to delete.
934        recursive: bool
935            If file(s) are directories, recursively delete contents and then
936            also remove the directory
937        maxdepth: int or None
938            Depth to pass to walk for finding files to delete, if recursive.
939            If None, there will be no limit and infinite recursion may be
940            possible.
941        """
942        path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
943        for p in reversed(path):
944            self.rm_file(p)
945
946    @classmethod
947    def _parent(cls, path):
948        path = cls._strip_protocol(path.rstrip("/"))
949        if "/" in path:
950            parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
951            return cls.root_marker + parent
952        else:
953            return cls.root_marker
954
    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return raw bytes-mode file-like from the file-system

        The default constructs an ``AbstractBufferedFile``; backends normally
        override this with their own file class (the buffered file's
        ``_fetch_range`` raises ``NotImplementedError`` unless overridden).
        """
        return AbstractBufferedFile(
            self,
            path,
            mode,
            block_size,
            autocommit,
            cache_options=cache_options,
            **kwargs,
        )
974
    def open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        compression=None,
        **kwargs,
    ):
        """
        Return a file-like object from the filesystem

        The resultant instance must function correctly in a context ``with``
        block.

        Parameters
        ----------
        path: str
            Target file
        mode: str like 'rb', 'w'
            See builtin ``open()``
        block_size: int
            Some indication of buffering - this is a value in bytes
        cache_options : dict, optional
            Extra arguments to pass through to the cache.
        compression: string or None
            If given, open file using compression codec. Can either be a compression
            name (a key in ``fsspec.compression.compr``) or "infer" to guess the
            compression from the filename suffix.
        encoding, errors, newline: passed on to TextIOWrapper for text mode
        """
        import io

        path = self._strip_protocol(path)
        if "b" not in mode:
            # text mode: open the underlying file in binary and wrap it in a
            # TextIOWrapper to handle decoding
            mode = mode.replace("t", "") + "b"

            text_kwargs = {
                k: kwargs.pop(k)
                for k in ["encoding", "errors", "newline"]
                if k in kwargs
            }
            return io.TextIOWrapper(
                self.open(
                    path,
                    mode,
                    block_size=block_size,
                    cache_options=cache_options,
                    compression=compression,
                    **kwargs,
                ),
                **text_kwargs,
            )
        else:
            # within a transaction, defer the final write until commit
            ac = kwargs.pop("autocommit", not self._intrans)
            f = self._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=ac,
                cache_options=cache_options,
                **kwargs,
            )
            if compression is not None:
                from fsspec.compression import compr
                from fsspec.core import get_compression

                # resolve "infer" (or validate the given name), then wrap the
                # raw file in the codec's stream
                compression = get_compression(path, compression)
                compress = compr[compression]
                f = compress(f, mode=mode[0])

            if not ac and "r" not in mode:
                # register the file so the transaction can commit/discard it
                self.transaction.files.append(f)
            return f
1049
1050    def touch(self, path, truncate=True, **kwargs):
1051        """Create empty file, or update timestamp
1052
1053        Parameters
1054        ----------
1055        path: str
1056            file location
1057        truncate: bool
1058            If True, always set file size to 0; if False, update timestamp and
1059            leave file unchanged, if backend allows this
1060        """
1061        if truncate or not self.exists(path):
1062            with self.open(path, "wb", **kwargs):
1063                pass
1064        else:
1065            raise NotImplementedError  # update timestamp, if possible
1066
1067    def ukey(self, path):
1068        """Hash of file properties, to tell if it has changed"""
1069        return sha256(str(self.info(path)).encode()).hexdigest()
1070
    def read_block(self, fn, offset, length, delimiter=None):
        """Read a block of bytes from

        Starting at ``offset`` of the file, read ``length`` bytes.  If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``.  If ``offset`` is zero then we start at zero.  The
        bytestring returned WILL include the end delimiter string.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\n'

        Use ``length=None`` to read to the end of the file.
        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\nCharlie, 300'

        See Also
        --------
        utils.read_block
        """
        with self.open(fn, "rb") as f:
            size = f.size
            if length is None:
                # read to end-of-file
                length = size
            if size is not None and offset + length > size:
                # clamp the request so it does not run past end-of-file
                length = size - offset
            # delegate delimiter handling to the module-level utility
            return read_block(f, offset, length, delimiter)
1115
1116    def to_json(self):
1117        """
1118        JSON representation of this filesystem instance
1119
1120        Returns
1121        -------
1122        str: JSON structure with keys cls (the python location of this class),
1123            protocol (text name of this class's protocol, first one in case of
1124            multiple), args (positional args, usually empty), and all other
1125            kwargs as their own keys.
1126        """
1127        import json
1128
1129        cls = type(self)
1130        cls = ".".join((cls.__module__, cls.__name__))
1131        proto = (
1132            self.protocol[0]
1133            if isinstance(self.protocol, (tuple, list))
1134            else self.protocol
1135        )
1136        return json.dumps(
1137            dict(
1138                **{"cls": cls, "protocol": proto, "args": self.storage_args},
1139                **self.storage_options,
1140            )
1141        )
1142
    @staticmethod
    def from_json(blob):
        """
        Recreate a filesystem instance from JSON representation

        See ``.to_json()`` for the expected structure of the input

        Parameters
        ----------
        blob: str

        Returns
        -------
        file system instance, not necessarily of this particular class.
        """
        import json

        from .registry import _import_class, get_filesystem_class

        dic = json.loads(blob)
        protocol = dic.pop("protocol")
        try:
            # prefer the exact class recorded in the blob; a missing "cls" key
            # (KeyError) or an unimportable path triggers the fallback below
            cls = _import_class(dic.pop("cls"))
        except (ImportError, ValueError, RuntimeError, KeyError):
            # fall back to whatever class is registered for the protocol
            cls = get_filesystem_class(protocol)
        # remaining keys become the constructor's keyword arguments
        return cls(*dic.pop("args", ()), **dic)
1169
    def _get_pyarrow_filesystem(self):
        """
        Make a version of the FS instance which will be acceptable to pyarrow

        The base implementation returns ``self`` unchanged.
        """
        # all instances already also derive from pyarrow
        return self
1176
    def get_mapper(self, root, check=False, create=False):
        """Create key/value store based on this file-system

        Makes a MutableMapping interface to the FS at the given root path.
        See ``fsspec.mapping.FSMap`` for further details.

        Parameters
        ----------
        root: str
            Root path within this filesystem for the mapping
        check, create: bool
            Forwarded to ``FSMap``
        """
        from .mapping import FSMap

        return FSMap(root, self, check, create)
1186
    @classmethod
    def clear_instance_cache(cls):
        """
        Clear the cache of filesystem instances.

        Notes
        -----
        Unless overridden by setting the ``cachable`` class attribute to False,
        the filesystem class stores a reference to newly created instances. This
        prevents Python's normal rules around garbage collection from working,
        since the instances refcount will not drop to zero until
        ``clear_instance_cache`` is called.

        Each class holds its own ``_cache``; only this class's entries are
        affected.
        """
        cls._cache.clear()
1201
    def created(self, path):
        """Return the created timestamp of a file as a datetime.datetime

        Raises ``NotImplementedError`` unless a backend overrides it.
        """
        raise NotImplementedError
1205
    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime

        Raises ``NotImplementedError`` unless a backend overrides it.
        """
        raise NotImplementedError
1209
1210    # ------------------------------------------------------------------------
1211    # Aliases
1212
    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`; all arguments are forwarded."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)
1216
    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`; all arguments are forwarded."""
        return self.makedirs(path, exist_ok=exist_ok)
1220
    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`; note ``detail`` defaults to True here."""
        return self.ls(path, detail=detail, **kwargs)
1224
    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`; all arguments are forwarded."""
        return self.copy(path1, path2, **kwargs)
1228
    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`; all arguments are forwarded."""
        return self.mv(path1, path2, **kwargs)
1232
    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`; all arguments are forwarded."""
        return self.info(path, **kwargs)
1236
    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`; all arguments are forwarded."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)
1240
    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`; all arguments are forwarded."""
        return self.mv(path1, path2, **kwargs)
1244
    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`; all arguments are forwarded."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)
1248
    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`; all arguments are forwarded."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)
1252
    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`; all arguments are forwarded."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)
1256
    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
             The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)
        kwargs :
            Implementation-specific options

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        raise NotImplementedError("Sign is not implemented for this filesystem")
1280
    def _isfilestore(self):
        """Legacy pyarrow-compatibility hook; always False in the base class."""
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False
1287
1288
class AbstractBufferedFile(io.IOBase):
    """Convenient class to derive from to provide buffering

    In the case that the backend does not provide a pythonic file-like object
    already, this class contains much of the logic to build one. The only
    methods that need to be overridden are ``_upload_chunk``,
    ``_initiate_upload`` and ``_fetch_range``.
    """

    DEFAULT_BLOCK_SIZE = 5 * 2 ** 20  # 5 MiB read/write buffer
    _details = None  # lazily-populated fs.info() dict; see ``details`` property

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """
        Template for files with buffered reading and writing

        Parameters
        ----------
        fs: instance of FileSystem
        path: str
            location in file-system
        mode: str
            Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
            systems may be read-only, and some may not support append.
        block_size: int
            Buffer size for reading or writing, 'default' for class default
        autocommit: bool
            Whether to write to final destination; may only impact what
            happens when file is being closed.
        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
            Caching policy in read mode. See the definitions in ``core``.
        cache_options : dict
            Additional options passed to the constructor for the cache specified
            by `cache_type`.
        size: int
            If given and in read mode, suppressed having to look up the file size
        kwargs:
            Gets stored as self.kwargs
        """
        from .core import caches

        self.path = path
        self.fs = fs
        self.mode = mode
        self.blocksize = (
            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
        )
        self.loc = 0  # current logical file position
        self.autocommit = autocommit
        self.end = None
        self.start = None
        self.closed = False

        if cache_options is None:
            cache_options = {}

        if "trim" in kwargs:
            # legacy spelling: route 'trim' into cache_options
            warnings.warn(
                "Passing 'trim' to control the cache behavior has been deprecated. "
                "Specify it within the 'cache_options' argument instead.",
                FutureWarning,
            )
            cache_options["trim"] = kwargs.pop("trim")

        self.kwargs = kwargs

        if mode not in {"ab", "rb", "wb"}:
            raise NotImplementedError("File mode not supported")
        if mode == "rb":
            # read mode: the size (given, or looked up via info) bounds reads
            if size is not None:
                self.size = size
            else:
                self.size = self.details["size"]
            self.cache = caches[cache_type](
                self.blocksize, self._fetch_range, self.size, **cache_options
            )
        else:
            # write/append mode: accumulate bytes locally until flushed
            self.buffer = io.BytesIO()
            self.offset = None  # None until the (multipart) upload is initiated
            self.forced = False  # set once flush(force=True) has been called
            self.location = None

    @property
    def details(self):
        # fetch and memoise the file's info dict on first access
        if self._details is None:
            self._details = self.fs.info(self.path)
        return self._details

    @details.setter
    def details(self, value):
        self._details = value
        self.size = value["size"]

    @property
    def full_name(self):
        # path including the filesystem's protocol prefix
        return _unstrip_protocol(self.path, self.fs)

    @property
    def closed(self):
        # get around this attr being read-only in IOBase
        # use getattr here, since this can be called during del
        return getattr(self, "_closed", True)

    @closed.setter
    def closed(self, c):
        self._closed = c

    def __hash__(self):
        if "w" in self.mode:
            # a file being written has no stable content yet; hash by identity
            return id(self)
        else:
            return int(tokenize(self.details), 16)

    def __eq__(self, other):
        """Files are equal if they have the same checksum, only in read mode"""
        return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other)

    def commit(self):
        """Move from temp to final destination"""

    def discard(self):
        """Throw away temporary file"""

    def info(self):
        """File information about this path"""
        if "r" in self.mode:
            return self.details
        else:
            raise ValueError("Info not available while writing")

    def tell(self):
        """Current file location"""
        return self.loc

    def seek(self, loc, whence=0):
        """Set current file location

        Parameters
        ----------
        loc: int
            byte location
        whence: {0, 1, 2}
            from start of file, current location or end of file, resp.
        """
        loc = int(loc)
        if not self.mode == "rb":
            raise OSError(ESPIPE, "Seek only available in read mode")
        if whence == 0:
            nloc = loc
        elif whence == 1:
            nloc = self.loc + loc
        elif whence == 2:
            nloc = self.size + loc
        else:
            raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence)
        if nloc < 0:
            raise ValueError("Seek before start of file")
        self.loc = nloc
        return self.loc

    def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            # at least one full block buffered: send it upstream
            self.flush()
        return out

    def flush(self, force=False):
        """
        Write buffered data to backend store.

        Writes the current buffer, if it is larger than the block-size, or if
        the file is being closed.

        Parameters
        ----------
        force: bool
            When closing, write the last block even if it is smaller than
            blocks are allowed to be. Disallows further writing to this file.
        """

        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        # _upload_chunk may return False to signal the data was not consumed
        if self._upload_chunk(final=force) is not False:
            # chunk accepted: advance the offset and start a fresh buffer
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # may not yet have been initialized, may need to call _initialize_upload

    def _initiate_upload(self):
        """Create remote file/upload"""
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        raise NotImplementedError

    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    def readinto(self, b):
        """mirrors builtin file's readinto method

        https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
        """
        # cast to a plain byte view so assignment length-matches the data read
        out = memoryview(b).cast("B")
        data = self.read(out.nbytes)
        out[: len(data)] = data
        return len(data)

    def readuntil(self, char=b"\n", blocks=None):
        """Return data between current position and first occurrence of char

        char is included in the output, except if the end of the file is
        encountered first.

        Parameters
        ----------
        char: bytes
            Thing to find
        blocks: None or int
            How much to read in each go. Defaults to file blocksize - which may
            mean a new read on every call.
        """
        out = []
        while True:
            start = self.tell()
            part = self.read(blocks or self.blocksize)
            if len(part) == 0:
                break
            found = part.find(char)
            if found > -1:
                # rewind to just after the delimiter before returning
                out.append(part[: found + len(char)])
                self.seek(start + found + len(char))
                break
            out.append(part)
        return b"".join(out)

    def readline(self):
        """Read until first occurrence of newline character

        Note that, because of character encoding, this is not necessarily a
        true line ending.
        """
        return self.readuntil(b"\n")

    def __next__(self):
        out = self.readline()
        if out:
            return out
        raise StopIteration

    def __iter__(self):
        return self

    def readlines(self):
        """Return all data, split by the newline character"""
        data = self.read()
        lines = data.split(b"\n")
        out = [l + b"\n" for l in lines[:-1]]
        if data.endswith(b"\n"):
            # data ended with a newline: the final split element is empty
            return out
        else:
            return out + [lines[-1]]
        # return list(self)  ???

    def readinto1(self, b):
        return self.readinto(b)

    def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                # push any remaining buffered data before closing
                self.flush(force=True)

            if self.fs is not None:
                # cached listings for this file and its parent are now stale
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    def readable(self):
        """Whether opened for reading"""
        return self.mode == "rb" and not self.closed

    def seekable(self):
        """Whether is seekable (only in read mode)"""
        return self.readable()

    def writable(self):
        """Whether opened for writing"""
        return self.mode in {"wb", "ab"} and not self.closed

    def __del__(self):
        if not self.closed:
            self.close()

    def __str__(self):
        return "<File-like object %s, %s>" % (type(self.fs).__name__, self.path)

    __repr__ = __str__

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
1685