"""Functional tests for GeoAlchemy2 against a SpatiaLite-enabled SQLite database.

The whole module is skipped unless the ``SPATIALITE_LIBRARY_PATH`` environment
variable is defined, and it is also skipped on PyPy.  The database URL is read
from ``SPATIALITE_DB_PATH`` and defaults to ``sqlite:///spatialdb``.
"""
from json import loads
import os
from pkg_resources import parse_version
import pytest
import platform

from sqlalchemy import __version__ as SA_VERSION
from sqlalchemy import create_engine, MetaData, Column, Integer, bindparam
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.event import listen
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import select, func

from geoalchemy2 import Geometry
from geoalchemy2.elements import WKTElement, WKBElement
from geoalchemy2.shape import from_shape, to_shape
from geoalchemy2.compat import str as str_

from shapely.geometry import LineString


if 'SPATIALITE_LIBRARY_PATH' not in os.environ:
    pytest.skip('SPATIALITE_LIBRARY_PATH is not defined, skip SpatiaLite tests',
                allow_module_level=True)

if platform.python_implementation().lower() == 'pypy':
    pytest.skip('skip SpatiaLite tests on PyPy', allow_module_level=True)


def load_spatialite(dbapi_conn, connection_record):
    """Load the SpatiaLite extension into a DBAPI connection.

    Registered below as a listener for the engine's ``connect`` event.
    Extension loading is enabled only for the duration of the load and is
    switched off again afterwards.

    :param dbapi_conn: the raw DBAPI connection that was just opened.
    :param connection_record: part of the ``connect`` listener signature;
        not used here.
    """
    dbapi_conn.enable_load_extension(True)
    dbapi_conn.load_extension(os.environ['SPATIALITE_LIBRARY_PATH'])
    dbapi_conn.enable_load_extension(False)


engine = create_engine(
    os.environ.get('SPATIALITE_DB_PATH', 'sqlite:///spatialdb'), echo=False)
listen(engine, 'connect', load_spatialite)

metadata = MetaData(engine)
Base = declarative_base(metadata=metadata)


class Lake(Base):
    """Mapped test table with a nullable LINESTRING geometry (SRID 4326)."""

    __tablename__ = 'lake'
    id = Column(Integer, primary_key=True)
    geom = Column(Geometry(geometry_type='LINESTRING', srid=4326,
                           management=True))

    def __init__(self, geom):
        self.geom = geom


# A single module-wide session shared by every test class below.
session = sessionmaker(bind=engine)()

session.execute('SELECT InitSpatialMetaData()')


class TestInsertionCore():
    """Insert geometries through a Core connection and read them back."""

    # ``setup_method``/``teardown_method`` are pytest's native per-test hooks.
    # The nose-style ``setup``/``teardown`` names are no longer called by
    # pytest >= 8, which would leave ``self.conn`` undefined.
    def setup_method(self):
        metadata.drop_all(checkfirst=True)
        metadata.create_all()
        self.conn = engine.connect()

    def teardown_method(self):
        self.conn.close()
        metadata.drop_all()

    def test_insert(self):
        conn = self.conn

        # Insert four rows with a single multi-parameter execute (DBAPI
        # executemany()).  This tests the Geometry type's bind_processor and
        # bind_expression functions with an EWKT string, a WKTElement, a
        # WKBElement built from a Shapely shape, and NULL.
        conn.execute(Lake.__table__.insert(), [
            {'geom': 'SRID=4326;LINESTRING(0 0,1 1)'},
            {'geom': WKTElement('LINESTRING(0 0,2 2)', srid=4326)},
            {'geom': from_shape(LineString([[0, 0], [3, 3]]), srid=4326)},
            {'geom': None}
        ])

        results = conn.execute(Lake.__table__.select())
        rows = results.fetchall()

        # The first three rows carry a geometry; each must come back as a
        # WKBElement with the expected text form and SRID.
        expected_wkts = [
            'LINESTRING(0 0, 1 1)',
            'LINESTRING(0 0, 2 2)',
            'LINESTRING(0 0, 3 3)',
        ]
        for row, expected_wkt in zip(rows, expected_wkts):
            assert isinstance(row[1], WKBElement)
            wkt = session.execute(row[1].ST_AsText()).scalar()
            assert wkt == expected_wkt
            srid = session.execute(row[1].ST_SRID()).scalar()
            assert srid == 4326

        # The fourth row was inserted with a NULL geometry.
        assert rows[3] == (4, None)


class TestInsertionORM():
    """Insert geometries through the ORM session."""

    def setup_method(self):
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        # Discard whatever the test flushed before dropping the tables.
        session.rollback()
        metadata.drop_all()

    def test_WKT(self):
        # A plain WKT string given without an SRID is expected to be rejected
        # at flush time with an IntegrityError.
        lake = Lake('LINESTRING(0 0,1 1)')
        session.add(lake)

        with pytest.raises(IntegrityError):
            session.flush()

    def test_WKTElement(self):
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        # Expire the instance so that ``geom`` is reloaded from the database.
        session.expire(lake)
        assert isinstance(lake.geom, WKBElement)
        assert str(lake.geom) == '0102000020E6100000020000000000000000000000000000000000000000000' \
            '0000000F03F000000000000F03F'
        wkt = session.execute(lake.geom.ST_AsText()).scalar()
        assert wkt == 'LINESTRING(0 0, 1 1)'
        srid = session.execute(lake.geom.ST_SRID()).scalar()
        assert srid == 4326

    def test_WKBElement(self):
        shape = LineString([[0, 0], [1, 1]])
        lake = Lake(from_shape(shape, srid=4326))
        session.add(lake)
        session.flush()
        # Expire the instance so that ``geom`` is reloaded from the database.
        session.expire(lake)
        assert isinstance(lake.geom, WKBElement)
        assert str(lake.geom) == '0102000020E6100000020000000000000000000000000000000000000000000' \
            '0000000F03F000000000000F03F'
        wkt = session.execute(lake.geom.ST_AsText()).scalar()
        assert wkt == 'LINESTRING(0 0, 1 1)'
        srid = session.execute(lake.geom.ST_SRID()).scalar()
        assert srid == 4326


class TestShapely():
    """Round-trip geometries between the database and Shapely."""

    def setup_method(self):
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        session.rollback()
        metadata.drop_all()

    def test_to_shape(self):
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        session.expire(lake)
        lake = session.query(Lake).one()
        assert isinstance(lake.geom, WKBElement)
        assert isinstance(lake.geom.data, str_)
        assert lake.geom.srid == 4326
        s = to_shape(lake.geom)
        assert isinstance(s, LineString)
        assert s.wkt == 'LINESTRING (0 0, 1 1)'
        # Re-insert the WKBElement that was read back and check that it
        # survives a second round trip.
        lake = Lake(lake.geom)
        session.add(lake)
        session.flush()
        session.expire(lake)
        assert isinstance(lake.geom, WKBElement)
        assert isinstance(lake.geom.data, str_)
        assert lake.geom.srid == 4326


class TestCallFunction():
    """Call spatial functions via ``func``, elements and column comparators."""

    def setup_method(self):
        metadata.drop_all(checkfirst=True)
        metadata.create_all()

    def teardown_method(self):
        session.rollback()
        metadata.drop_all()

    def _create_one_lake(self):
        """Flush a single lake to the database and return its primary key."""
        lake = Lake(WKTElement('LINESTRING(0 0,1 1)', srid=4326))
        session.add(lake)
        session.flush()
        return lake.id

    def test_ST_GeometryType(self):
        lake_id = self._create_one_lake()

        s = select([func.ST_GeometryType(Lake.__table__.c.geom)])
        r1 = session.execute(s).scalar()
        assert r1 == 'LINESTRING'

        lake = session.query(Lake).get(lake_id)
        r2 = session.execute(lake.geom.ST_GeometryType()).scalar()
        assert r2 == 'LINESTRING'

        r3 = session.query(Lake.geom.ST_GeometryType()).scalar()
        assert r3 == 'LINESTRING'

        r4 = session.query(Lake).filter(
            Lake.geom.ST_GeometryType() == 'LINESTRING').one()
        assert isinstance(r4, Lake)
        assert r4.id == lake_id

    def test_ST_Buffer(self):
        lake_id = self._create_one_lake()

        s = select([func.ST_Buffer(Lake.__table__.c.geom, 2)])
        r1 = session.execute(s).scalar()
        assert isinstance(r1, WKBElement)

        lake = session.query(Lake).get(lake_id)
        r2 = session.execute(lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r2, WKBElement)

        r3 = session.query(Lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r3, WKBElement)

        # All three call styles must yield the same geometry.
        assert r1.data == r2.data == r3.data

        r4 = session.query(Lake).filter(
            func.ST_Within(WKTElement('POINT(0 0)', srid=4326),
                           Lake.geom.ST_Buffer(2))).one()
        assert isinstance(r4, Lake)
        assert r4.id == lake_id

    def test_ST_GeoJSON(self):
        lake_id = self._create_one_lake()

        def _test(r):
            r = loads(r)
            assert r["type"] == "LineString"
            assert r["coordinates"] == [[0, 0], [1, 1]]

        s = select([func.ST_AsGeoJSON(Lake.__table__.c.geom)])
        r = session.execute(s).scalar()
        _test(r)

        lake = session.query(Lake).get(lake_id)
        r = session.execute(lake.geom.ST_AsGeoJSON()).scalar()
        _test(r)

        r = session.query(Lake.geom.ST_AsGeoJSON()).scalar()
        _test(r)

    @pytest.mark.skip(
        reason='Spatialite does not support the feature version of AsGeoJson() yet')
    def test_ST_GeoJSON_feature(self):
        # The expected feature below refers to a row with id 1, so the lake
        # has to exist once this test is enabled.
        self._create_one_lake()

        ss3 = select([Lake, bindparam('dummy_val', 10).label('dummy_attr')]).alias()
        s3 = select([func.ST_AsGeoJSON(ss3, 'geom')])
        r3 = session.execute(s3).scalar()
        assert loads(r3) == {
            "type": "Feature",
            "geometry": {
                "type": "LineString",
                "coordinates": [[0, 0], [1, 1]]
            },
            "properties": {"dummy_attr": 10, "id": 1}
        }

    @pytest.mark.skipif(
        parse_version(SA_VERSION) < parse_version("1.3.4"),
        reason='Case-insensitivity is only available for sqlalchemy>=1.3.4')
    def test_comparator_case_insensitivity(self):
        lake_id = self._create_one_lake()

        s = select([func.ST_Buffer(Lake.__table__.c.geom, 2)])
        r1 = session.execute(s).scalar()
        assert isinstance(r1, WKBElement)

        lake = session.query(Lake).get(lake_id)

        # Same function spelled with three different casings, first on the
        # loaded element ...
        r2 = session.execute(lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r2, WKBElement)

        r3 = session.execute(lake.geom.st_buffer(2)).scalar()
        assert isinstance(r3, WKBElement)

        r4 = session.execute(lake.geom.St_BuFfEr(2)).scalar()
        assert isinstance(r4, WKBElement)

        # ... then on the mapped column.
        r5 = session.query(Lake.geom.ST_Buffer(2)).scalar()
        assert isinstance(r5, WKBElement)

        r6 = session.query(Lake.geom.st_buffer(2)).scalar()
        assert isinstance(r6, WKBElement)

        r7 = session.query(Lake.geom.St_BuFfEr(2)).scalar()
        assert isinstance(r7, WKBElement)

        assert (
            r1.data == r2.data == r3.data == r4.data == r5.data == r6.data
            == r7.data)


class TestNullable():
    """Check that a geometry column declared ``nullable=False`` rejects NULL."""

    class NotNullableLake(Base):
        """Mapped test table whose geometry column is declared NOT NULL."""

        __tablename__ = 'NotNullablelake'
        id = Column(Integer, primary_key=True)
        geom = Column(Geometry(geometry_type='LINESTRING', srid=4326,
                               management=True, nullable=False))

        def __init__(self, geom):
            self.geom = geom

    def setup_method(self):
        metadata.drop_all(checkfirst=True)
        metadata.create_all()
        self.conn = engine.connect()

    def teardown_method(self):
        self.conn.close()
        metadata.drop_all()

    def test_insert(self):
        conn = self.conn

        # Insert geometries
        conn.execute(TestNullable.NotNullableLake.__table__.insert(), [
            {'geom': 'SRID=4326;LINESTRING(0 0,1 1)'},
            {'geom': WKTElement('LINESTRING(0 0,2 2)', srid=4326)},
            {'geom': from_shape(LineString([[0, 0], [3, 3]]), srid=4326)}
        ])

        # Fail when trying to insert null geometry
        with pytest.raises(IntegrityError):
            conn.execute(TestNullable.NotNullableLake.__table__.insert(), [
                {'geom': None}
            ])