1from __future__ import absolute_import, division, print_function
3import io
4import logging
5import os
6import re
7from glob import has_magic
9# for backwards compat, we export cache things from here too
10from .caching import (  # noqa: F401
11    BaseCache,
12    BlockCache,
13    BytesCache,
14    MMapCache,
15    ReadAheadCache,
16    caches,
18from .compression import compr
19from .registry import filesystem, get_filesystem_class
20from .utils import (
21    _unstrip_protocol,
22    build_name_function,
23    infer_compression,
24    stringify_path,
25    update_storage_options,
# Module logger, registered under the fixed name "fsspec" (not ``__name__``);
# _expand_paths reports its partition-ordering warning through it.
logger = logging.getLogger(name="fsspec")
class OpenFile:
    """
    Lazily-opened, serializable file handle meant for use in a ``with`` block.

    The backing filesystem is always opened in binary mode. Optional
    compression and text decoding (``io.TextIOWrapper``) are layered on top,
    so binary-only backends can still serve text-mode or compressed access.

    Pickling an instance captures only its constructor arguments. The
    underlying file objects are created by ``__enter__`` (or ``open()``),
    never at construction time.

    Parameters
    ----------
    fs: FileSystem
        Filesystem used to open ``path``. Should be a subclass or duck-type
        of ``fsspec.spec.AbstractFileSystem``.
    path: str
        Location to open.
    mode: str like 'rb', optional
        Mode of the opened file. A mode without "b" means text mode.
    compression: str or None, optional
        Compression codec to apply. It is resolved and validated by
        ``get_compression`` when the instance is created.
    encoding: str or None, optional
        Encoding used in text mode.
    errors: str or None, optional
        Encoding-error handling used in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode; controls line-ending handling.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # may raise ValueError for an unknown codec, before fobjects exists
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # file-like layers created by __enter__, innermost (raw) first
        self.fobjects = []

    def __reduce__(self):
        # Rebuild from constructor arguments only; live handles are not pickled.
        ctor_args = (
            self.fs,
            self.path,
            self.mode,
            self.compression,
            self.encoding,
            self.errors,
            self.newline,
        )
        return OpenFile, ctor_args

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __fspath__(self):
        # may raise if cannot be resolved to local file
        return self.open().__fspath__()

    def __enter__(self):
        # The backend is always opened in binary; strip "t"/"b" and append "b".
        raw_mode = "".join(ch for ch in self.mode if ch not in "tb") + "b"
        layer = self.fs.open(self.path, mode=raw_mode)
        self.fobjects = [layer]

        if self.compression is not None:
            layer = compr[self.compression](layer, mode=raw_mode[0])
            self.fobjects.append(layer)

        if "b" not in self.mode:
            # as with builtin open(), "r" is treated like "rt"
            layer = io.TextIOWrapper(
                layer,
                encoding=self.encoding,
                errors=self.errors,
                newline=self.newline,
            )
            self.fobjects.append(layer)

        return layer

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        # fobjects is absent if __init__ raised before assigning it
        if hasattr(self, "fobjects"):
            # dropping the references may let the layers be finalized/closed
            self.fobjects.clear()

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without a context manager.

        The returned object's ``close`` is replaced with a function that
        closes it and then every inner layer. That function holds no
        reference to this OpenFile, so cleanup still works after the
        OpenFile has been deleted. Callers must close the result explicitly;
        a ``with`` block is the better style.
        """
        outer = self.__enter__()
        outer_close = outer.close
        inner_layers = self.fobjects[:-1]
        mode = self.mode

        def close():
            # deliberately captures no reference to ``self``
            outer_close()  # the outermost layer's original bound close()
            _close(inner_layers, mode)  # then the layers beneath it

        outer.close = close
        return outer

    def close(self):
        """Close all encapsulated file objects"""
        _close(self.fobjects, self.mode)
class OpenFiles(list):
    """List of OpenFile instances usable as a single context manager.

    Entering the context opens every contained file; leaving it closes them.
    Ordinary list access to the elements works as normal.

    A special case is made for caching filesystems. If the filesystem, or
    any filesystem reached by following ``.fs`` attributes, provides
    ``open_many``, that method opens all the files in one call. In non-read
    modes, ``commit_many`` is called on exit instead of closing each file.
    This lets the files be down/uploaded together, possibly concurrently.

    Parameters
    ----------
    *args:
        Passed to ``list`` (normally an iterable of ``OpenFile``).
    mode: str
        Mode the files were created with; a mode without "r" commits on exit.
    fs: FileSystem or None
        Filesystem the files belong to. Entering raises ValueError if None.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # file-likes returned by ``open_many``; handed to ``commit_many`` on exit
        self.files = []
        super().__init__(*args)

    def _open_many_fs(self):
        """Return the first fs in the ``.fs`` chain that has ``open_many``, else None."""
        fs = self.fs
        while not hasattr(fs, "open_many"):
            inner = getattr(fs, "fs", None)
            if inner is None:
                return None
            fs = inner
        return fs

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")
        many_fs = self._open_many_fs()
        if many_fs is not None:
            # check for concurrent cache download; or set up for upload
            self.files = many_fs.open_many(self)
            return self.files
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        if "r" not in self.mode:
            many_fs = self._open_many_fs()
            if many_fs is not None:
                # check for concurrent cache upload
                many_fs.commit_many(self.files)
                self.files.clear()
                return
        # NOTE(review): in read mode, files returned by ``open_many`` in
        # __enter__ are not closed here (the OpenFile elements were never
        # entered). Confirm whether open_many's results need explicit closing.
        for s in self:
            s.__exit__(*args)

    def __repr__(self):
        return "<List of %s OpenFile instances>" % len(self)
def _close(fobjects, mode):
    """Close a stack of layered file objects, outermost first, then empty the list.

    Unless ``mode`` contains "r", each still-open layer is flushed before it
    is closed.
    """
    needs_flush = "r" not in mode
    for fobj in reversed(fobjects):
        if needs_flush and not fobj.closed:
            fobj.flush()
        fobj.close()
    fobjects.clear()
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path may contain one "*" character, which is replaced
    by ``name_function(i)`` for each ``i`` in ``range(num)``. A str path
    without "*" is treated as a directory, and ``*.part`` is appended to it
    (see ``expand``).

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        ``name_function``)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Passed to TextIOWrapper in text mode to control line-ending handling
        (see ``io.TextIOWrapper``).
    auto_mkdir: bool (True)
        If ``mode`` is not a read mode, ensure each target's parent directory
        exists before writing, by calling
        ``fs.makedirs(parent, exist_ok=True)``.
    expand: bool (True)
        If True and ``mode`` is a write mode, a single string ``urlpath`` is
        expanded into ``num`` output paths as described above. If False, that
        expansion is skipped.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context
    """
    fs, _fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if "r" not in mode and auto_mkdir:
        # create each distinct parent directory only once
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            fs.makedirs(parent, exist_ok=True)
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )
def _un_chain(path, kwargs):
    """Split a ``::``-chained URL (or a list of them) into per-filesystem layers.

    Parameters
    ----------
    path: str, tuple or list
        URL such as ``"simplecache::s3://bucket/key"``, or a sequence of such
        URLs that must all have the same chain structure.
    kwargs: dict
        Storage options keyed by protocol name; ``kwargs[protocol]`` is merged
        into that layer's options.

    Returns
    -------
    list
        One ``[path, protocol, options]`` entry per layer, in URL order
        (leftmost first), or ``[]`` if ``path`` contains no ``::``. For a
        sequence input, each layer's path is a single str if all inputs agree
        on it, otherwise a list.

    Raises
    ------
    ValueError
        If listed URLs use different protocols at the same layer, or have
        different numbers of layers.
    """
    if isinstance(path, (tuple, list)):
        bits = [_un_chain(p, kwargs) for p in path]
        # zip() below would silently truncate to the shortest chain and pair
        # up unrelated layers, so insist that all chains have the same length
        if len({len(b) for b in bits}) > 1:
            raise ValueError("Mismatched URL chain lengths in path list")
        out = []
        for pbit in zip(*bits):
            layer_paths, protocols, layer_kwargs = zip(*pbit)
            if len(set(protocols)) > 1:
                raise ValueError("Protocol mismatch in URL chain")
            if len(set(layer_paths)) == 1:
                layer_paths = layer_paths[0]
            else:
                layer_paths = list(layer_paths)
            out.append([layer_paths, protocols[0], layer_kwargs[0]])
        return out
    # a segment containing anything other than a-z is not a bare protocol word
    x = re.compile(r".*[^a-z]+.*")
    # bare protocol words (e.g. "simplecache") get "://" appended
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    if len(bits) < 2:
        return []
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    for bit in reversed(bits):
        protocol = split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.get(protocol, {})
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # a cache layer without an explicit target reuses the path of the
            # layer to its right (the previously processed one)
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out = list(reversed(out))
    return out
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL. It is passed through ``stringify_path``
        first, as ``get_fs_token_paths`` does, so both accept the same inputs.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc. For a chained URL (``a::b``),
        options are looked up per layer as ``kwargs[protocol]``.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    # _un_chain evaluates ``"::" in url``, which fails for non-str path-like
    # objects, so coerce first (get_fs_token_paths does the same)
    url = stringify_path(url)
    chain = _un_chain(url, kwargs)
    if len(chain) > 1:
        inkwargs = {}
        # Reverse iterate the chain (innermost first), nesting each layer's
        # options into the next one's target_options / target_protocol / fo
        for i, (urls, nested_protocol, kw) in enumerate(reversed(chain)):
            if i == len(chain) - 1:
                # outermost layer: its options become top-level kwargs
                inkwargs = dict(**kw, **inkwargs)
                continue
            inkwargs["target_options"] = dict(**kw, **inkwargs)
            inkwargs["target_protocol"] = nested_protocol
            inkwargs["fo"] = urls
        urlpath, protocol, _ = chain[0]
        fs = filesystem(protocol, **inkwargs)
    else:
        protocol = split_protocol(url)[0]
        cls = get_filesystem_class(protocol)

        options = cls._get_kwargs_from_urls(url)
        update_storage_options(options, kwargs)
        fs = cls(**options)
        urlpath = fs._strip_protocol(url)
    return fs, urlpath
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Passed to TextIOWrapper in text mode to control line-ending handling
        (see ``io.TextIOWrapper``).
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Raises
    ------
    FileNotFoundError
        If ``urlpath`` resolves to no file at all (e.g. a glob pattern in read
        mode that matches nothing).
    """
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    if not out:
        # only possible when a "*" in a read-mode path was globbed to nothing;
        # report it as a missing file rather than an IndexError
        raise FileNotFoundError(urlpath)
    return out[0]
def open_local(url, mode="rb", **storage_options):
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)

    Returns
    -------
    str or list of str
        A single local path when ``url`` is a str without glob characters,
        otherwise a list of local paths (empty if nothing matched).

    Raises
    ------
    ValueError
        If ``mode`` is not a read mode, or the filesystem does not have
        ``local_file=True``.
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    # check the filesystem on the OpenFiles container instead of of[0], so an
    # empty match does not fail with IndexError
    if not getattr(of.fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    if not of:
        # nothing matched: only a list or a glob pattern can get here, and
        # both of those return a list below
        return []
    with of as files:
        paths = [f.name for f in files]
    if isinstance(url, str) and not has_magic(url):
        return paths[0]
    return paths
def get_compression(urlpath, compression):
    """Resolve and validate the compression codec name for ``urlpath``.

    ``"infer"`` is replaced by ``infer_compression(urlpath)``. ``None`` is
    returned unchanged. Any other name must be a key of ``compr``, otherwise
    ValueError is raised.
    """
    resolved = infer_compression(urlpath) if compression == "infer" else compression
    if resolved is None or resolved in compr:
        return resolved
    raise ValueError("Compression type %s not supported" % resolved)
def split_protocol(urlpath):
    """Return protocol, path pair

    The protocol is ``None`` when there is no ``"://"`` or when the text before
    it is a single character (a Windows drive letter such as ``C://``).
    """
    text = stringify_path(urlpath)
    head, sep, tail = text.partition("://")
    if sep and len(head) > 1:
        return head, tail
    return None, text
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    fs_cls = get_filesystem_class(split_protocol(urlpath)[0])
    return fs_cls._strip_protocol(urlpath)
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them.

    Parameters
    ----------
    paths: iterable of str
        Paths, some of which may contain ``*``.
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create. The
        effective value is at least ``len(paths)``.
    fs: filesystem object
        Used (via ``fs.glob``) to expand ``*`` paths when not writing.
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of str
        Expanded paths. In writing mode, at most ``num`` of them.

    Raises
    ------
    ValueError
        In writing mode, if more than one path contains ``*``.
    """
    paths = list(paths)
    writing = "w" in mode
    if writing:
        if sum("*" in p for p in paths) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

    expanded_paths = []
    for curr_path in paths:
        if "*" not in curr_path:
            expanded_paths.append(curr_path)
        elif writing:
            # expand using name_function
            expanded_paths.extend(_expand_paths(curr_path, name_function, num))
        else:
            # expand using glob
            expanded_paths.extend(fs.glob(curr_path))

    # if we generated more paths than asked for, trim the list
    if writing and len(expanded_paths) > num:
        expanded_paths = expanded_paths[:num]
    return expanded_paths
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string, list, tuple or set
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data. May be a chained URL whose
        layers are joined by ``::`` (e.g. ``simplecache::s3://...``). A list,
        tuple or set gives several paths, which must share one protocol and
        one set of URL-derived options. Any other type is treated as a single
        path.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class. For chained URLs
        they are looked up per protocol (``storage_options[protocol]``).
    protocol: str or None
        To override the protocol specifier in the URL. Not consulted for
        chained URLs, where the leftmost layer's protocol is used.
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    fs: filesystem instance
    token: ``fs._fs_token``
    paths: list of str

    Raises
    ------
    ValueError
        For an empty sequence, or a sequence whose paths disagree on protocol
        or URL-derived options.
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # a set becomes a list here, in the set's iteration order
        urlpath = [stringify_path(u) for u in urlpath]
    else:
        urlpath = stringify_path(urlpath)
    # non-empty only when the URL(s) contain "::" (see _un_chain)
    chain = _un_chain(urlpath, storage_options or {})
    if len(chain) > 1:
        inkwargs = {}
        # Reverse iterate the chain, creating a nested target_* structure
        for i, ch in enumerate(reversed(chain)):
            urls, nested_protocol, kw = ch
            if i == len(chain) - 1:
                # outermost (leftmost) layer: its options become top-level kwargs
                inkwargs = dict(**kw, **inkwargs)
                continue
            inkwargs["target_options"] = dict(**kw, **inkwargs)
            inkwargs["target_protocol"] = nested_protocol
            inkwargs["fo"] = urls
        # note: this overwrites any ``protocol`` argument passed by the caller
        paths, protocol, _ = chain[0]
        fs = filesystem(protocol, **inkwargs)
        if isinstance(paths, (list, tuple, set)):
            paths = [fs._strip_protocol(u) for u in paths]
        else:
            paths = fs._strip_protocol(paths)
    else:
        if isinstance(urlpath, (list, tuple, set)):
            # ``paths`` from this unpacking is unused; it is recomputed below
            protocols, paths = zip(*map(split_protocol, urlpath))
            if protocol is None:
                protocol = protocols[0]
                if not all(p == protocol for p in protocols):
                    raise ValueError(
                        "When specifying a list of paths, all paths must "
                        "share the same protocol"
                    )
            cls = get_filesystem_class(protocol)
            optionss = list(map(cls._get_kwargs_from_urls, urlpath))
            paths = [cls._strip_protocol(u) for u in urlpath]
            options = optionss[0]
            if not all(o == options for o in optionss):
                raise ValueError(
                    "When specifying a list of paths, all paths must "
                    "share the same file-system options"
                )
            # presumably merges caller storage_options into the URL-derived ones
            update_storage_options(options, storage_options)
            fs = cls(**options)
        else:
            # the explicit ``protocol`` argument wins over the one in the URL
            protocols = split_protocol(urlpath)[0]
            protocol = protocol or protocols
            cls = get_filesystem_class(protocol)
            options = cls._get_kwargs_from_urls(urlpath)
            paths = cls._strip_protocol(urlpath)
            update_storage_options(options, storage_options)
            fs = cls(**options)

    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        if "w" in mode and expand:
            # generate ``num`` output names from a single template/directory
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # glob, sorted, keeping only non-directories
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
def _expand_paths(path, name_function, num):
    """Produce ``num`` output paths for writing.

    Parameters
    ----------
    path: str, tuple or list
        A str with exactly one ``*`` (replaced by ``name_function(i)`` for
        ``i`` in ``range(num)``), a str without ``*`` (treated as a directory,
        with ``*.part`` appended), or an explicit sequence of ``num`` paths.
    name_function: callable or None
        Maps a partition index to the text substituted for ``*``. Defaults to
        ``build_name_function(num - 1)``.
    num: int
        Number of paths to produce.

    Returns
    -------
    list of str

    Raises
    ------
    ValueError
        If a str contains more than one ``*``, a sequence's length differs
        from ``num``, or ``path`` has any other type.
    """
    import posixpath

    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            # posixpath.join matches os.path.join on POSIX, but always uses "/".
            # os.path.join would insert "\\" on Windows, which is wrong for
            # URL-style (e.g. remote object-store) paths.
            path = posixpath.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions"
                " paths created with ``name_function`` should "
                "sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        # validate with an exception: an ``assert`` is stripped under ``-O``
        if len(path) != num:
            raise ValueError(
                "Expected %d output paths, got %d" % (num, len(path))
            )
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/'\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths