1from __future__ import absolute_import, division, print_function
2
3import io
4import logging
5import os
6import re
7from glob import has_magic
8
9# for backwards compat, we export cache things from here too
10from .caching import (  # noqa: F401
11    BaseCache,
12    BlockCache,
13    BytesCache,
14    MMapCache,
15    ReadAheadCache,
16    caches,
17)
18from .compression import compr
19from .registry import filesystem, get_filesystem_class
20from .utils import (
21    _unstrip_protocol,
22    build_name_function,
23    infer_compression,
24    stringify_path,
25    update_storage_options,
26)
27
# Package-wide "fsspec" logger; used in this module by _expand_paths to warn
# when generated output names do not sort in partition order.
logger = logging.getLogger("fsspec")
29
30
class OpenFile(object):
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using `with` (or ``.open()``).

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or duck-type
        with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file. Any mode without "b" (e.g. 'r', 'rt', 'w')
        is treated as text mode.
    compression: str or None, optional
        Compression to apply: a key of ``compr``, ``"infer"`` to guess from
        ``path`` (resolved once, at construction, via ``get_compression``),
        or None.
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # "infer" is resolved here, so self.compression is a codec name or None
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # Stack of file-like layers built by __enter__: raw binary file first,
        # then the optional decompressor, then the optional text wrapper.
        self.fobjects = []

    def __reduce__(self):
        """Pickle by re-calling the constructor with the stored settings.

        Any currently open file layers (``fobjects``) are not carried over;
        the unpickled copy starts with none.
        """
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return "<OpenFile '{}'>".format(self.path)

    def __fspath__(self):
        """Return the outermost opened file's ``__fspath__()``.

        NOTE(review): the file opened here is never closed by this method.
        """
        # may raise if cannot be resolved to local file
        return self.open().__fspath__()

    def __enter__(self):
        """Open the file and return the outermost file-like layer.

        The underlying file is always opened in binary mode; decompression
        and, for non-"b" modes, ``io.TextIOWrapper`` are layered on top.
        """
        # e.g. "rt" -> "rb", "w" -> "wb"
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        f = self.fs.open(self.path, mode=mode)

        # replaces any layers from a previous __enter__
        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            # codecs receive only the first character of the mode ("r"/"w"/...)
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = io.TextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        # hasattr guard: __init__ may not have run (e.g. it raised early)
        if hasattr(self, "fobjects"):
            self.fobjects.clear()  # may cause cleanup of objects and close files

    @property
    def full_name(self):
        """The path with this filesystem's protocol re-attached."""
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The file should be explicitly closed to avoid enclosed file
        instances persisting. This code-path monkey-patches the file-like
        objects, so they can close even if the parent OpenFile object has already
        been deleted; but a with-context is better style.
        """
        out = self.__enter__()
        closer = out.close
        # the inner layers, captured so the patched close() needs no ``self``
        fobjects = self.fobjects.copy()[:-1]
        mode = self.mode

        def close():
            # this func has no reference to
            closer()  # original close bound method of the final file-like
            _close(fobjects, mode)  # call close on other dependent file-likes

        out.close = close
        return out

    def close(self):
        """Close all encapsulated file objects"""
        _close(self.fobjects, self.mode)
156
157
class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.

    Parameters
    ----------
    *args:
        Passed to ``list``; normally a single iterable of ``OpenFile``.
    mode: str
        Mode the contained files were created with. When it has no "r",
        exiting the context commits batched files via ``commit_many``.
    fs: FileSystem or None
        Filesystem of the contained files. It and its ``.fs`` chain are
        searched for an ``open_many`` method used to batch the opening.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # file-likes returned by ``open_many`` when batching is used
        self.files = []
        super().__init__(*args)

    def _batch_fs(self):
        """Return the first filesystem in the ``fs``/``fs.fs`` chain having ``open_many``, else None."""
        fs = self.fs
        while not hasattr(fs, "open_many"):
            if getattr(fs, "fs", None) is None:
                return None
            fs = fs.fs
        return fs

    def __enter__(self):
        if self.fs is None:
            # Nothing in this class resets ``fs`` after use, so this only
            # fires when the instance was created without a filesystem.
            raise ValueError("Context has already been used")

        batch_fs = self._batch_fs()
        if batch_fs is not None:
            # check for concurrent cache download; or set up for upload
            self.files = batch_fs.open_many(self)
            return self.files
        return [openfile.__enter__() for openfile in self]

    def __exit__(self, *args):
        if "r" not in self.mode:
            batch_fs = self._batch_fs()
            if batch_fs is not None:
                # check for concurrent cache upload
                batch_fs.commit_many(self.files)
                self.files.clear()
                return
        for openfile in self:
            openfile.__exit__(*args)

    def __repr__(self):
        return "<List of %s OpenFile instances>" % len(self)
209
210
def _close(fobjects, mode):
    """Close a stack of file-like layers, outermost first, then empty the list.

    For modes without "r", each still-open layer is flushed before closing
    so buffered data reaches the layer beneath it.
    """
    needs_flush = "r" not in mode
    for layer in reversed(fobjects):
        if needs_flush and not layer.closed:
            layer.flush()
        layer.close()
    fobjects.clear()
217
218
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path should contain the "*" character, which will be
    filled in by increasing numbers, e.g., "part*" ->  "part1", "part2" if
    num=2. A str path without "*" is treated as a directory, and files named
    ``*.part`` are created inside it.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.makedirs(parent, exist_ok=True)`` for each
        distinct parent directory.
    expand: bool (True)
        In write mode, expand a single str ``urlpath`` into ``num`` paths as
        described above. Has no effect when ``urlpath`` is a list.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if "r" not in mode and auto_mkdir:
        # create each distinct parent directory once
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            fs.makedirs(parent, exist_ok=True)
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )
316
317
def _un_chain(path, kwargs):
    """Split a ``::``-chained URL into one entry per filesystem layer.

    Returns ``[]`` when ``path`` is not chained. Otherwise returns a list of
    ``(path, protocol, kwargs)`` entries in the order written in the URL
    (left to right). Per-layer kwargs merge the URL-derived options of that
    protocol's class with ``kwargs.get(protocol, {})``. A caching layer
    ("blockcache", "filecache", "simplecache") with no ``target_protocol``
    takes its path from the layer to its right.

    For a list/tuple of URLs, each is un-chained separately and the results
    are merged layer by layer into ``[path_or_paths, protocol, kwargs]``
    lists; all URLs must agree on each layer's protocol.
    """
    if isinstance(path, (tuple, list)):
        per_url = [_un_chain(single, kwargs) for single in path]
        merged = []
        for layer in zip(*per_url):
            layer_paths, layer_protocols, layer_kwargs = zip(*layer)
            if len(set(layer_protocols)) > 1:
                raise ValueError("Protocol mismatch in URL chain")
            if len(set(layer_paths)) == 1:
                layer_path = layer_paths[0]
            else:
                layer_path = list(layer_paths)
            merged.append([layer_path, layer_protocols[0], layer_kwargs[0]])
        return merged

    not_protocol = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    if "::" in path:
        segments = []
        for piece in path.split("::"):
            # a bare lowercase word such as "simplecache" is a protocol name
            if "://" in piece or not_protocol.match(piece):
                segments.append(piece)
            else:
                segments.append(piece + "://")
    else:
        segments = [path]
    if len(segments) < 2:
        return []

    layers = []
    previous = None
    for segment in segments[::-1]:
        protocol = split_protocol(segment)[0] or "file"
        fs_cls = get_filesystem_class(protocol)
        options = dict(
            **fs_cls._get_kwargs_from_urls(segment), **kwargs.get(protocol, {})
        )
        stripped = fs_cls._strip_protocol(segment)
        is_cache = protocol in {"blockcache", "filecache", "simplecache"}
        if is_cache and "target_protocol" not in options:
            stripped = previous
        layers.append((stripped, protocol, options))
        previous = stripped
    layers.reverse()
    return layers
359
360
def url_to_fs(url, **kwargs):
    """
    Build the filesystem instance for a (possibly ``::``-chained) URL

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc. For chained URLs, a layer's
        options are looked up under that layer's protocol name.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``. For chained URLs, inner layers are passed in as nested
        ``target_protocol`` / ``target_options`` / ``fo`` arguments.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    chain = _un_chain(url, kwargs)
    if len(chain) <= 1:
        # plain URL: instantiate its filesystem class directly
        fs_cls = get_filesystem_class(split_protocol(url)[0])
        options = fs_cls._get_kwargs_from_urls(url)
        update_storage_options(options, kwargs)
        fs = fs_cls(**options)
        return fs, fs._strip_protocol(url)

    # Walk from the innermost layer outwards, nesting each layer's
    # arguments inside the next one's target_options.
    outermost = len(chain) - 1
    nested = {}
    for depth, (layer_url, layer_protocol, layer_kw) in enumerate(reversed(chain)):
        if depth == outermost:
            nested = dict(**layer_kw, **nested)
        else:
            nested["target_options"] = dict(**layer_kw, **nested)
            nested["target_protocol"] = layer_protocol
            nested["fo"] = layer_url
    urlpath, protocol, _ = chain[0]
    return filesystem(protocol, **nested), urlpath
404
405
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s); in read mode a ``*`` is still globbed and the first
        match is used.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Raises
    ------
    FileNotFoundError
        If ``urlpath`` resolves to no files (e.g. a glob that matches nothing).
    """
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    if not out:
        # a glob can expand to nothing; report that instead of an IndexError
        raise FileNotFoundError(urlpath)
    return out[0]
467
468
def open_local(url, mode="rb", **storage_options):
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to the filesystem, or used by ``open_files``
        (e.g., compression)

    Returns
    -------
    str or list of str
        The ``name`` of each opened file. A single string is returned when
        ``url`` is a string without glob characters; otherwise a list.

    Raises
    ------
    ValueError
        If ``mode`` is not a read mode, or the filesystem does not have
        ``local_file=True``.
    FileNotFoundError
        If ``url`` resolves to no files (e.g. a glob that matches nothing).
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    if not of:
        # e.g. an unmatched glob; fail clearly instead of IndexError on of[0]
        raise FileNotFoundError(url)
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    if isinstance(url, str) and not has_magic(url):
        return paths[0]
    return paths
496
497
def get_compression(urlpath, compression):
    """Resolve and validate the compression codec name for ``urlpath``.

    ``"infer"`` is replaced by ``infer_compression(urlpath)``. The result
    is returned as-is when it is None or a key of ``compr``.

    Raises
    ------
    ValueError
        If the resolved name is not a supported codec.
    """
    resolved = infer_compression(urlpath) if compression == "infer" else compression
    if resolved is None or resolved in compr:
        return resolved
    raise ValueError("Compression type %s not supported" % resolved)
504
505
def split_protocol(urlpath):
    """Return protocol, path pair

    The protocol is None when there is no ``"://"`` or when the text before
    it is at most one character long (so Windows drive letters are not
    mistaken for protocols); the whole stringified path is returned then.
    """
    text = stringify_path(urlpath)
    scheme, separator, remainder = text.partition("://")
    if separator and len(scheme) > 1:
        return scheme, remainder
    return None, text
515
516
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend

    The backend class is chosen from the URL's protocol and its
    ``_strip_protocol`` does the actual stripping.
    """
    backend = get_filesystem_class(split_protocol(urlpath)[0])
    return backend._strip_protocol(urlpath)
522
523
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them.

    When ``mode`` contains "w", a path with ``*`` is expanded into ``num``
    generated names. Otherwise it is globbed with ``fs.glob``. Paths
    without ``*`` are passed through unchanged.

    Parameters
    ----------
    paths: iterable of str
        Paths to expand.
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
        Raised to ``len(paths)`` if smaller.
    fs: filesystem object
        Used for ``fs.glob`` when not writing.
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of str
        The expanded paths; in write mode, truncated to at most ``num``.

    Raises
    ------
    ValueError
        In write mode, if more than one path contains ``*``.
    """
    paths = list(paths)
    writing = "w" in mode
    if writing:
        if sum("*" in p for p in paths) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

    expanded_paths = []
    for curr_path in paths:
        if "*" not in curr_path:
            expanded_paths.append(curr_path)
        elif writing:
            # expand using name_function
            expanded_paths.extend(_expand_paths(curr_path, name_function, num))
        else:
            # expand using glob
            expanded_paths.extend(fs.glob(curr_path))

    # if we generated more paths than asked for, trim the list
    if writing:
        expanded_paths = expanded_paths[:num]
    return expanded_paths
559
560
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string, or list/tuple/set of strings
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``, or a ``::``-joined chain of URLs), or globstring pointing
        to data. Only list, tuple and set are treated as multiple paths;
        anything else is passed through ``stringify_path``.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL. Not used for chained
        (``::``) URLs, where the first link's protocol is taken.
    expand: bool
        Expand string paths for writing, assuming the path is a directory
        when it has no ``*``. Has no effect when ``urlpath`` is a sequence;
        sequences always go through ``expand_paths_if_needed``.

    Returns
    -------
    fs: FileSystem
        The instantiated filesystem.
    fs_token:
        The filesystem's ``_fs_token`` attribute.
    paths: list of str
        Protocol-stripped paths. In non-write modes, a single string path
        containing ``*`` is globbed (sorted, directories removed).
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath = [stringify_path(u) for u in urlpath]
    else:
        urlpath = stringify_path(urlpath)
    chain = _un_chain(urlpath, storage_options or {})
    if len(chain) > 1:
        inkwargs = {}
        # Reverse iterate the chain, creating a nested target_* structure
        for i, ch in enumerate(reversed(chain)):
            urls, nested_protocol, kw = ch
            if i == len(chain) - 1:
                # first (outermost) link: its kwargs go to the top level
                inkwargs = dict(**kw, **inkwargs)
                continue
            inkwargs["target_options"] = dict(**kw, **inkwargs)
            inkwargs["target_protocol"] = nested_protocol
            inkwargs["fo"] = urls
        # NOTE: this overrides any ``protocol`` argument the caller passed
        paths, protocol, _ = chain[0]
        fs = filesystem(protocol, **inkwargs)
        # ``paths`` is a list only when a sequence of chained URLs differed
        # in their first link's path; otherwise it is a single string
        if isinstance(paths, (list, tuple, set)):
            paths = [fs._strip_protocol(u) for u in paths]
        else:
            paths = fs._strip_protocol(paths)
    else:
        if isinstance(urlpath, (list, tuple, set)):
            # NOTE(review): ``paths`` from this unpacking is overwritten
            # below; only ``protocols`` is used
            protocols, paths = zip(*map(split_protocol, urlpath))
            if protocol is None:
                protocol = protocols[0]
                if not all(p == protocol for p in protocols):
                    raise ValueError(
                        "When specifying a list of paths, all paths must "
                        "share the same protocol"
                    )
            cls = get_filesystem_class(protocol)
            optionss = list(map(cls._get_kwargs_from_urls, urlpath))
            paths = [cls._strip_protocol(u) for u in urlpath]
            options = optionss[0]
            # every URL must imply the same constructor options, since one
            # filesystem instance serves them all
            if not all(o == options for o in optionss):
                raise ValueError(
                    "When specifying a list of paths, all paths must "
                    "share the same file-system options"
                )
            update_storage_options(options, storage_options)
            fs = cls(**options)
        else:
            protocols = split_protocol(urlpath)[0]
            protocol = protocol or protocols
            cls = get_filesystem_class(protocol)
            options = cls._get_kwargs_from_urls(urlpath)
            paths = cls._strip_protocol(urlpath)
            update_storage_options(options, storage_options)
            fs = cls(**options)

    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        if "w" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
657
658
def _expand_paths(path, name_function, num):
    """Generate the list of output paths for writing ``num`` partitions.

    Parameters
    ----------
    path: str or list/tuple of str
        Either a template containing exactly one ``*`` (replaced by
        ``name_function(i)`` for ``i`` in ``range(num)``), a directory
        (``*.part`` is appended), or an explicit sequence of ``num`` paths.
    name_function: callable or None
        Maps a partition index to the text substituted for ``*``; defaults
        to ``build_name_function(num - 1)``.
    num: int
        Number of paths to produce.

    Returns
    -------
    list of str

    Raises
    ------
    ValueError
        If a str ``path`` has more than one ``*``, a sequence ``path`` does
        not hold exactly ``num`` items, or ``path`` has an unsupported type.
    """
    # fsspec paths are "/"-separated on every OS; os.path.join would insert
    # a backslash on Windows (breaking e.g. remote object-store paths).
    import posixpath

    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            path = posixpath.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions"
                " paths created with ``name_function`` should "
                "sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        # explicit check instead of ``assert`` (which ``python -O`` strips)
        if len(path) != num:
            raise ValueError(
                "Expected %d output paths, got %d" % (num, len(path))
            )
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths
687