1""" parquet compat """ 2 3from distutils.version import LooseVersion 4import io 5import os 6from typing import Any, AnyStr, Dict, List, Optional, Tuple 7from warnings import catch_warnings 8 9from pandas._typing import FilePathOrBuffer, StorageOptions 10from pandas.compat._optional import import_optional_dependency 11from pandas.errors import AbstractMethodError 12from pandas.util._decorators import doc 13 14from pandas import DataFrame, MultiIndex, get_option 15from pandas.core import generic 16 17from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path 18 19 20def get_engine(engine: str) -> "BaseImpl": 21 """ return our implementation """ 22 if engine == "auto": 23 engine = get_option("io.parquet.engine") 24 25 if engine == "auto": 26 # try engines in this order 27 engine_classes = [PyArrowImpl, FastParquetImpl] 28 29 error_msgs = "" 30 for engine_class in engine_classes: 31 try: 32 return engine_class() 33 except ImportError as err: 34 error_msgs += "\n - " + str(err) 35 36 raise ImportError( 37 "Unable to find a usable engine; " 38 "tried using: 'pyarrow', 'fastparquet'.\n" 39 "A suitable version of " 40 "pyarrow or fastparquet is required for parquet " 41 "support.\n" 42 "Trying to import the above resulted in these errors:" 43 f"{error_msgs}" 44 ) 45 46 if engine == "pyarrow": 47 return PyArrowImpl() 48 elif engine == "fastparquet": 49 return FastParquetImpl() 50 51 raise ValueError("engine must be one of 'pyarrow', 'fastparquet'") 52 53 54def _get_path_or_handle( 55 path: FilePathOrBuffer, 56 fs: Any, 57 storage_options: StorageOptions = None, 58 mode: str = "rb", 59 is_dir: bool = False, 60) -> Tuple[FilePathOrBuffer, Optional[IOHandles], Any]: 61 """File handling for PyArrow.""" 62 path_or_handle = stringify_path(path) 63 if is_fsspec_url(path_or_handle) and fs is None: 64 fsspec = import_optional_dependency("fsspec") 65 66 fs, path_or_handle = fsspec.core.url_to_fs( 67 path_or_handle, **(storage_options or {}) 68 ) 69 elif storage_options: 70 raise ValueError("storage_options passed with buffer or non-fsspec filepath") 71 72 handles = None 73 if ( 74 not fs 75 and not is_dir 76 and isinstance(path_or_handle, str) 77 and not os.path.isdir(path_or_handle) 78 ): 79 # use get_handle only when we are very certain that it is not a directory 80 # fsspec resources can also point to directories 81 # this branch is used for example when reading from non-fsspec URLs 82 handles = get_handle(path_or_handle, mode, is_text=False) 83 fs = None 84 path_or_handle = handles.handle 85 return path_or_handle, handles, fs 86 87 88class BaseImpl: 89 @staticmethod 90 def validate_dataframe(df: DataFrame): 91 92 if not isinstance(df, DataFrame): 93 raise ValueError("to_parquet only supports IO with DataFrames") 94 95 # must have value column names for all index levels (strings only) 96 if isinstance(df.columns, MultiIndex): 97 if not all( 98 x.inferred_type in {"string", "empty"} for x in df.columns.levels 99 ): 100 raise ValueError( 101 """ 102 parquet must have string column names for all values in 103 each level of the MultiIndex 104 """ 105 ) 106 else: 107 if df.columns.inferred_type not in {"string", "empty"}: 108 raise ValueError("parquet must have string column names") 109 110 # index level names must be strings 111 valid_names = all( 112 isinstance(name, str) for name in df.index.names if name is not None 113 ) 114 if not valid_names: 115 raise ValueError("Index level names must be strings") 116 117 def write(self, df: DataFrame, path, compression, **kwargs): 118 raise 


class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame):

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names for all index levels (strings only)
        if isinstance(df.columns, MultiIndex):
            if not all(
                x.inferred_type in {"string", "empty"} for x in df.columns.levels
            ):
                raise ValueError(
                    "parquet must have string column names for all values in "
                    "each level of the MultiIndex"
                )
        else:
            if df.columns.inferred_type not in {"string", "empty"}:
                raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs):
        raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):
    def __init__(self):
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays._arrow_utils  # noqa

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePathOrBuffer[AnyStr],
        compression: Optional[str] = "snappy",
        index: Optional[bool] = None,
        storage_options: StorageOptions = None,
        partition_cols: Optional[List[str]] = None,
        **kwargs,
    ):
        self.validate_dataframe(df)

        from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ):
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            if LooseVersion(self.api.__version__) >= "0.16":
                import pandas as pd

                mapping = {
                    self.api.int8(): pd.Int8Dtype(),
                    self.api.int16(): pd.Int16Dtype(),
                    self.api.int32(): pd.Int32Dtype(),
                    self.api.int64(): pd.Int64Dtype(),
                    self.api.uint8(): pd.UInt8Dtype(),
                    self.api.uint16(): pd.UInt16Dtype(),
                    self.api.uint32(): pd.UInt32Dtype(),
                    self.api.uint64(): pd.UInt64Dtype(),
                    self.api.bool_(): pd.BooleanDtype(),
                    self.api.string(): pd.StringDtype(),
                }
                to_pandas_kwargs["types_mapper"] = mapping.get
            else:
                raise ValueError(
                    "'use_nullable_dtypes=True' is only supported for pyarrow >= 0.16 "
                    f"({self.api.__version__} is installed)"
                )

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            return self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
        finally:
            if handles is not None:
                handles.close()
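

# Usage sketch (editorial illustration; assumes pyarrow >= 0.16 is
# installed): a round trip through PyArrowImpl with nullable dtypes. With
# ``use_nullable_dtypes=True`` the ``types_mapper`` built above maps arrow
# int64 to pandas' ``Int64``, so missing values survive as ``pd.NA``
# instead of forcing a float64 cast.
def _demo_pyarrow_nullable_roundtrip() -> None:
    import tempfile

    impl = PyArrowImpl()
    df = DataFrame({"a": [1, None, 3]}, dtype="Int64")
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "demo.parquet")
        impl.write(df, target, compression=None, index=False)
        result = impl.read(target, use_nullable_dtypes=True)
    assert str(result["a"].dtype) == "Int64"
    assert result["a"].isna().tolist() == [False, True, False]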


class FastParquetImpl(BaseImpl):
    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ):
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ):
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        parquet_kwargs = {}
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "rb", **(storage_options or {})
            ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(path, "rb", is_text=False)
            path = handles.handle
        parquet_file = self.api.ParquetFile(path, **parquet_kwargs)

        result = parquet_file.to_pandas(columns=columns, **kwargs)

        if handles is not None:
            handles.close()
        return result
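

# Usage sketch (editorial illustration; assumes fastparquet is installed):
# a partitioned write through FastParquetImpl. Passing ``partition_cols``
# flips ``file_scheme`` to "hive", producing one subdirectory per distinct
# key value; ``read`` detects the directory path and hands it straight to
# ``fastparquet.ParquetFile``.
def _demo_fastparquet_partitioned_write() -> None:
    import tempfile

    impl = FastParquetImpl()
    df = DataFrame({"key": ["a", "a", "b"], "value": [1, 2, 3]})
    with tempfile.TemporaryDirectory() as tmp:
        impl.write(df, tmp, compression=None, index=False, partition_cols=["key"])
        result = impl.read(tmp, columns=["value"])
    assert sorted(result["value"]) == [1, 2, 3]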


@doc(storage_options=generic._shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: Optional[FilePathOrBuffer] = None,
    engine: str = "auto",
    compression: Optional[str] = "snappy",
    index: Optional[bool] = None,
    storage_options: StorageOptions = None,
    partition_cols: Optional[List[str]] = None,
    **kwargs,
) -> Optional[bytes]:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str or file-like object, default None
        If a string, it will be used as Root Directory path
        when writing a partitioned dataset. By file-like object,
        we refer to objects with a write() method, such as a file handle
        (e.g. via builtin open function) or io.BytesIO. The engine
        fastparquet does not accept file-like objects. If path is None,
        a bytes object is returned.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', None}}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True``, the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.

        .. versionadded:: 0.24.0

    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.

        .. versionadded:: 0.24.0

    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine.

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePathOrBuffer = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None
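

# Usage sketch (editorial illustration; assumes pyarrow is installed, since
# fastparquet cannot write to file-like objects): with ``path=None`` the
# frame is serialized into an in-memory BytesIO buffer and the raw parquet
# payload is returned, magic bytes and all.
def _demo_to_parquet_returns_bytes() -> None:
    df = DataFrame({"a": [1, 2, 3]})
    payload = to_parquet(df, path=None, engine="pyarrow", compression=None)
    assert isinstance(payload, bytes)
    assert payload[:4] == b"PAR1"  # the parquet file magic number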


def read_parquet(
    path,
    engine: str = "auto",
    columns=None,
    use_nullable_dtypes: bool = False,
    **kwargs,
):
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        Any valid string path is acceptable. The string could be a URL. Valid
        URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is
        expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.

        If you want to pass in a path object, pandas accepts any
        ``os.PathLike``.

        By file-like object, we refer to objects with a ``read()`` method,
        such as a file handle (e.g. via builtin ``open`` function)
        or ``StringIO``.
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.
    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame (only applicable for ``engine="pyarrow"``).
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)
    return impl.read(
        path, columns=columns, use_nullable_dtypes=use_nullable_dtypes, **kwargs
    )
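

# Usage sketch (editorial illustration): the public round trip. Both entry
# points resolve an engine via ``get_engine`` and delegate, so a frame
# written by ``to_parquet`` comes back unchanged from ``read_parquet``.
def _demo_public_roundtrip() -> None:
    import tempfile

    original = DataFrame({"a": [1, 2], "b": ["x", "y"]})
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "roundtrip.parquet")
        to_parquet(original, target, index=False)
        result = read_parquet(target)
    assert result.equals(original)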