1from distutils.version import LooseVersion
2import json
3import warnings
4
5from pandas import DataFrame
6
7from geopandas._compat import import_optional_dependency
8from geopandas.array import from_wkb
9from geopandas import GeoDataFrame
10import geopandas
11from .file import _expand_user
12
13METADATA_VERSION = "0.1.0"
14# reference: https://github.com/geopandas/geo-arrow-spec
15
# Metadata structure:
# {
#     "geo": {
#         "columns": {
#             "<name>": {
#                 "bbox": [<minx>, <miny>, <maxx>, <maxy>],
#                 "crs": "<WKT or None: REQUIRED>",
#                 "encoding": "WKB"
#             }
#         },
#         "creator": {
#             "library": "geopandas",
#             "version": "<geopandas.__version__>"
#         },
#         "primary_column": "<str: REQUIRED>",
#         "schema_version": "<METADATA_VERSION>"
#     }
# }
33
34
35def _is_fsspec_url(url):
36    return (
37        isinstance(url, str)
38        and "://" in url
39        and not url.startswith(("http://", "https://"))
40    )
41
42
def _create_metadata(df):
    """Create and encode geo metadata dict.

    Parameters
    ----------
    df : GeoDataFrame

    Returns
    -------
    dict
    """
    # One metadata entry per geometry-dtype column in the frame.
    geometry_columns = df.columns[df.dtypes == "geometry"]
    column_metadata = {
        col: {
            "crs": df[col].crs.to_wkt() if df[col].crs else None,
            "encoding": "WKB",
            "bbox": df[col].total_bounds.tolist(),
        }
        for col in geometry_columns
    }

    return {
        "primary_column": df._geometry_column_name,
        "columns": column_metadata,
        "schema_version": METADATA_VERSION,
        "creator": {"library": "geopandas", "version": geopandas.__version__},
    }
71
72
73def _encode_metadata(metadata):
74    """Encode metadata dict to UTF-8 JSON string
75
76    Parameters
77    ----------
78    metadata : dict
79
80    Returns
81    -------
82    UTF-8 encoded JSON string
83    """
84    return json.dumps(metadata).encode("utf-8")
85
86
87def _decode_metadata(metadata_str):
88    """Decode a UTF-8 encoded JSON string to dict
89
90    Parameters
91    ----------
92    metadata_str : string (UTF-8 encoded)
93
94    Returns
95    -------
96    dict
97    """
98    if metadata_str is None:
99        return None
100
101    return json.loads(metadata_str.decode("utf-8"))
102
103
104def _validate_dataframe(df):
105    """Validate that the GeoDataFrame conforms to requirements for writing
106    to Parquet format.
107
108    Raises `ValueError` if the GeoDataFrame is not valid.
109
110    copied from `pandas.io.parquet`
111
112    Parameters
113    ----------
114    df : GeoDataFrame
115    """
116
117    if not isinstance(df, DataFrame):
118        raise ValueError("Writing to Parquet/Feather only supports IO with DataFrames")
119
120    # must have value column names (strings only)
121    if df.columns.inferred_type not in {"string", "unicode", "empty"}:
122        raise ValueError("Writing to Parquet/Feather requires string column names")
123
124    # index level names must be strings
125    valid_names = all(
126        isinstance(name, str) for name in df.index.names if name is not None
127    )
128    if not valid_names:
129        raise ValueError("Index level names must be strings")
130
131
132def _validate_metadata(metadata):
133    """Validate geo metadata.
134    Must not be empty, and must contain the structure specified above.
135
136    Raises ValueError if metadata is not valid.
137
138    Parameters
139    ----------
140    metadata : dict
141    """
142
143    if not metadata:
144        raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")
145
146    required_keys = ("primary_column", "columns")
147    for key in required_keys:
148        if metadata.get(key, None) is None:
149            raise ValueError(
150                "'geo' metadata in Parquet/Feather file is missing required key: "
151                "'{key}'".format(key=key)
152            )
153
154    if not isinstance(metadata["columns"], dict):
155        raise ValueError("'columns' in 'geo' metadata must be a dict")
156
157    # Validate that geometry columns have required metadata and values
158    required_col_keys = ("crs", "encoding")
159    for col, column_metadata in metadata["columns"].items():
160        for key in required_col_keys:
161            if key not in column_metadata:
162                raise ValueError(
163                    "'geo' metadata in Parquet/Feather file is missing required key "
164                    "'{key}' for column '{col}'".format(key=key, col=col)
165                )
166
167        if column_metadata["encoding"] != "WKB":
168            raise ValueError("Only WKB geometry encoding is supported")
169
170
def _geopandas_to_arrow(df, index=None):
    """
    Helper function with main, shared logic for to_parquet/to_feather.
    """
    from pyarrow import Table

    warnings.warn(
        "this is an initial implementation of Parquet/Feather file support and "
        "associated metadata.  This is tracking version 0.1.0 of the metadata "
        "specification at "
        "https://github.com/geopandas/geo-arrow-spec\n\n"
        "This metadata specification does not yet make stability promises.  "
        "We do not yet recommend using this in a production setting unless you "
        "are able to rewrite your Parquet/Feather files.\n\n"
        "To further ignore this warning, you can do: \n"
        "import warnings; warnings.filterwarnings('ignore', "
        "message='.*initial implementation of Parquet.*')",
        UserWarning,
        stacklevel=4,
    )

    _validate_dataframe(df)

    # Capture geo metadata now, before the geometries are converted to WKB.
    geo_metadata = _create_metadata(df)

    table = Table.from_pandas(df.to_wkb(), preserve_index=index)

    # Attach geopandas-specific file-level metadata to the schema.  This must
    # be done AFTER creating the table or it is not persisted.
    schema_metadata = table.schema.metadata
    schema_metadata.update({b"geo": _encode_metadata(geo_metadata)})
    return table.replace_schema_metadata(schema_metadata)
206
207
def _to_parquet(df, path, index=None, compression="snappy", **kwargs):
    """
    Write a GeoDataFrame to the Parquet format.

    Any geometry columns present are serialized to WKB format in the file.

    Requires 'pyarrow'.

    WARNING: this is an initial implementation of Parquet file support and
    associated metadata.  This is tracking version 0.1.0 of the metadata
    specification at:
    https://github.com/geopandas/geo-arrow-spec

    This metadata specification does not yet make stability promises.  As such,
    we do not yet recommend using this in a production setting unless you are
    able to rewrite your Parquet files.


    .. versionadded:: 0.8

    Parameters
    ----------
    path : str, path object
    index : bool, default None
        If ``True``, always include the dataframe's index(es) as columns
        in the file output.
        If ``False``, the index(es) will not be written to the file.
        If ``None``, the index(es) will be included as columns in the file
        output except `RangeIndex` which is stored as metadata only.
    compression : {'snappy', 'gzip', 'brotli', None}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    kwargs
        Additional keyword arguments passed to pyarrow.parquet.write_table().
    """
    parquet = import_optional_dependency(
        "pyarrow.parquet", extra="pyarrow is required for Parquet support."
    )

    table = _geopandas_to_arrow(df, index=index)
    parquet.write_table(table, _expand_user(path), compression=compression, **kwargs)
249
250
def _to_feather(df, path, index=None, compression=None, **kwargs):
    """
    Write a GeoDataFrame to the Feather format.

    Any geometry columns present are serialized to WKB format in the file.

    Requires 'pyarrow' >= 0.17.

    WARNING: this is an initial implementation of Feather file support and
    associated metadata.  This is tracking version 0.1.0 of the metadata
    specification at:
    https://github.com/geopandas/geo-arrow-spec

    This metadata specification does not yet make stability promises.  As such,
    we do not yet recommend using this in a production setting unless you are
    able to rewrite your Feather files.

    .. versionadded:: 0.8

    Parameters
    ----------
    path : str, path object
    index : bool, default None
        If ``True``, always include the dataframe's index(es) as columns
        in the file output.
        If ``False``, the index(es) will not be written to the file.
        If ``None``, the index(es) will be included as columns in the file
        output except `RangeIndex` which is stored as metadata only.
    compression : {'zstd', 'lz4', 'uncompressed'}, optional
        Name of the compression to use. Use ``"uncompressed"`` for no
        compression. By default uses LZ4 if available, otherwise uncompressed.
    kwargs
        Additional keyword arguments passed to pyarrow.feather.write_feather().

    Raises
    ------
    ImportError
        If pyarrow is missing or older than 0.17.
    """
    feather = import_optional_dependency(
        "pyarrow.feather", extra="pyarrow is required for Feather support."
    )
    # TODO move this into `import_optional_dependency`
    import pyarrow

    # Wrap the installed version in LooseVersion so this is a true version
    # comparison (consistent with _get_filesystem_path); the bare
    # `str < LooseVersion` form only worked via reflected operator dispatch.
    if LooseVersion(pyarrow.__version__) < LooseVersion("0.17.0"):
        raise ImportError("pyarrow >= 0.17 required for Feather support")

    path = _expand_user(path)
    table = _geopandas_to_arrow(df, index=index)
    feather.write_feather(table, path, compression=compression, **kwargs)
297
298
def _arrow_to_geopandas(table):
    """
    Helper function with main, shared logic for read_parquet/read_feather.
    """
    df = table.to_pandas()

    raw_metadata = table.schema.metadata
    if raw_metadata is None or b"geo" not in raw_metadata:
        raise ValueError(
            """Missing geo metadata in Parquet/Feather file.
            Use pandas.read_parquet/read_feather() instead."""
        )

    try:
        metadata = _decode_metadata(raw_metadata.get(b"geo", b""))
    except (TypeError, json.decoder.JSONDecodeError):
        raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")

    _validate_metadata(metadata)

    # Geometry columns actually present in the frame; may be a subset of the
    # file's geometry columns when the 'columns' read parameter was used.
    geometry_columns = df.columns.intersection(metadata["columns"])
    if not len(geometry_columns):
        raise ValueError(
            """No geometry columns are included in the columns read from
            the Parquet/Feather file.  To read this file without geometry columns,
            use pandas.read_parquet/read_feather() instead."""
        )

    geometry = metadata["primary_column"]
    if geometry not in geometry_columns:
        # The primary geometry was not read (likely a column subset);
        # promote the first available geometry column instead.
        geometry = geometry_columns[0]
        if len(geometry_columns) > 1:
            warnings.warn(
                "Multiple non-primary geometry columns read from Parquet/Feather "
                "file. The first column read was promoted to the primary geometry."
            )

    # Convert the WKB-encoded columns back to geometry arrays.
    for col in geometry_columns:
        df[col] = from_wkb(df[col].values, crs=metadata["columns"][col]["crs"])

    return GeoDataFrame(df, geometry=geometry)
350
351
def _get_filesystem_path(path, filesystem=None, storage_options=None):
    """
    Get the filesystem and path for a given filesystem and path.

    If the filesystem is not None then it's just returned as is.

    Parameters
    ----------
    path : str, path object
    filesystem : filesystem instance, optional
        Pre-instantiated filesystem; returned unchanged when provided.
    storage_options : dict, optional
        Options forwarded to ``fsspec.core.url_to_fs`` when fsspec is used
        to resolve the path.

    Returns
    -------
    tuple of (filesystem or None, path)

    Raises
    ------
    ValueError
        If ``storage_options`` is given for a path no fsspec filesystem
        could be resolved for.
    """
    import pyarrow

    if (
        isinstance(path, str)
        and storage_options is None
        and filesystem is None
        and LooseVersion(pyarrow.__version__) >= "5.0.0"
    ):
        # Use the native pyarrow filesystem if possible.
        try:
            from pyarrow.fs import FileSystem

            filesystem, path = FileSystem.from_uri(path)
        except Exception:
            # fallback to use get_handle / fsspec for filesystems
            # that pyarrow doesn't support
            pass

    if _is_fsspec_url(path) and filesystem is None:
        # Fixed typo in the extra message ("requred" -> "required").
        fsspec = import_optional_dependency(
            "fsspec", extra="fsspec is required for 'storage_options'."
        )
        filesystem, path = fsspec.core.url_to_fs(path, **(storage_options or {}))

    if filesystem is None and storage_options:
        raise ValueError(
            "Cannot provide 'storage_options' with non-fsspec path '{}'".format(path)
        )

    return filesystem, path
388
389
def _read_parquet(path, columns=None, storage_options=None, **kwargs):
    """
    Load a Parquet object from the file path, returning a GeoDataFrame.

    You can read a subset of columns in the file using the ``columns`` parameter.
    However, the structure of the returned GeoDataFrame will depend on which
    columns you read:

    * if no geometry columns are read, this will raise a ``ValueError`` - you
      should use the pandas `read_parquet` method instead.
    * if the primary geometry column saved to this file is not included in
      columns, the first available geometry column will be set as the geometry
      column of the returned GeoDataFrame.

    Requires 'pyarrow'.

    .. versionadded:: 0.8

    Parameters
    ----------
    path : str, path object
    columns : list-like of strings, default=None
        If not None, only these columns will be read from the file.  If
        the primary geometry column is not included, the first secondary
        geometry read from the file will be set as the geometry column
        of the returned GeoDataFrame.  If no geometry columns are present,
        a ``ValueError`` will be raised.
    storage_options : dict, optional
        Extra options that make sense for a particular storage connection, e.g. host,
        port, username, password, etc. For HTTP(S) URLs the key-value pairs are
        forwarded to urllib as header options. For other URLs (e.g. starting with
        "s3://", and "gcs://") the key-value pairs are forwarded to fsspec. Please
        see fsspec and urllib for more details.

        When no storage options are provided and a filesystem is implemented by
        both ``pyarrow.fs`` and ``fsspec`` (e.g. "s3://") then the ``pyarrow.fs``
        filesystem is preferred. Provide the instantiated fsspec filesystem using
        the ``filesystem`` keyword if you wish to use its implementation.
    **kwargs
        Any additional kwargs passed to pyarrow.parquet.read_table().

    Returns
    -------
    GeoDataFrame

    Examples
    --------
    >>> df = geopandas.read_parquet("data.parquet")  # doctest: +SKIP

    Specifying columns to read:

    >>> df = geopandas.read_parquet(
    ...     "data.parquet",
    ...     columns=["geometry", "pop_est"]
    ... )  # doctest: +SKIP
    """
    parquet = import_optional_dependency(
        "pyarrow.parquet", extra="pyarrow is required for Parquet support."
    )

    # TODO(https://github.com/pandas-dev/pandas/pull/41194): see if pandas
    # adds filesystem as a keyword and match that.
    filesystem, path = _get_filesystem_path(
        path,
        filesystem=kwargs.pop("filesystem", None),
        storage_options=storage_options,
    )

    # Always request pandas metadata so index information round-trips.
    kwargs["use_pandas_metadata"] = True
    table = parquet.read_table(
        _expand_user(path), columns=columns, filesystem=filesystem, **kwargs
    )

    return _arrow_to_geopandas(table)
462
463
def _read_feather(path, columns=None, **kwargs):
    """
    Load a Feather object from the file path, returning a GeoDataFrame.

    You can read a subset of columns in the file using the ``columns`` parameter.
    However, the structure of the returned GeoDataFrame will depend on which
    columns you read:

    * if no geometry columns are read, this will raise a ``ValueError`` - you
      should use the pandas `read_feather` method instead.
    * if the primary geometry column saved to this file is not included in
      columns, the first available geometry column will be set as the geometry
      column of the returned GeoDataFrame.

    Requires 'pyarrow' >= 0.17.

    .. versionadded:: 0.8

    Parameters
    ----------
    path : str, path object
    columns : list-like of strings, default=None
        If not None, only these columns will be read from the file.  If
        the primary geometry column is not included, the first secondary
        geometry read from the file will be set as the geometry column
        of the returned GeoDataFrame.  If no geometry columns are present,
        a ``ValueError`` will be raised.
    **kwargs
        Any additional kwargs passed to pyarrow.feather.read_table().

    Returns
    -------
    GeoDataFrame

    Raises
    ------
    ImportError
        If pyarrow is missing or older than 0.17.

    Examples
    --------
    >>> df = geopandas.read_feather("data.feather")  # doctest: +SKIP

    Specifying columns to read:

    >>> df = geopandas.read_feather(
    ...     "data.feather",
    ...     columns=["geometry", "pop_est"]
    ... )  # doctest: +SKIP
    """
    feather = import_optional_dependency(
        "pyarrow.feather", extra="pyarrow is required for Feather support."
    )
    # TODO move this into `import_optional_dependency`
    import pyarrow

    # Wrap the installed version in LooseVersion so this is a true version
    # comparison (consistent with _get_filesystem_path); the bare
    # `str < LooseVersion` form only worked via reflected operator dispatch.
    if LooseVersion(pyarrow.__version__) < LooseVersion("0.17.0"):
        raise ImportError("pyarrow >= 0.17 required for Feather support")

    path = _expand_user(path)
    table = feather.read_table(path, columns=columns, **kwargs)
    return _arrow_to_geopandas(table)
522