1""" parquet compat """
2
3from distutils.version import LooseVersion
4import io
5import os
6from typing import Any, AnyStr, Dict, List, Optional, Tuple
7from warnings import catch_warnings
8
9from pandas._typing import FilePathOrBuffer, StorageOptions
10from pandas.compat._optional import import_optional_dependency
11from pandas.errors import AbstractMethodError
12from pandas.util._decorators import doc
13
14from pandas import DataFrame, MultiIndex, get_option
15from pandas.core import generic
16
17from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path
18
19
20def get_engine(engine: str) -> "BaseImpl":
21    """ return our implementation """
22    if engine == "auto":
23        engine = get_option("io.parquet.engine")
24
25    if engine == "auto":
26        # try engines in this order
27        engine_classes = [PyArrowImpl, FastParquetImpl]
28
29        error_msgs = ""
30        for engine_class in engine_classes:
31            try:
32                return engine_class()
33            except ImportError as err:
34                error_msgs += "\n - " + str(err)
35
36        raise ImportError(
37            "Unable to find a usable engine; "
38            "tried using: 'pyarrow', 'fastparquet'.\n"
39            "A suitable version of "
40            "pyarrow or fastparquet is required for parquet "
41            "support.\n"
42            "Trying to import the above resulted in these errors:"
43            f"{error_msgs}"
44        )
45
46    if engine == "pyarrow":
47        return PyArrowImpl()
48    elif engine == "fastparquet":
49        return FastParquetImpl()
50
51    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
52
53
54def _get_path_or_handle(
55    path: FilePathOrBuffer,
56    fs: Any,
57    storage_options: StorageOptions = None,
58    mode: str = "rb",
59    is_dir: bool = False,
60) -> Tuple[FilePathOrBuffer, Optional[IOHandles], Any]:
61    """File handling for PyArrow."""
62    path_or_handle = stringify_path(path)
63    if is_fsspec_url(path_or_handle) and fs is None:
64        fsspec = import_optional_dependency("fsspec")
65
66        fs, path_or_handle = fsspec.core.url_to_fs(
67            path_or_handle, **(storage_options or {})
68        )
69    elif storage_options:
70        raise ValueError("storage_options passed with buffer or non-fsspec filepath")
71
72    handles = None
73    if (
74        not fs
75        and not is_dir
76        and isinstance(path_or_handle, str)
77        and not os.path.isdir(path_or_handle)
78    ):
79        # use get_handle only when we are very certain that it is not a directory
80        # fsspec resources can also point to directories
81        # this branch is used for example when reading from non-fsspec URLs
82        handles = get_handle(path_or_handle, mode, is_text=False)
83        fs = None
84        path_or_handle = handles.handle
85    return path_or_handle, handles, fs
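
# Rough sketch of the three possible outcomes above (values are illustrative):
#   * fsspec URL such as "s3://bucket/key.parquet":
#       (path as resolved by fsspec, None, fsspec filesystem)
#   * plain local file path: (open binary handle, IOHandles, None)
#   * directory path or already-open buffer: (input unchanged, None, fs as passed)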
86
87
88class BaseImpl:
89    @staticmethod
90    def validate_dataframe(df: DataFrame):
91
92        if not isinstance(df, DataFrame):
93            raise ValueError("to_parquet only supports IO with DataFrames")
94
        # all levels of a MultiIndex must have string column labels
        # (empty levels are allowed)
96        if isinstance(df.columns, MultiIndex):
97            if not all(
98                x.inferred_type in {"string", "empty"} for x in df.columns.levels
99            ):
                raise ValueError(
                    "parquet must have string column names for all values in "
                    "each level of the MultiIndex"
                )
106        else:
107            if df.columns.inferred_type not in {"string", "empty"}:
108                raise ValueError("parquet must have string column names")
109
110        # index level names must be strings
111        valid_names = all(
112            isinstance(name, str) for name in df.index.names if name is not None
113        )
114        if not valid_names:
115            raise ValueError("Index level names must be strings")
116
117    def write(self, df: DataFrame, path, compression, **kwargs):
118        raise AbstractMethodError(self)
119
120    def read(self, path, columns=None, **kwargs):
121        raise AbstractMethodError(self)
122
123
124class PyArrowImpl(BaseImpl):
125    def __init__(self):
126        import_optional_dependency(
127            "pyarrow", extra="pyarrow is required for parquet support."
128        )
129        import pyarrow.parquet
130
131        # import utils to register the pyarrow extension types
132        import pandas.core.arrays._arrow_utils  # noqa
133
134        self.api = pyarrow
135
136    def write(
137        self,
138        df: DataFrame,
139        path: FilePathOrBuffer[AnyStr],
140        compression: Optional[str] = "snappy",
141        index: Optional[bool] = None,
142        storage_options: StorageOptions = None,
143        partition_cols: Optional[List[str]] = None,
144        **kwargs,
145    ):
146        self.validate_dataframe(df)
147
148        from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
149        if index is not None:
150            from_pandas_kwargs["preserve_index"] = index
151
152        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
153
154        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
155            path,
156            kwargs.pop("filesystem", None),
157            storage_options=storage_options,
158            mode="wb",
159            is_dir=partition_cols is not None,
160        )
161        try:
162            if partition_cols is not None:
163                # writes to multiple files under the given path
164                self.api.parquet.write_to_dataset(
165                    table,
166                    path_or_handle,
167                    compression=compression,
168                    partition_cols=partition_cols,
169                    **kwargs,
170                )
171            else:
172                # write to single output file
173                self.api.parquet.write_table(
174                    table, path_or_handle, compression=compression, **kwargs
175                )
176        finally:
177            if handles is not None:
178                handles.close()
179
180    def read(
181        self,
182        path,
183        columns=None,
184        use_nullable_dtypes=False,
185        storage_options: StorageOptions = None,
186        **kwargs,
187    ):
188        kwargs["use_pandas_metadata"] = True
189
190        to_pandas_kwargs = {}
191        if use_nullable_dtypes:
192            if LooseVersion(self.api.__version__) >= "0.16":
193                import pandas as pd
194
195                mapping = {
196                    self.api.int8(): pd.Int8Dtype(),
197                    self.api.int16(): pd.Int16Dtype(),
198                    self.api.int32(): pd.Int32Dtype(),
199                    self.api.int64(): pd.Int64Dtype(),
200                    self.api.uint8(): pd.UInt8Dtype(),
201                    self.api.uint16(): pd.UInt16Dtype(),
202                    self.api.uint32(): pd.UInt32Dtype(),
203                    self.api.uint64(): pd.UInt64Dtype(),
204                    self.api.bool_(): pd.BooleanDtype(),
205                    self.api.string(): pd.StringDtype(),
206                }
207                to_pandas_kwargs["types_mapper"] = mapping.get
208            else:
209                raise ValueError(
210                    "'use_nullable_dtypes=True' is only supported for pyarrow >= 0.16 "
211                    f"({self.api.__version__} is installed"
212                )
213
214        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
215            path,
216            kwargs.pop("filesystem", None),
217            storage_options=storage_options,
218            mode="rb",
219        )
220        try:
221            return self.api.parquet.read_table(
222                path_or_handle, columns=columns, **kwargs
223            ).to_pandas(**to_pandas_kwargs)
224        finally:
225            if handles is not None:
226                handles.close()
227
228
229class FastParquetImpl(BaseImpl):
230    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import fastparquet on first use
233        fastparquet = import_optional_dependency(
234            "fastparquet", extra="fastparquet is required for parquet support."
235        )
236        self.api = fastparquet
237
238    def write(
239        self,
240        df: DataFrame,
241        path,
242        compression="snappy",
243        index=None,
244        partition_cols=None,
245        storage_options: StorageOptions = None,
246        **kwargs,
247    ):
248        self.validate_dataframe(df)
        # fastparquet's thriftpy dependency emits
        # "DeprecationWarning: tostring() is deprecated. Use tobytes() instead."
        # (thriftpy/protocol/compact.py:339); silenced by catch_warnings below
252
253        if "partition_on" in kwargs and partition_cols is not None:
254            raise ValueError(
255                "Cannot use both partition_on and "
256                "partition_cols. Use partition_cols for partitioning data"
257            )
258        elif "partition_on" in kwargs:
259            partition_cols = kwargs.pop("partition_on")
260
261        if partition_cols is not None:
262            kwargs["file_scheme"] = "hive"
263
264        # cannot use get_handle as write() does not accept file buffers
265        path = stringify_path(path)
266        if is_fsspec_url(path):
267            fsspec = import_optional_dependency("fsspec")
268
269            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
270            kwargs["open_with"] = lambda path, _: fsspec.open(
271                path, "wb", **(storage_options or {})
272            ).open()
273        elif storage_options:
274            raise ValueError(
275                "storage_options passed with file object or non-fsspec file path"
276            )
277
278        with catch_warnings(record=True):
279            self.api.write(
280                path,
281                df,
282                compression=compression,
283                write_index=index,
284                partition_on=partition_cols,
285                **kwargs,
286            )
287
288    def read(
289        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
290    ):
291        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
292        if use_nullable_dtypes:
293            raise ValueError(
294                "The 'use_nullable_dtypes' argument is not supported for the "
295                "fastparquet engine"
296            )
297        path = stringify_path(path)
298        parquet_kwargs = {}
299        handles = None
300        if is_fsspec_url(path):
301            fsspec = import_optional_dependency("fsspec")
302
303            parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
304                path, "rb", **(storage_options or {})
305            ).open()
306        elif isinstance(path, str) and not os.path.isdir(path):
307            # use get_handle only when we are very certain that it is not a directory
308            # fsspec resources can also point to directories
309            # this branch is used for example when reading from non-fsspec URLs
310            handles = get_handle(path, "rb", is_text=False)
311            path = handles.handle
        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, **kwargs)
        finally:
            # close any file handle we opened above, even if reading fails
            if handles is not None:
                handles.close()
319
320
321@doc(storage_options=generic._shared_docs["storage_options"])
322def to_parquet(
323    df: DataFrame,
324    path: Optional[FilePathOrBuffer] = None,
325    engine: str = "auto",
326    compression: Optional[str] = "snappy",
327    index: Optional[bool] = None,
328    storage_options: StorageOptions = None,
329    partition_cols: Optional[List[str]] = None,
330    **kwargs,
331) -> Optional[bytes]:
332    """
333    Write a DataFrame to the parquet format.
334
335    Parameters
336    ----------
337    df : DataFrame
    path : str or file-like object, default None
        If a string, it will be used as the root directory path
        when writing a partitioned dataset. By file-like object,
        we refer to objects with a ``write()`` method, such as a file handle
        (e.g. via the builtin ``open`` function) or ``io.BytesIO``. The
        fastparquet engine does not accept file-like objects. If path is
        None, a bytes object is returned.
345
346        .. versionchanged:: 1.2.0
347
348    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
349        Parquet library to use. If 'auto', then the option
350        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
351        behavior is to try 'pyarrow', falling back to 'fastparquet' if
352        'pyarrow' is unavailable.
353    compression : {{'snappy', 'gzip', 'brotli', None}}, default 'snappy'
354        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True``, the dataframe's index(es) will be
        saved; however, a ``RangeIndex`` is stored as a range in the metadata
        rather than as values, so it requires less space and is faster. Other
        indexes will be included as columns in the file output.
363
364        .. versionadded:: 0.24.0
365
366    partition_cols : str or list, optional, default None
367        Column names by which to partition the dataset.
368        Columns are partitioned in the order they are given.
369        Must be None if path is not a string.
370
371        .. versionadded:: 0.24.0
372
373    {storage_options}
374
375        .. versionadded:: 1.2.0
376
    **kwargs
        Additional keyword arguments passed to the engine.
379
380    Returns
381    -------
382    bytes if no path argument is provided else None
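
    Examples
    --------
    A minimal sketch (assumes the pyarrow engine is installed; file names are
    purely illustrative):

    >>> import pandas as pd
    >>> frame = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    >>> raw = to_parquet(frame, path=None)  # serialize to in-memory bytes
    >>> to_parquet(frame, "out.parquet", index=False)  # write a single file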
383    """
384    if isinstance(partition_cols, str):
385        partition_cols = [partition_cols]
386    impl = get_engine(engine)
387
388    path_or_buf: FilePathOrBuffer = io.BytesIO() if path is None else path
389
390    impl.write(
391        df,
392        path_or_buf,
393        compression=compression,
394        index=index,
395        partition_cols=partition_cols,
396        storage_options=storage_options,
397        **kwargs,
398    )
399
400    if path is None:
401        assert isinstance(path_or_buf, io.BytesIO)
402        return path_or_buf.getvalue()
403    else:
404        return None
405
406
407def read_parquet(
408    path,
409    engine: str = "auto",
410    columns=None,
411    use_nullable_dtypes: bool = False,
412    **kwargs,
413):
414    """
415    Load a parquet object from the file path, returning a DataFrame.
416
417    Parameters
418    ----------
419    path : str, path object or file-like object
420        Any valid string path is acceptable. The string could be a URL. Valid
421        URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is
422        expected. A local file could be:
423        ``file://localhost/path/to/table.parquet``.
424        A file URL can also be a path to a directory that contains multiple
425        partitioned parquet files. Both pyarrow and fastparquet support
426        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
428
429        If you want to pass in a path object, pandas accepts any
430        ``os.PathLike``.
431
432        By file-like object, we refer to objects with a ``read()`` method,
433        such as a file handle (e.g. via builtin ``open`` function)
434        or ``StringIO``.
435    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
436        Parquet library to use. If 'auto', then the option
437        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
438        behavior is to try 'pyarrow', falling back to 'fastparquet' if
439        'pyarrow' is unavailable.
    columns : list, default None
441        If not None, only these columns will be read from the file.
442    use_nullable_dtypes : bool, default False
443        If True, use dtypes that use ``pd.NA`` as missing value indicator
444        for the resulting DataFrame (only applicable for ``engine="pyarrow"``).
445        As new dtypes are added that support ``pd.NA`` in the future, the
446        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.
449
450        .. versionadded:: 1.2.0
451    **kwargs
452        Any additional kwargs are passed to the engine.
453
454    Returns
455    -------
456    DataFrame
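
    Examples
    --------
    A minimal sketch (assumes pyarrow is installed and that ``"out.parquet"``
    already exists, e.g. written by ``to_parquet``):

    >>> frame = read_parquet("out.parquet")
    >>> subset = read_parquet("out.parquet", columns=["a"])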
457    """
458    impl = get_engine(engine)
459    return impl.read(
460        path, columns=columns, use_nullable_dtypes=use_nullable_dtypes, **kwargs
461    )
462