1import os.path
2
3from pandas import Series
4
5from geopandas import GeoDataFrame
6
7from geopandas.testing import (  # noqa
8    assert_geoseries_equal,
9    geom_almost_equals,
10    geom_equals,
11)
12
13HERE = os.path.abspath(os.path.dirname(__file__))
14PACKAGE_DIR = os.path.dirname(os.path.dirname(HERE))
15
16
17# mock not used here, but the import from here is used in other modules
18try:
19    import unittest.mock as mock  # noqa
20except ImportError:
21    import mock  # noqa
22
23
24def validate_boro_df(df, case_sensitive=False):
25    """Tests a GeoDataFrame that has been read in from the nybb dataset."""
26    assert isinstance(df, GeoDataFrame)
27    # Make sure all the columns are there and the geometries
28    # were properly loaded as MultiPolygons
29    assert len(df) == 5
30    columns = ("BoroCode", "BoroName", "Shape_Leng", "Shape_Area")
31    if case_sensitive:
32        for col in columns:
33            assert col in df.columns
34    else:
35        for col in columns:
36            assert col.lower() in (dfcol.lower() for dfcol in df.columns)
37    assert Series(df.geometry.type).dropna().eq("MultiPolygon").all()
38
39
40def get_srid(df):
41    """Return srid from `df.crs`."""
42    if df.crs is not None:
43        return df.crs.to_epsg() or 0
44    return 0
45
46
47def create_spatialite(con, df):
48    """
49    Return a SpatiaLite connection containing the nybb table.
50
51    Parameters
52    ----------
53    `con`: ``sqlite3.Connection``
54    `df`: ``GeoDataFrame``
55    """
56
57    with con:
58        geom_col = df.geometry.name
59        srid = get_srid(df)
60        con.execute(
61            "CREATE TABLE IF NOT EXISTS nybb "
62            "( ogc_fid INTEGER PRIMARY KEY"
63            ", borocode INTEGER"
64            ", boroname TEXT"
65            ", shape_leng REAL"
66            ", shape_area REAL"
67            ")"
68        )
69        con.execute(
70            "SELECT AddGeometryColumn(?, ?, ?, ?)",
71            ("nybb", geom_col, srid, df.geom_type.dropna().iat[0].upper()),
72        )
73        con.execute("SELECT CreateSpatialIndex(?, ?)", ("nybb", geom_col))
74        sql_row = "INSERT INTO nybb VALUES(?, ?, ?, ?, ?, GeomFromText(?, ?))"
75        con.executemany(
76            sql_row,
77            (
78                (
79                    None,
80                    row.BoroCode,
81                    row.BoroName,
82                    row.Shape_Leng,
83                    row.Shape_Area,
84                    row.geometry.wkt if row.geometry else None,
85                    srid,
86                )
87                for row in df.itertuples(index=False)
88            ),
89        )
90
91
92def create_postgis(con, df, srid=None, geom_col="geom"):
93    """
94    Create a nybb table in the test_geopandas PostGIS database.
95    Returns a boolean indicating whether the database table was successfully
96    created
97    """
98    # Try to create the database, skip the db tests if something goes
99    # wrong
100    # If you'd like these tests to run, create a database called
101    # 'test_geopandas' and enable postgis in it:
102    # > createdb test_geopandas
103    # > psql -c "CREATE EXTENSION postgis" -d test_geopandas
104    if srid is not None:
105        geom_schema = "geometry(MULTIPOLYGON, {})".format(srid)
106        geom_insert = "ST_SetSRID(ST_GeometryFromText(%s), {})".format(srid)
107    else:
108        geom_schema = "geometry"
109        geom_insert = "ST_GeometryFromText(%s)"
110    try:
111        cursor = con.cursor()
112        cursor.execute("DROP TABLE IF EXISTS nybb;")
113
114        sql = """CREATE TABLE nybb (
115            {geom_col}   {geom_schema},
116            borocode     integer,
117            boroname     varchar(40),
118            shape_leng   float,
119            shape_area   float
120            );""".format(
121            geom_col=geom_col, geom_schema=geom_schema
122        )
123        cursor.execute(sql)
124
125        for i, row in df.iterrows():
126            sql = """INSERT INTO nybb VALUES ({}, %s, %s, %s, %s
127            );""".format(
128                geom_insert
129            )
130            cursor.execute(
131                sql,
132                (
133                    row["geometry"].wkt,
134                    row["BoroCode"],
135                    row["BoroName"],
136                    row["Shape_Leng"],
137                    row["Shape_Area"],
138                ),
139            )
140    finally:
141        cursor.close()
142        con.commit()
143