from json import loads
import os
import platform

from pkg_resources import parse_version
import pytest
from shapely.geometry import LineString
from sqlalchemy import __version__ as SA_VERSION
from sqlalchemy import create_engine, MetaData, Column, Integer, bindparam
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.event import listen
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import select, func

from geoalchemy2 import Geometry
from geoalchemy2.elements import WKTElement, WKBElement
from geoalchemy2.shape import from_shape, to_shape
from geoalchemy2.compat import str as str_
21
22
# The path of the SpatiaLite library is read from the environment; without it
# the extension cannot be loaded, so the whole module is skipped.
if os.environ.get('SPATIALITE_LIBRARY_PATH') is None:
    pytest.skip(
        'SPATIALITE_LIBRARY_PATH is not defined, skip SpatiaLite tests',
        allow_module_level=True,
    )

# The SpatiaLite tests are skipped as a whole when running on PyPy.
if platform.python_implementation().lower() == 'pypy':
    pytest.skip(
        'skip SpatiaLite tests on PyPy',
        allow_module_level=True,
    )
29
30
def load_spatialite(dbapi_conn, connection_record):
    """Load the SpatiaLite extension into a DBAPI connection.

    Meant to be used as a listener for an engine's ``connect`` event. The
    library to load is taken from the ``SPATIALITE_LIBRARY_PATH`` environment
    variable.

    :param dbapi_conn: DBAPI connection exposing ``enable_load_extension()``
        and ``load_extension()``.
    :param connection_record: unused; part of the listener signature.
    :raises KeyError: if ``SPATIALITE_LIBRARY_PATH`` is not set.
    """
    dbapi_conn.enable_load_extension(True)
    try:
        dbapi_conn.load_extension(os.environ['SPATIALITE_LIBRARY_PATH'])
    finally:
        # Switch extension loading back off even when loading fails, so a
        # failed load never leaves the connection able to load extensions.
        dbapi_conn.enable_load_extension(False)
35
36
# Engine used by every test in this module. The database URL can be overridden
# with the SPATIALITE_DB_PATH environment variable.
engine = create_engine(
    os.environ.get('SPATIALITE_DB_PATH', 'sqlite:///spatialdb'),
    echo=False,
)
# Run load_spatialite() on the engine's 'connect' event.
listen(engine, 'connect', load_spatialite)

# The metadata is bound to the engine: the tests call create_all()/drop_all()
# on it without passing a bind.
metadata = MetaData(bind=engine)
Base = declarative_base(metadata=metadata)
43
44
class Lake(Base):
    """Mapped class for the ``lake`` table.

    Holds an integer primary key and a LINESTRING geometry column.
    """

    __tablename__ = 'lake'

    id = Column(Integer, primary_key=True)
    # LINESTRING geometry with SRID 4326, declared with management=True.
    geom = Column(
        Geometry(geometry_type='LINESTRING', srid=4326, management=True)
    )

    def __init__(self, geom):
        # Positional constructor so tests can write ``Lake(<geometry>)``.
        self.geom = geom
53
54
# Module-wide ORM session, bound to the engine and shared by all the tests.
session = sessionmaker(bind=engine)()

# Run SpatiaLite's InitSpatialMetaData() once, when the module is imported.
# The statement is wrapped in text(): passing a plain string to execute() is
# deprecated in SQLAlchemy 1.4 and rejected in 2.0.
session.execute(text('SELECT InitSpatialMetaData()'))
58
59
class TestInsertionCore:
    """Insert geometries with the Core API and check what is read back."""

    def setup_method(self):
        # xunit-style hook name: the nose-style ``setup`` is no longer called
        # by pytest >= 8, which would leave the tables uncreated.
        metadata.drop_all(checkfirst=True)
        metadata.create_all()
        self.conn = engine.connect()

    def teardown_method(self):
        self.conn.close()
        metadata.drop_all()

    def test_insert(self):
        conn = self.conn

        # Insert four rows using DBAPI's executemany() method. This tests
        # the Geometry type's bind_processor and bind_expression functions.
        conn.execute(Lake.__table__.insert(), [
            {'geom': 'SRID=4326;LINESTRING(0 0,1 1)'},
            {'geom': WKTElement('LINESTRING(0 0,2 2)', srid=4326)},
            {'geom': from_shape(LineString([[0, 0], [3, 3]]), srid=4326)},
            {'geom': None}
        ])

        rows = conn.execute(Lake.__table__.select()).fetchall()
        # Guard the zip() below against silently checking fewer rows.
        assert len(rows) == 4

        # Expected text form of the first three geometries, in insert order.
        expected_wkts = [
            'LINESTRING(0 0, 1 1)',
            'LINESTRING(0 0, 2 2)',
            'LINESTRING(0 0, 3 3)',
        ]
        for row, expected_wkt in zip(rows, expected_wkts):
            geom = row[1]
            assert isinstance(geom, WKBElement)
            wkt = session.execute(geom.ST_AsText()).scalar()
            assert wkt == expected_wkt
            srid = session.execute(geom.ST_SRID()).scalar()
            assert srid == 4326

        # The NULL geometry comes back as None.
        assert rows[3] == (4, None)
108
109
class TestInsertionORM:
    """Insert geometries through the ORM session and check the stored value."""

    def setup_method(self):
        # xunit-style hook name: the nose-style ``setup`` is no longer called
        # by pytest >= 8, which would leave the tables uncreated.
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        # Discard whatever the test flushed before dropping the tables.
        session.rollback()
        metadata.drop_all()

    def _check_stored_line(self, lake):
        """Assert that ``lake.geom`` reloads as LINESTRING(0 0, 1 1), SRID 4326."""
        # Expire the instance so that ``geom`` is reloaded from the database.
        session.expire(lake)
        assert isinstance(lake.geom, WKBElement)
        assert str(lake.geom) == '0102000020E6100000020000000000000000000000000000000000000000000' \
                                 '0000000F03F000000000000F03F'
        wkt = session.execute(lake.geom.ST_AsText()).scalar()
        assert wkt == 'LINESTRING(0 0, 1 1)'
        srid = session.execute(lake.geom.ST_SRID()).scalar()
        assert srid == 4326

    def test_WKT(self):
        # A plain WKT string (no SRID given) is expected to fail at flush.
        lake = Lake('LINESTRING(0 0,1 1)')
        session.add(lake)

        with pytest.raises(IntegrityError):
            session.flush()

    def test_WKTElement(self):
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        self._check_stored_line(lake)

    def test_WKBElement(self):
        shape = LineString([[0, 0], [1, 1]])
        lake = Lake(from_shape(shape, srid=4326))
        session.add(lake)
        session.flush()
        self._check_stored_line(lake)
153
154
class TestShapely:
    """Convert a stored geometry to a Shapely shape and store it again."""

    def setup_method(self):
        # xunit-style hook name: the nose-style ``setup`` is no longer called
        # by pytest >= 8, which would leave the tables uncreated.
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        # Discard whatever the test flushed before dropping the tables.
        session.rollback()
        metadata.drop_all()

    def test_to_shape(self):
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        session.expire(lake)

        # Read the row back and convert its geometry to a Shapely object.
        lake = session.query(Lake).one()
        assert isinstance(lake.geom, WKBElement)
        assert isinstance(lake.geom.data, str_)
        assert lake.geom.srid == 4326
        s = to_shape(lake.geom)
        assert isinstance(s, LineString)
        assert s.wkt == 'LINESTRING (0 0, 1 1)'

        # The WKBElement read from the database can be inserted again as is.
        lake = Lake(lake.geom)
        session.add(lake)
        session.flush()
        session.expire(lake)
        assert isinstance(lake.geom, WKBElement)
        assert isinstance(lake.geom.data, str_)
        assert lake.geom.srid == 4326
184
185
class TestCallFunction:
    """Call spatial functions through ``func``, elements and column comparators."""

    def setup_method(self):
        # xunit-style hook name: the nose-style ``setup`` is no longer called
        # by pytest >= 8, which would leave the tables uncreated.
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        # Discard whatever the test flushed before dropping the tables.
        session.rollback()
        metadata.drop_all()

    def _create_one_lake(self):
        """Flush one ``LINESTRING(0 0,1 1)`` lake and return its id."""
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        return lake.id

    def test_ST_GeometryType(self):
        lake_id = self._create_one_lake()

        # Called through func on the table column.
        s = select([func.ST_GeometryType(Lake.__table__.c.geom)])
        r1 = session.execute(s).scalar()
        assert r1 == 'LINESTRING'

        # Called on the element loaded from the database.
        lake = session.query(Lake).get(lake_id)
        r2 = session.execute(lake.geom.ST_GeometryType()).scalar()
        assert r2 == 'LINESTRING'

        # Called on the mapped column.
        r3 = session.query(Lake.geom.ST_GeometryType()).scalar()
        assert r3 == 'LINESTRING'

        # Used inside a filter.
        r4 = session.query(Lake).filter(
            Lake.geom.ST_GeometryType() == 'LINESTRING').one()
        assert isinstance(r4, Lake)
        assert r4.id == lake_id

    def test_ST_Buffer(self):
        lake_id = self._create_one_lake()

        s = select([func.ST_Buffer(Lake.__table__.c.geom, 2)])
        r1 = session.execute(s).scalar()
        assert isinstance(r1, WKBElement)

        lake = session.query(Lake).get(lake_id)
        r2 = session.execute(lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r2, WKBElement)

        r3 = session.query(Lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r3, WKBElement)

        # The three ways of calling the function must give the same result.
        assert r1.data == r2.data == r3.data

        r4 = session.query(Lake).filter(
            func.ST_Within(WKTElement('POINT(0 0)', srid=4326),
                           Lake.geom.ST_Buffer(2))).one()
        assert isinstance(r4, Lake)
        assert r4.id == lake_id

    def test_ST_GeoJSON(self):
        lake_id = self._create_one_lake()

        def _test(r):
            r = loads(r)
            assert r["type"] == "LineString"
            assert r["coordinates"] == [[0, 0], [1, 1]]

        s = select([func.ST_AsGeoJSON(Lake.__table__.c.geom)])
        r = session.execute(s).scalar()
        _test(r)

        lake = session.query(Lake).get(lake_id)
        r = session.execute(lake.geom.ST_AsGeoJSON()).scalar()
        _test(r)

        r = session.query(Lake.geom.ST_AsGeoJSON()).scalar()
        _test(r)

    @pytest.mark.skip(
        reason='Spatialite does not support the feature version of AsGeoJson() yet')
    def test_ST_GeoJSON_feature(self):
        # The expected feature below describes the lake created here; without
        # it the query would have no row to serialise once the test is
        # un-skipped.
        self._create_one_lake()

        ss3 = select([Lake, bindparam('dummy_val', 10).label('dummy_attr')]).alias()
        s3 = select([func.ST_AsGeoJSON(ss3, 'geom')])
        r3 = session.execute(s3).scalar()
        assert loads(r3) == {
            "type": "Feature",
            "geometry": {
                "type": "LineString",
                "coordinates": [[0, 0], [1, 1]]
            },
            "properties": {"dummy_attr": 10, "id": 1}
        }

    @pytest.mark.skipif(
        parse_version(SA_VERSION) < parse_version("1.3.4"),
        reason='Case-insensitivity is only available for sqlalchemy>=1.3.4')
    def test_comparator_case_insensitivity(self):
        lake_id = self._create_one_lake()

        s = select([func.ST_Buffer(Lake.__table__.c.geom, 2)])
        r1 = session.execute(s).scalar()
        assert isinstance(r1, WKBElement)

        lake = session.query(Lake).get(lake_id)

        # Same function spelled with different cases, on the loaded element...
        r2 = session.execute(lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r2, WKBElement)

        r3 = session.execute(lake.geom.st_buffer(2)).scalar()
        assert isinstance(r3, WKBElement)

        r4 = session.execute(lake.geom.St_BuFfEr(2)).scalar()
        assert isinstance(r4, WKBElement)

        # ... and on the mapped column.
        r5 = session.query(Lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r5, WKBElement)

        r6 = session.query(Lake.geom.st_buffer(2)).scalar()
        assert isinstance(r6, WKBElement)

        r7 = session.query(Lake.geom.St_BuFfEr(2)).scalar()
        assert isinstance(r7, WKBElement)

        assert (
            r1.data == r2.data == r3.data == r4.data == r5.data == r6.data
            == r7.data)
311
312
class TestNullable:
    """Check that a geometry column declared ``nullable=False`` rejects NULL."""

    class NotNullableLake(Base):
        """Mapped class whose geometry column is declared ``nullable=False``."""

        __tablename__ = 'NotNullablelake'
        id = Column(Integer, primary_key=True)
        geom = Column(Geometry(geometry_type='LINESTRING', srid=4326,
                               management=True, nullable=False))

        def __init__(self, geom):
            self.geom = geom

    def setup_method(self):
        # xunit-style hook name: the nose-style ``setup`` is no longer called
        # by pytest >= 8, which would leave the tables uncreated.
        metadata.drop_all(checkfirst=True)
        metadata.create_all()
        self.conn = engine.connect()

    def teardown_method(self):
        self.conn.close()
        metadata.drop_all()

    def test_insert(self):
        conn = self.conn
        table = TestNullable.NotNullableLake.__table__

        # Insert geometries
        conn.execute(table.insert(), [
            {'geom': 'SRID=4326;LINESTRING(0 0,1 1)'},
            {'geom': WKTElement('LINESTRING(0 0,2 2)', srid=4326)},
            {'geom': from_shape(LineString([[0, 0], [3, 3]]), srid=4326)}
        ])

        # Fail when trying to insert null geometry
        with pytest.raises(IntegrityError):
            conn.execute(table.insert(), [
                {'geom': None}
            ])
348