1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18# cython: language_level = 3
19
20from cpython.datetime cimport datetime, PyDateTime_DateTime
21
22from pyarrow.compat import frombytes, tobytes
23from pyarrow.includes.common cimport *
24from pyarrow.includes.libarrow cimport PyDateTime_to_TimePoint
25from pyarrow.lib import _detect_compression
26from pyarrow.lib cimport *
27from pyarrow.util import _stringify_path
28
29from datetime import timezone
30import pathlib
31
32
cdef inline c_string _path_as_bytes(path) except *:
    # Paths handled here are abstract: unlike pathlib objects they are not
    # bound to any particular filesystem, hence only plain ``str`` / ``bytes``
    # values are accepted.
    #
    # tobytes always encodes with utf-8. That is more or less fine, at least
    # on Windows, because the C++ side decodes from utf-8 again. On Unix,
    # os.fsencode may be the better choice.
    if isinstance(path, (bytes, str)):
        return tobytes(path)
    raise TypeError('Path must be a string')
42
43
def _normalize_path(FileSystem filesystem not None, path):
    """
    Normalize path for the given filesystem.

    The default implementation of this method is a no-op, but subclasses
    may allow normalizing irregular path forms (such as Windows local paths).

    Parameters
    ----------
    filesystem : FileSystem
        The filesystem whose ``NormalizePath`` implementation is used.
        Must not be None.
    path : str or bytes
        The abstract path to normalize.

    Returns
    -------
    normalized : str

    Raises
    ------
    TypeError
        If `path` is not a string, or if `filesystem` is None.
    """
    # ``not None`` in the signature matters: Cython lets None through for
    # typed extension-type arguments by default, and reading ``.fs`` from
    # None would be undefined behaviour instead of a clean TypeError.
    cdef:
        c_string c_path = _path_as_bytes(path)
        c_string c_path_normalized

    c_path_normalized = GetResultValue(filesystem.fs.NormalizePath(c_path))
    return frombytes(c_path_normalized)
56
57
cdef class FileInfo:
    """
    FileSystem entry info.

    Thin Python wrapper around a C++ ``CFileInfo`` value. Instances are
    created through the ``wrap`` static method (see
    ``FileSystem.get_file_info``), never by calling the constructor.
    """

    def __init__(self):
        raise TypeError("FileInfo cannot be instantiated directly, use "
                        "FileSystem.get_file_info method instead.")

    @staticmethod
    cdef wrap(CFileInfo info):
        # Bypass __init__ (which always raises) and attach the C++ value.
        cdef FileInfo self = FileInfo.__new__(FileInfo)
        self.info = info
        return self

    cdef inline CFileInfo unwrap(self) nogil:
        return self.info

    def __repr__(self):
        s = '<FileInfo for {!r}: type={}'.format(self.path, str(self.type))
        # The size is only meaningful (and only shown) for regular files.
        if self.is_file:
            s += ', size={}'.format(self.size)
        s += '>'
        return s

    @property
    def type(self):
        """
        Type of the file.

        The returned enum values can be the following:

        - FileType.NotFound: target does not exist
        - FileType.Unknown: target exists but its type is unknown (could be a
          special file such as a Unix socket or character device, or
          Windows NUL / CON / ...)
        - FileType.File: target is a regular file
        - FileType.Directory: target is a regular directory

        Returns
        -------
        type : FileType
        """
        return FileType(<int8_t> self.info.type())

    @property
    def is_file(self):
        """
        Whether the entry is a regular file (``type == FileType.File``).

        Returns
        -------
        is_file : bool
        """
        return self.type == FileType.File

    @property
    def path(self):
        """
        The full file path in the filesystem.
        """
        return frombytes(self.info.path())

    @property
    def base_name(self):
        """
        The file base name.

        Component after the last directory separator.
        """
        return frombytes(self.info.base_name())

    @property
    def size(self):
        """
        The size in bytes, if available.

        Only regular files are guaranteed to have a size.

        Returns
        -------
        size : int or None
            None when the entry is not a regular file.
        """
        return self.info.size() if self.is_file else None

    @property
    def extension(self):
        """
        The file extension.
        """
        return frombytes(self.info.extension())

    @property
    def mtime(self):
        """
        The time of last modification, if available.

        Returns
        -------
        mtime : datetime.datetime or None
            Timezone-aware (UTC) datetime, or None when the filesystem did
            not report a modification time.
        """
        cdef int64_t nanoseconds
        nanoseconds = TimePoint_to_ns(self.info.mtime())
        # NOTE(review): -1 ns is the "no time" sentinel used by Arrow C++
        # (kNoTime); confirm against the C++ headers of the linked version.
        # Without this check a missing mtime would be reported as a bogus
        # timestamp just before the Unix epoch.
        if nanoseconds == -1:
            return None
        return datetime.fromtimestamp(nanoseconds / 1.0e9, timezone.utc)

    @property
    def mtime_ns(self):
        """
        The time of last modification, if available, expressed in nanoseconds
        since the Unix epoch.

        Returns
        -------
        mtime_ns : int or None
            None when the filesystem did not report a modification time.
        """
        cdef int64_t nanoseconds
        nanoseconds = TimePoint_to_ns(self.info.mtime())
        # Same "no time" sentinel handling as in ``mtime``.
        if nanoseconds == -1:
            return None
        return nanoseconds
171
172
cdef class FileSelector:
    """
    File and directory selector.

    It contains a set of options that describes how to search for files and
    directories.

    Parameters
    ----------
    base_dir : str
        The directory in which to select files. Relative paths also work, use
        '.' for the current directory and '..' for the parent.
    allow_not_found : bool, default False
        The behavior if `base_dir` doesn't exist in the filesystem.
        If false, an error is returned.
        If true, an empty selection is returned.
    recursive : bool, default False
        Whether to recurse into subdirectories.
    """

    def __init__(self, base_dir, bint allow_not_found=False,
                 bint recursive=False):
        # Every option goes through its property setter, which writes into
        # the wrapped C++ selector struct.
        self.base_dir = base_dir
        self.allow_not_found = allow_not_found
        self.recursive = recursive

    cdef inline CFileSelector unwrap(self) nogil:
        return self.selector

    @property
    def base_dir(self):
        """The directory in which files are selected, as a str."""
        return frombytes(self.selector.base_dir)

    @base_dir.setter
    def base_dir(self, base_dir):
        # _path_as_bytes rejects anything that is not str / bytes.
        self.selector.base_dir = _path_as_bytes(base_dir)

    @property
    def allow_not_found(self):
        """Whether a missing `base_dir` yields an empty selection."""
        return self.selector.allow_not_found

    @allow_not_found.setter
    def allow_not_found(self, bint allow_not_found):
        self.selector.allow_not_found = allow_not_found

    @property
    def recursive(self):
        """Whether subdirectories are recursed into."""
        return self.selector.recursive

    @recursive.setter
    def recursive(self, bint recursive):
        self.selector.recursive = recursive

    def __repr__(self):
        template = "<FileSelector base_dir={!r} recursive={}>"
        return template.format(self.base_dir, self.recursive)
229
230
cdef class FileSystem:
    """
    Abstract file system API.

    Wraps a C++ ``CFileSystem`` held through a shared pointer. Concrete
    instances are obtained from a subclass constructor or from
    ``FileSystem.from_uri``.
    """

    def __init__(self):
        raise TypeError("FileSystem is an abstract class, instantiate one of "
                        "the subclasses instead: LocalFileSystem or "
                        "SubTreeFileSystem")

    @staticmethod
    def from_uri(uri):
        """
        Create a new FileSystem from URI or Path.

        Recognized URI schemes are "file", "mock", "s3fs", "hdfs" and "viewfs".
        In addition, the argument can be a pathlib.Path object, or a string
        describing an absolute local path.

        Parameters
        ----------
        uri : str or pathlib.Path
            URI-based path, for example: file:///some/local/path.

        Returns
        -------
        tuple of (FileSystem, str)
            The filesystem instance and the abstract path inside that
            filesystem.
        """
        cdef:
            c_string path
            CResult[shared_ptr[CFileSystem]] result

        if isinstance(uri, pathlib.Path):
            # Make absolute
            uri = uri.resolve().absolute()
        uri = _stringify_path(uri)
        result = CFileSystemFromUriOrPath(tobytes(uri), &path)
        return FileSystem.wrap(GetResultValue(result)), frombytes(path)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Keep the shared_ptr alive and cache the raw pointer for calls.
        self.wrapped = wrapped
        self.fs = wrapped.get()

    @staticmethod
    cdef wrap(const shared_ptr[CFileSystem]& sp):
        # Pick the Python wrapper class from the type name reported by the
        # C++ filesystem; __new__ is used so that __init__ is not run.
        cdef FileSystem self

        typ = frombytes(sp.get().type_name())
        if typ == 'local':
            self = LocalFileSystem.__new__(LocalFileSystem)
        elif typ == 'mock':
            self = _MockFileSystem.__new__(_MockFileSystem)
        elif typ == 'subtree':
            self = SubTreeFileSystem.__new__(SubTreeFileSystem)
        elif typ == 's3':
            # Imported lazily, only when such a filesystem is wrapped.
            from pyarrow._s3fs import S3FileSystem
            self = S3FileSystem.__new__(S3FileSystem)
        elif typ == 'hdfs':
            # Imported lazily, only when such a filesystem is wrapped.
            from pyarrow._hdfs import HadoopFileSystem
            self = HadoopFileSystem.__new__(HadoopFileSystem)
        else:
            raise TypeError('Cannot wrap FileSystem pointer')

        self.init(sp)
        return self

    cdef inline shared_ptr[CFileSystem] unwrap(self) nogil:
        return self.wrapped

    def equals(self, FileSystem other not None):
        """
        Check whether this filesystem is equal to another one.

        Parameters
        ----------
        other : FileSystem
            The filesystem to compare with. Must not be None.

        Returns
        -------
        bool

        Raises
        ------
        TypeError
            If `other` is None or not a FileSystem.
        """
        # ``not None`` is required: Cython accepts None for typed
        # extension-type arguments by default, and calling ``unwrap()`` on
        # None would be undefined behaviour rather than a TypeError.
        return self.fs.Equals(other.unwrap())

    def __eq__(self, other):
        # A non-FileSystem (or None) operand makes equals() raise TypeError;
        # defer to the other operand in that case.
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def get_file_info(self, paths_or_selector):
        """
        Get info for the given files.

        Any symlink is automatically dereferenced, recursively. A non-existing
        or unreachable file returns a FileInfo object and has a FileType of
        value NotFound. An exception indicates a truly exceptional condition
        (low-level I/O error, etc.).

        Parameters
        ----------
        paths_or_selector: FileSelector or list of path-likes
            Either a selector object or a list of path-like objects.
            The selector's base directory will not be part of the results, even
            if it exists. If it doesn't exist, use `allow_not_found`.

        Returns
        -------
        file_infos : list of FileInfo

        Raises
        ------
        TypeError
            If the argument is neither a FileSelector nor a list / tuple of
            string paths.
        """
        cdef:
            vector[CFileInfo] infos
            vector[c_string] paths
            CFileSelector selector

        if isinstance(paths_or_selector, FileSelector):
            with nogil:
                selector = (<FileSelector>paths_or_selector).selector
                infos = GetResultValue(self.fs.GetFileInfo(selector))
        elif isinstance(paths_or_selector, (list, tuple)):
            # Convert while holding the GIL; only the C++ call runs without.
            paths = [_path_as_bytes(s) for s in paths_or_selector]
            with nogil:
                infos = GetResultValue(self.fs.GetFileInfo(paths))
        else:
            raise TypeError('Must pass either paths or a FileSelector')

        return [FileInfo.wrap(info) for info in infos]

    def create_dir(self, path, *, bint recursive=True):
        """
        Create a directory and subdirectories.

        This function succeeds if the directory already exists.

        Parameters
        ----------
        path : str
            The path of the new directory.
        recursive: bool, default True
            Create nested directories as well.
        """
        cdef c_string directory = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.CreateDir(directory, recursive=recursive))

    def delete_dir(self, path):
        """Delete a directory and its contents, recursively.

        Parameters
        ----------
        path : str
            The path of the directory to be deleted.
        """
        cdef c_string directory = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.DeleteDir(directory))

    def move(self, src, dest):
        """
        Move / rename a file or directory.

        If the destination exists:
        - if it is a non-empty directory, an error is returned
        - otherwise, if it has the same type as the source, it is replaced
        - otherwise, behavior is unspecified (implementation-dependent).

        Parameters
        ----------
        src : str
            The path of the file or the directory to be moved.
        dest : str
            The destination path where the file or directory is moved to.
        """
        cdef:
            c_string source = _path_as_bytes(src)
            c_string destination = _path_as_bytes(dest)
        with nogil:
            check_status(self.fs.Move(source, destination))

    def copy_file(self, src, dest):
        """
        Copy a file.

        If the destination exists and is a directory, an error is returned.
        Otherwise, it is replaced.

        Parameters
        ----------
        src : str
            The path of the file to be copied from.
        dest : str
            The destination path where the file is copied to.
        """
        cdef:
            c_string source = _path_as_bytes(src)
            c_string destination = _path_as_bytes(dest)
        with nogil:
            check_status(self.fs.CopyFile(source, destination))

    def delete_file(self, path):
        """
        Delete a file.

        Parameters
        ----------
        path : str
            The path of the file to be deleted.
        """
        cdef c_string file = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.DeleteFile(file))

    def _wrap_input_stream(self, stream, path, compression, buffer_size):
        # Optionally layer buffering, then decompression, on top of `stream`.
        # 'detect' resolves the codec from `path` via _detect_compression.
        if buffer_size is not None and buffer_size != 0:
            stream = BufferedInputStream(stream, buffer_size)
        if compression == 'detect':
            compression = _detect_compression(path)
        if compression is not None:
            stream = CompressedInputStream(stream, compression)
        return stream

    def _wrap_output_stream(self, stream, path, compression, buffer_size):
        # Optionally layer buffering, then compression, on top of `stream`.
        # 'detect' resolves the codec from `path` via _detect_compression.
        if buffer_size is not None and buffer_size != 0:
            stream = BufferedOutputStream(stream, buffer_size)
        if compression == 'detect':
            compression = _detect_compression(path)
        if compression is not None:
            stream = CompressedOutputStream(stream, compression)
        return stream

    def open_input_file(self, path):
        """
        Open an input file for random access reading.

        Parameters
        ----------
        path : str
            The source to open for reading.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[CRandomAccessFile] in_handle

        with nogil:
            in_handle = GetResultValue(self.fs.OpenInputFile(pathstr))

        stream.set_random_access_file(in_handle)
        stream.is_readable = True
        return stream

    def open_input_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an input stream for sequential reading.

        Parameters
        ----------
        path : str
            The source to open for reading.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly decompression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary read buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[CInputStream] in_handle

        with nogil:
            in_handle = GetResultValue(self.fs.OpenInputStream(pathstr))

        stream.set_input_stream(in_handle)
        stream.is_readable = True

        return self._wrap_input_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )

    def open_output_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an output stream for sequential writing.

        If the target already exists, existing data is truncated.

        Parameters
        ----------
        path : str
            The source to open for writing.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly compression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary write buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[COutputStream] out_handle

        with nogil:
            out_handle = GetResultValue(self.fs.OpenOutputStream(pathstr))

        stream.set_output_stream(out_handle)
        stream.is_writable = True

        return self._wrap_output_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )

    def open_append_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an output stream for appending.

        If the target doesn't exist, a new empty file is created.

        Parameters
        ----------
        path : str
            The source to open for writing.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly compression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary write buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[COutputStream] out_handle

        with nogil:
            out_handle = GetResultValue(self.fs.OpenAppendStream(pathstr))

        stream.set_output_stream(out_handle)
        stream.is_writable = True

        return self._wrap_output_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )
589
590
cdef class LocalFileSystem(FileSystem):
    """
    A FileSystem implementation accessing files on the local machine.

    Details such as symlinks are abstracted away (symlinks are always followed,
    except when deleting an entry).

    Parameters
    ----------
    use_mmap: bool, default False
        Whether open_input_stream and open_input_file should return
        a mmap'ed file or a regular file.
    """

    def __init__(self, use_mmap=False):
        cdef:
            CLocalFileSystemOptions options = CLocalFileSystemOptions.Defaults()
            shared_ptr[CLocalFileSystem] local_fs

        # Start from the C++ defaults and override only what was requested.
        options.use_mmap = use_mmap
        local_fs = make_shared[CLocalFileSystem](options)

        self.init(<shared_ptr[CFileSystem]> local_fs)

    cdef init(self, const shared_ptr[CFileSystem]& c_fs):
        # Besides the base-class bookkeeping, keep a pointer typed as the
        # concrete C++ class so that its options can be read back later.
        FileSystem.init(self, c_fs)
        self.localfs = <CLocalFileSystem*> c_fs.get()

    def __reduce__(self):
        # Pickle support: rebuild from the constructor with the same option.
        cdef CLocalFileSystemOptions options = self.localfs.options()
        return (LocalFileSystem, (options.use_mmap,))
623
624
cdef class SubTreeFileSystem(FileSystem):
    """
    Delegates to another implementation after prepending a fixed base path.

    This is useful to expose a logical view of a subtree of a filesystem,
    for example a directory in a LocalFileSystem.

    Note, that this makes no security guarantee. For example, symlinks may
    allow to "escape" the subtree and access other parts of the underlying
    filesystem.

    Parameters
    ----------
    base_path: str
        The root of the subtree.
    base_fs: FileSystem
        FileSystem object the operations delegated to. Must not be None.
    """

    def __init__(self, base_path, FileSystem base_fs not None):
        # ``not None`` is required: Cython accepts None for typed
        # extension-type arguments by default, and reading ``.wrapped`` from
        # None would be undefined behaviour rather than a TypeError.
        cdef:
            c_string pathstr
            shared_ptr[CSubTreeFileSystem] wrapped

        pathstr = _path_as_bytes(base_path)
        wrapped = make_shared[CSubTreeFileSystem](pathstr, base_fs.wrapped)

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Besides the base-class bookkeeping, keep a pointer typed as the
        # concrete C++ class to reach base_path() / base_fs().
        FileSystem.init(self, wrapped)
        self.subtreefs = <CSubTreeFileSystem*> wrapped.get()

    def __reduce__(self):
        # Pickle support: rebuild from the constructor arguments.
        return SubTreeFileSystem, (
            frombytes(self.subtreefs.base_path()),
            FileSystem.wrap(self.subtreefs.base_fs())
        )

    @property
    def base_path(self):
        """The root of the subtree, as a str."""
        return frombytes(self.subtreefs.base_path())

    @property
    def base_fs(self):
        """The FileSystem that operations are delegated to."""
        return FileSystem.wrap(self.subtreefs.base_fs())
671
672
cdef class _MockFileSystem(FileSystem):
    # Wrapper around the C++ CMockFileSystem. The C++ object is constructed
    # from a time point derived from `current_time` (presumably its notion of
    # "now" -- confirm against the C++ implementation).

    def __init__(self, datetime current_time=None):
        cdef shared_ptr[CMockFileSystem] wrapped

        # Fall back to the current local time when none is supplied.
        if not current_time:
            current_time = datetime.now()

        wrapped = make_shared[CMockFileSystem](
            PyDateTime_to_TimePoint(<PyDateTime_DateTime*> current_time)
        )
        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Besides the base-class bookkeeping, keep a pointer typed as the
        # concrete C++ class.
        FileSystem.init(self, wrapped)
        self.mockfs = <CMockFileSystem*> wrapped.get()
688