1# -*- coding: utf-8 -*- 2"""QGIS Unit tests for the postgres provider. 3 4Note: to prepare the DB, you need to run the sql files specified in 5tests/testdata/provider/testdata_pg.sh 6 7Read tests/README.md about writing/launching tests with PostgreSQL. 8 9Run with ctest -V -R PyQgsPostgresProvider 10 11.. note:: This program is free software; you can redistribute it and/or modify 12it under the terms of the GNU General Public License as published by 13the Free Software Foundation; either version 2 of the License, or 14(at your option) any later version. 15 16""" 17from builtins import next 18 19__author__ = 'Matthias Kuhn' 20__date__ = '2015-04-23' 21__copyright__ = 'Copyright 2015, The QGIS Project' 22 23import qgis # NOQA 24import psycopg2 25 26import os 27import time 28from datetime import datetime 29 30from qgis.core import ( 31 QgsVectorLayer, 32 QgsVectorLayerExporter, 33 QgsFeatureRequest, 34 QgsFeatureSource, 35 QgsFeature, 36 QgsFieldConstraints, 37 QgsDataProvider, 38 NULL, 39 QgsVectorLayerUtils, 40 QgsSettings, 41 QgsTransactionGroup, 42 QgsReadWriteContext, 43 QgsRectangle, 44 QgsDefaultValue, 45 QgsCoordinateReferenceSystem, 46 QgsProject, 47 QgsWkbTypes, 48 QgsGeometry, 49 QgsProviderRegistry, 50 QgsVectorDataProvider, 51 QgsDataSourceUri, 52 QgsProviderConnectionException, 53) 54from qgis.gui import QgsGui, QgsAttributeForm 55from qgis.PyQt.QtCore import QDate, QTime, QDateTime, QVariant, QDir, QObject, QByteArray, QTemporaryDir 56from qgis.PyQt.QtWidgets import QLabel 57from qgis.testing import start_app, unittest 58from qgis.PyQt.QtXml import QDomDocument 59from utilities import unitTestDataPath, compareWkt 60from providertestbase import ProviderTestCase 61 62QGISAPP = start_app() 63TEST_DATA_DIR = unitTestDataPath() 64 65 66class TestPyQgsPostgresProvider(unittest.TestCase, ProviderTestCase): 67 68 @classmethod 69 def setUpClass(cls): 70 """Run before all tests""" 71 cls.dbconn = 'service=qgis_test' 72 if 'QGIS_PGTEST_DB' in os.environ: 73 cls.dbconn = os.environ['QGIS_PGTEST_DB'] 74 # Create test layers 75 cls.vl = QgsVectorLayer( 76 cls.dbconn + 77 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=', 78 'test', 'postgres') 79 assert cls.vl.isValid() 80 cls.source = cls.vl.dataProvider() 81 cls.poly_vl = QgsVectorLayer( 82 cls.dbconn + 83 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 84 'test', 'postgres') 85 assert cls.poly_vl.isValid() 86 cls.poly_provider = cls.poly_vl.dataProvider() 87 QgsGui.editorWidgetRegistry().initEditors() 88 cls.con = psycopg2.connect(cls.dbconn) 89 90 @classmethod 91 def tearDownClass(cls): 92 """Run after all tests""" 93 94 def execSQLCommand(self, sql): 95 self.assertTrue(self.con) 96 cur = self.con.cursor() 97 self.assertTrue(cur) 98 cur.execute(sql) 99 cur.close() 100 self.con.commit() 101 102 # Create instances of this class for scoped backups, 103 # example: 104 # 105 # backup1 = self.scopedTableBackup('qgis_test', 'datatable1') 106 # backup2 = self.scopedTableBackup('qgis_test', 'datatable2') 107 # 108 def scopedTableBackup(self, schema, table): 109 110 class ScopedBackup(): 111 def __init__(self, tester, schema, table): 112 self.schema = schema 113 self.table = table 114 self.tester = tester 115 tester.execSQLCommand('DROP TABLE IF EXISTS {s}.{t}_edit CASCADE'.format(s=schema, t=table)) 116 tester.execSQLCommand('CREATE TABLE {s}.{t}_edit AS SELECT * FROM {s}.{t}'.format(s=schema, t=table)) 117 118 def __del__(self): 119 self.tester.execSQLCommand('TRUNCATE TABLE {s}.{t}'.format(s=self.schema, t=self.table)) 120 self.tester.execSQLCommand('INSERT INTO {s}.{t} SELECT * FROM {s}.{t}_edit'.format(s=self.schema, t=self.table)) 121 self.tester.execSQLCommand('DROP TABLE {s}.{t}_edit'.format(s=self.schema, t=self.table)) 122 123 return ScopedBackup(self, schema, table) 124 125 def getSource(self): 126 # create temporary table for edit tests 127 self.execSQLCommand( 128 'DROP TABLE IF EXISTS qgis_test."editData" CASCADE') 129 self.execSQLCommand( 130 'CREATE TABLE qgis_test."editData" ( pk SERIAL NOT NULL PRIMARY KEY, cnt integer, name text, name2 text, num_char text, dt timestamp without time zone, "date" date, "time" time without time zone, geom public.geometry(Point, 4326))') 131 self.execSQLCommand("INSERT INTO qgis_test.\"editData\" (pk, cnt, name, name2, num_char, dt, \"date\", \"time\", geom) VALUES " 132 "(5, -200, NULL, 'NuLl', '5', TIMESTAMP '2020-05-04 12:13:14', '2020-05-02', '12:13:01', '0101000020E61000001D5A643BDFC751C01F85EB51B88E5340')," 133 "(3, 300, 'Pear', 'PEaR', '3', NULL, NULL, NULL, NULL)," 134 "(1, 100, 'Orange', 'oranGe', '1', TIMESTAMP '2020-05-03 12:13:14', '2020-05-03', '12:13:14', '0101000020E61000006891ED7C3F9551C085EB51B81E955040')," 135 "(2, 200, 'Apple', 'Apple', '2', TIMESTAMP '2020-05-04 12:14:14', '2020-05-04', '12:14:14', '0101000020E6100000CDCCCCCCCC0C51C03333333333B35140')," 136 "(4, 400, 'Honey', 'Honey', '4', TIMESTAMP '2021-05-04 13:13:14', '2021-05-04', '13:13:14', '0101000020E610000014AE47E17A5450C03333333333935340')") 137 self.execSQLCommand("SELECT setval('qgis_test.\"editData_pk_seq\"', 5, true)") 138 vl = QgsVectorLayer( 139 self.dbconn + 140 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."editData" (geom) sql=', 141 'test', 'postgres') 142 return vl 143 144 def getEditableLayer(self): 145 return self.getSource() 146 147 def getEditableLayerWithCheckConstraint(self): 148 """Returns the layer for attribute change CHECK constraint violation""" 149 150 return QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' srid=4326 type=POINT table="public"."test_check_constraint" (geom) sql=', 'test_check_constraint', 'postgres') 151 152 def enableCompiler(self): 153 QgsSettings().setValue('/qgis/compileExpressions', True) 154 return True 155 156 def disableCompiler(self): 157 QgsSettings().setValue('/qgis/compileExpressions', False) 158 159 def uncompiledFilters(self): 160 return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')', 161 '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')', 162 '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')']) 163 164 def partiallyCompiledFilters(self): 165 return set([]) 166 167 def getGeneratedColumnsData(self): 168 """ 169 return a tuple with the generated column test layer and the expected generated value 170 """ 171 cur = self.con.cursor() 172 cur.execute("SHOW server_version_num") 173 pgversion = int(cur.fetchone()[0]) 174 175 # don't trigger this test when PostgreSQL versions earlier than 12. 176 if pgversion < 120000: 177 return (None, None) 178 else: 179 return (QgsVectorLayer(self.dbconn + ' sslmode=disable table="qgis_test"."generated_columns"', 'test', 'postgres'), 180 """('test:'::text || ((pk)::character varying)::text)""") 181 182 # HERE GO THE PROVIDER SPECIFIC TESTS 183 def testDefaultValue(self): 184 self.source.setProviderProperty( 185 QgsDataProvider.EvaluateDefaultValues, True) 186 self.assertIsInstance(self.source.defaultValue(0), int) 187 self.assertEqual(self.source.defaultValue(1), NULL) 188 self.assertEqual(self.source.defaultValue(2), 'qgis') 189 self.source.setProviderProperty( 190 QgsDataProvider.EvaluateDefaultValues, False) 191 192 def testDefaultValueClause(self): 193 self.source.setProviderProperty( 194 QgsDataProvider.EvaluateDefaultValues, False) 195 self.assertEqual(self.source.defaultValueClause( 196 0), 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)') 197 self.assertFalse(self.source.defaultValueClause(1)) 198 self.assertEqual(self.source.defaultValueClause(2), '\'qgis\'::text') 199 200 def testDateTimeTypes(self): 201 vl = QgsVectorLayer('%s table="qgis_test"."date_times" sql=' % ( 202 self.dbconn), "testdatetimes", "postgres") 203 self.assertTrue(vl.isValid()) 204 205 fields = vl.dataProvider().fields() 206 self.assertEqual(fields.at(fields.indexFromName( 207 'date_field')).type(), QVariant.Date) 208 self.assertEqual(fields.at(fields.indexFromName( 209 'time_field')).type(), QVariant.Time) 210 self.assertEqual(fields.at(fields.indexFromName( 211 'datetime_field')).type(), QVariant.DateTime) 212 213 f = next(vl.getFeatures(QgsFeatureRequest())) 214 215 date_idx = vl.fields().lookupField('date_field') 216 self.assertIsInstance(f.attributes()[date_idx], QDate) 217 self.assertEqual(f.attributes()[date_idx], QDate(2004, 3, 4)) 218 time_idx = vl.fields().lookupField('time_field') 219 self.assertIsInstance(f.attributes()[time_idx], QTime) 220 self.assertEqual(f.attributes()[time_idx], QTime(13, 41, 52)) 221 datetime_idx = vl.fields().lookupField('datetime_field') 222 self.assertIsInstance(f.attributes()[datetime_idx], QDateTime) 223 self.assertEqual(f.attributes()[datetime_idx], QDateTime( 224 QDate(2004, 3, 4), QTime(13, 41, 52))) 225 226 def testBooleanType(self): 227 vl = QgsVectorLayer('{} table="qgis_test"."boolean_table" sql='.format( 228 self.dbconn), "testbool", "postgres") 229 self.assertTrue(vl.isValid()) 230 231 fields = vl.dataProvider().fields() 232 self.assertEqual( 233 fields.at(fields.indexFromName('fld1')).type(), QVariant.Bool) 234 235 values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()} 236 expected = { 237 1: True, 238 2: False, 239 3: NULL 240 } 241 self.assertEqual(values, expected) 242 243 def testByteaType(self): 244 vl = QgsVectorLayer('{} table="qgis_test"."byte_a_table" sql='.format( 245 self.dbconn), "testbytea", "postgres") 246 self.assertTrue(vl.isValid()) 247 248 fields = vl.dataProvider().fields() 249 self.assertEqual(fields.at(fields.indexFromName( 250 'fld1')).type(), QVariant.ByteArray) 251 252 values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()} 253 expected = { 254 1: QByteArray(b'YmludmFsdWU='), 255 2: QByteArray() 256 } 257 self.assertEqual(values, expected) 258 259 # editing binary values 260 self.execSQLCommand( 261 'DROP TABLE IF EXISTS qgis_test."byte_a_table_edit" CASCADE') 262 self.execSQLCommand( 263 'CREATE TABLE qgis_test."byte_a_table_edit" ( pk SERIAL NOT NULL PRIMARY KEY, blobby bytea)') 264 self.execSQLCommand("INSERT INTO qgis_test.\"byte_a_table_edit\" (pk, blobby) VALUES " 265 "(1, encode('bbb', 'base64')::bytea)") 266 vl = QgsVectorLayer( 267 self.dbconn + ' sslmode=disable table="qgis_test"."byte_a_table_edit" sql=', 268 'test', 'postgres') 269 self.assertTrue(vl.isValid()) 270 values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()} 271 expected = { 272 1: QByteArray(b'YmJi') 273 } 274 self.assertEqual(values, expected) 275 276 # change attribute value 277 self.assertTrue(vl.dataProvider().changeAttributeValues( 278 {1: {1: QByteArray(b'bbbvx')}})) 279 values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()} 280 expected = { 281 1: QByteArray(b'bbbvx') 282 } 283 self.assertEqual(values, expected) 284 285 # add feature 286 f = QgsFeature() 287 f.setAttributes([2, QByteArray(b'cccc')]) 288 self.assertTrue(vl.dataProvider().addFeature(f)) 289 values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()} 290 expected = { 291 1: QByteArray(b'bbbvx'), 292 2: QByteArray(b'cccc') 293 } 294 self.assertEqual(values, expected) 295 296 # change feature 297 self.assertTrue(vl.dataProvider().changeFeatures( 298 {2: {1: QByteArray(b'dddd')}}, {})) 299 values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()} 300 expected = { 301 1: QByteArray(b'bbbvx'), 302 2: QByteArray(b'dddd') 303 } 304 self.assertEqual(values, expected) 305 306 def testCitextType(self): 307 vl = QgsVectorLayer('{} table="qgis_test"."citext_table" sql='.format( 308 self.dbconn), "testbytea", "postgres") 309 self.assertTrue(vl.isValid()) 310 311 fields = vl.dataProvider().fields() 312 self.assertEqual( 313 fields.at(fields.indexFromName('fld1')).type(), QVariant.String) 314 315 values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()} 316 expected = { 317 1: 'test val', 318 2: NULL 319 } 320 self.assertEqual(values, expected) 321 322 # editing citext values 323 self.execSQLCommand( 324 'DROP TABLE IF EXISTS qgis_test."citext_table_edit" CASCADE') 325 self.execSQLCommand( 326 'CREATE TABLE qgis_test."citext_table_edit" ( pk SERIAL NOT NULL PRIMARY KEY, txt citext)') 327 self.execSQLCommand("INSERT INTO qgis_test.\"citext_table_edit\" (pk, txt) VALUES " 328 "(1, 'text')") 329 vl = QgsVectorLayer( 330 self.dbconn + ' sslmode=disable table="qgis_test"."citext_table_edit" sql=', 331 'test', 'postgres') 332 self.assertTrue(vl.isValid()) 333 values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()} 334 expected = { 335 1: 'text' 336 } 337 self.assertEqual(values, expected) 338 339 # change attribute value 340 self.assertTrue( 341 vl.dataProvider().changeAttributeValues({1: {1: 'teeeext'}})) 342 values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()} 343 expected = { 344 1: 'teeeext' 345 } 346 self.assertEqual(values, expected) 347 348 # add feature 349 f = QgsFeature() 350 f.setAttributes([2, 'teeeeeeeeeext']) 351 self.assertTrue(vl.dataProvider().addFeature(f)) 352 values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()} 353 expected = { 354 1: 'teeeext', 355 2: 'teeeeeeeeeext' 356 } 357 self.assertEqual(values, expected) 358 359 # change feature 360 self.assertTrue(vl.dataProvider().changeFeatures( 361 {2: {1: 'teeeeeeeeeeeeeeeeeeeeeeext'}}, {})) 362 values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()} 363 expected = { 364 1: 'teeeext', 365 2: 'teeeeeeeeeeeeeeeeeeeeeeext' 366 } 367 self.assertEqual(values, expected) 368 369 def testQueryLayers(self): 370 def test_query(dbconn, query, key): 371 ql = QgsVectorLayer( 372 '%s srid=4326 table="%s" (geom) key=\'%s\' sql=' % ( 373 dbconn, query.replace('"', '\\"'), key), "testgeom", 374 "postgres") 375 self.assertTrue(ql.isValid(), '{} ({})'.format(query, key)) 376 377 test_query(self.dbconn, 378 '(SELECT NULL::integer "Id1", NULL::integer "Id2", NULL::geometry(Point, 4326) geom LIMIT 0)', 379 '"Id1","Id2"') 380 381 def testWkbTypes(self): 382 def test_table(dbconn, table_name, wkt): 383 vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (dbconn, table_name), "testgeom", 384 "postgres") 385 self.assertTrue(vl.isValid()) 386 for f in vl.getFeatures(): 387 self.assertEqual(f.geometry().asWkt(), wkt) 388 389 test_table(self.dbconn, 'p2d', 'Polygon ((0 0, 1 0, 1 1, 0 1, 0 0))') 390 test_table(self.dbconn, 'p3d', 391 'PolygonZ ((0 0 0, 1 0 0, 1 1 0, 0 1 0, 0 0 0))') 392 test_table(self.dbconn, 'triangle2d', 'Polygon ((0 0, 1 0, 1 1, 0 0))') 393 test_table(self.dbconn, 'triangle3d', 394 'PolygonZ ((0 0 0, 1 0 0, 1 1 0, 0 0 0))') 395 test_table(self.dbconn, 'tin2d', 396 'MultiPolygon (((0 0, 1 0, 1 1, 0 0)),((0 0, 0 1, 1 1, 0 0)))') 397 test_table(self.dbconn, 'tin3d', 398 'MultiPolygonZ (((0 0 0, 1 0 0, 1 1 0, 0 0 0)),((0 0 0, 0 1 0, 1 1 0, 0 0 0)))') 399 test_table(self.dbconn, 'ps2d', 400 'MultiPolygon (((0 0, 1 0, 1 1, 0 1, 0 0)))') 401 test_table(self.dbconn, 'ps3d', 402 'MultiPolygonZ (((0 0 0, 0 1 0, 1 1 0, 1 0 0, 0 0 0)),((0 0 1, 1 0 1, 1 1 1, 0 1 1, 0 0 1)),((0 0 0, 0 0 1, 0 1 1, 0 1 0, 0 0 0)),((0 1 0, 0 1 1, 1 1 1, 1 1 0, 0 1 0)),((1 1 0, 1 1 1, 1 0 1, 1 0 0, 1 1 0)),((1 0 0, 1 0 1, 0 0 1, 0 0 0, 1 0 0)))') 403 test_table(self.dbconn, 'mp3d', 404 'MultiPolygonZ (((0 0 0, 0 1 0, 1 1 0, 1 0 0, 0 0 0)),((0 0 1, 1 0 1, 1 1 1, 0 1 1, 0 0 1)),((0 0 0, 0 0 1, 0 1 1, 0 1 0, 0 0 0)),((0 1 0, 0 1 1, 1 1 1, 1 1 0, 0 1 0)),((1 1 0, 1 1 1, 1 0 1, 1 0 0, 1 1 0)),((1 0 0, 1 0 1, 0 0 1, 0 0 0, 1 0 0)))') 405 test_table(self.dbconn, 'pt2d', 'Point (0 0)') 406 test_table(self.dbconn, 'pt3d', 'PointZ (0 0 0)') 407 test_table(self.dbconn, 'ls2d', 'LineString (0 0, 1 1)') 408 test_table(self.dbconn, 'ls3d', 'LineStringZ (0 0 0, 1 1 1)') 409 test_table(self.dbconn, 'mpt2d', 'MultiPoint ((0 0),(1 1))') 410 test_table(self.dbconn, 'mpt3d', 'MultiPointZ ((0 0 0),(1 1 1))') 411 test_table(self.dbconn, 'mls2d', 412 'MultiLineString ((0 0, 1 1),(2 2, 3 3))') 413 test_table(self.dbconn, 'mls3d', 414 'MultiLineStringZ ((0 0 0, 1 1 1),(2 2 2, 3 3 3))') 415 416 test_table(self.dbconn, 'pt4d', 'PointZM (1 2 3 4)') 417 418 def testMetadata(self): 419 """ Test that metadata is correctly acquired from provider """ 420 metadata = self.vl.metadata() 421 self.assertEqual( 422 metadata.crs(), QgsCoordinateReferenceSystem.fromEpsgId(4326)) 423 self.assertEqual(metadata.type(), 'dataset') 424 self.assertEqual(metadata.abstract(), 'QGIS Test Table') 425 426 def testGetFeaturesUniqueId(self): 427 """ 428 Test tables with inheritance for unique ids 429 """ 430 431 def test_unique(features, num_features): 432 featureids = [] 433 for f in features: 434 self.assertFalse(f.id() in featureids) 435 featureids.append(f.id()) 436 self.assertEqual(len(features), num_features) 437 438 vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'someData'), "testgeom", 439 "postgres") 440 self.assertTrue(vl.isValid()) 441 # Test someData 442 test_unique([f for f in vl.getFeatures()], 5) 443 444 # Test base_table_bad: layer is invalid 445 vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'base_table_bad'), 446 "testgeom", "postgres") 447 self.assertFalse(vl.isValid()) 448 # Test base_table_bad with use estimated metadata: layer is valid because the unique test is skipped 449 vl = QgsVectorLayer( 450 '%s srid=4326 estimatedmetadata="true" table="qgis_test".%s (geom) sql=' % ( 451 self.dbconn, 'base_table_bad'), 452 "testgeom", "postgres") 453 self.assertTrue(vl.isValid()) 454 455 # Test base_table_good: layer is valid 456 vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'base_table_good'), 457 "testgeom", "postgres") 458 self.assertTrue(vl.isValid()) 459 test_unique([f for f in vl.getFeatures()], 4) 460 # Test base_table_good with use estimated metadata: layer is valid 461 vl = QgsVectorLayer( 462 '%s srid=4326 estimatedmetadata="true" table="qgis_test".%s (geom) sql=' % ( 463 self.dbconn, 'base_table_good'), 464 "testgeom", "postgres") 465 self.assertTrue(vl.isValid()) 466 test_unique([f for f in vl.getFeatures()], 4) 467 468 # See https://github.com/qgis/QGIS/issues/22258 469 # TODO: accept multi-featured layers, and an array of values/fids 470 def testSignedIdentifiers(self): 471 472 def test_layer(ql, att, val, fidval): 473 self.assertTrue(ql.isValid()) 474 features = ql.getFeatures() 475 att_idx = ql.fields().lookupField(att) 476 count = 0 477 for f in features: 478 count += 1 479 self.assertEqual(f.attributes()[att_idx], val) 480 self.assertEqual(f.id(), fidval) 481 self.assertEqual(count, 1) 482 483 def test(dbconn, query, att, val, fidval): 484 table = query.replace('"', '\\"') 485 uri = '%s table="%s" (g) key=\'%s\'' % (dbconn, table, att) 486 ql = QgsVectorLayer(uri, "t", "postgres") 487 test_layer(ql, att, val, fidval) 488 # now with estimated metadata 489 uri += ' estimatedmetadata="true"' 490 test_layer(ql, att, val, fidval) 491 492 # --- INT16 ---- 493 # zero 494 test(self.dbconn, '(SELECT 0::int2 i, NULL::geometry(Point) g)', 'i', 0, 0) 495 # low positive 496 test(self.dbconn, '(SELECT 1::int2 i, NULL::geometry(Point) g)', 'i', 1, 1) 497 # low negative 498 test(self.dbconn, '(SELECT -1::int2 i, NULL::geometry(Point) g)', 499 'i', -1, 4294967295) 500 # max positive signed 16bit integer 501 test(self.dbconn, '(SELECT 32767::int2 i, NULL::geometry(Point) g)', 502 'i', 32767, 32767) 503 # max negative signed 16bit integer 504 test(self.dbconn, '(SELECT (-32768)::int2 i, NULL::geometry(Point) g)', 505 'i', -32768, 4294934528) 506 507 # --- INT32 ---- 508 # zero 509 test(self.dbconn, '(SELECT 0::int4 i, NULL::geometry(Point) g)', 'i', 0, 0) 510 # low positive 511 test(self.dbconn, '(SELECT 2::int4 i, NULL::geometry(Point) g)', 'i', 2, 2) 512 # low negative 513 test(self.dbconn, '(SELECT -2::int4 i, NULL::geometry(Point) g)', 514 'i', -2, 4294967294) 515 # max positive signed 32bit integer 516 test(self.dbconn, '(SELECT 2147483647::int4 i, NULL::geometry(Point) g)', 517 'i', 2147483647, 2147483647) 518 # max negative signed 32bit integer 519 test(self.dbconn, '(SELECT (-2147483648)::int4 i, NULL::geometry(Point) g)', 520 'i', -2147483648, 2147483648) 521 522 # --- INT64 (FIDs are always 1 because assigned ex-novo) ---- 523 # zero 524 test(self.dbconn, '(SELECT 0::int8 i, NULL::geometry(Point) g)', 'i', 0, 1) 525 # low positive 526 test(self.dbconn, '(SELECT 3::int8 i, NULL::geometry(Point) g)', 'i', 3, 1) 527 # low negative 528 test(self.dbconn, '(SELECT -3::int8 i, NULL::geometry(Point) g)', 'i', -3, 1) 529 # max positive signed 64bit integer 530 test(self.dbconn, '(SELECT 9223372036854775807::int8 i, NULL::geometry(Point) g)', 531 'i', 9223372036854775807, 1) 532 # max negative signed 32bit integer 533 test(self.dbconn, '(SELECT (-9223372036854775808)::int8 i, NULL::geometry(Point) g)', 'i', -9223372036854775808, 534 1) 535 536 def testPktIntInsert(self): 537 vl = QgsVectorLayer('{} table="qgis_test"."{}" key="pk" sql='.format(self.dbconn, 'bikes_view'), "bikes_view", 538 "postgres") 539 self.assertTrue(vl.isValid()) 540 f = QgsFeature(vl.fields()) 541 f['pk'] = NULL 542 f['name'] = 'Cilo' 543 r, f = vl.dataProvider().addFeatures([f]) 544 self.assertTrue(r) 545 self.assertNotEqual(f[0]['pk'], NULL, f[0].attributes()) 546 vl.deleteFeatures([f[0].id()]) 547 548 def testGeneratedFields(self): 549 """Test if GENERATED geometry/geography columns are correctly handled by the provider.""" 550 cur = self.con.cursor() 551 cur.execute("SHOW server_version_num") 552 pgversion = int(cur.fetchone()[0]) 553 554 # GENERATED columns are unsupported by PostgreSQL versions earlier than 12. 555 if pgversion < 120000: 556 return 557 558 # Backup test table (will be edited) 559 self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.test_gen_col_edit CASCADE') 560 self.execSQLCommand('CREATE TABLE qgis_test.test_gen_col_edit AS SELECT id,name,geom FROM qgis_test.test_gen_col') 561 562 # Geometry columns 563 vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 564 self.assertTrue(vl.isValid()) 565 566 # writing geometry... 567 f = QgsFeature(vl.fields()) 568 569 ix_name = f.fieldNameIndex('name') 570 571 f.setGeometry(QgsGeometry.fromWkt('Polygon ((-67 -2, -67 0, -68 0, -70 -1, -67 -2))')) 572 f.setAttribute(ix_name, 'QGIS-3') 573 574 self.assertTrue(vl.startEditing()) 575 self.assertTrue(vl.addFeatures([f])) 576 self.assertTrue(vl.commitChanges()) 577 578 # reading back to see if we saved the centroid correctly. 579 vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 580 f2 = next(vl2.getFeatures(QgsFeatureRequest())) 581 generated_geometry = f2.geometry().asWkt() 582 expected_geometry = 'Point (-68.047619047619051 -0.90476190476190477)' 583 expected_area = 43069568296.34387 584 585 assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry) 586 self.assertAlmostEqual(f2['poly_area'], expected_area, places=4) 587 self.assertEqual(f2['name'], 'QGIS-3') 588 589 # Checking if we can correctly change values of an existing feature. 590 self.assertTrue(vl2.startEditing()) 591 ix2_name = f2.fieldNameIndex('name') 592 fid2 = f2.id() 593 vl2.changeAttributeValue(fid2, ix2_name, 'New') 594 self.assertTrue(vl2.commitChanges()) 595 596 # getting a brand new QgsVectorLayer 597 vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 598 self.assertTrue(vl.isValid()) 599 600 # checking if the name field was correctly updated 601 f = next(vl.getFeatures(QgsFeatureRequest())) 602 self.assertEqual(f['name'], 'New') 603 604 # Now, check if we can change the value of a GENERATED field (we shouldn't) 605 self.assertTrue(vl.startEditing()) 606 ix_area = f.fieldNameIndex('poly_area') 607 fid = f.id() 608 vl.changeAttributeValue(fid, ix_area, 42) 609 self.assertTrue(vl.commitChanges()) 610 611 # reading back 612 vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 613 f2 = next(vl2.getFeatures(QgsFeatureRequest())) 614 self.assertAlmostEqual(f2['poly_area'], expected_area, places=4) 615 616 # now, getting a brand new QgsVectorLayer to check if changes (UPDATE) in the geometry are reflected in the generated fields 617 vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 618 self.assertTrue(vl.isValid()) 619 f = next(vl.getFeatures(QgsFeatureRequest())) 620 vl.startEditing() 621 fid = f.id() 622 vl.changeGeometry(fid, QgsGeometry.fromWkt('Polygon ((-67 -2, -65 0, -68 0, -70 -1, -67 -2))')) 623 vl.commitChanges() 624 625 # reading back... 626 vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres") 627 f2 = next(vl2.getFeatures(QgsFeatureRequest())) 628 generated_geometry = f2.geometry().asWkt() 629 630 generated_geometry = f2.geometry().asWkt() 631 expected_geometry = 'Point (-67.42424242424242209 -0.81818181818181823)' 632 expected_area = 67718478405.28429 633 634 assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry) 635 self.assertAlmostEqual(f2['poly_area'], expected_area, places=4) 636 self.assertEqual(f2['name'], 'New') 637 638 # Geography columns 639 vl3 = QgsVectorLayer('{} table="qgis_test"."{}" (geog) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_geog_col"), "test_gen_geog_col", "postgres") 640 self.assertTrue(vl3.isValid()) 641 642 # writing geography... 643 f3 = QgsFeature(vl3.fields()) 644 f3.setGeometry(QgsGeometry.fromWkt('Polygon ((-67 -2, -67 0, -68 0, -70 -1, -67 -2))')) 645 self.assertTrue(vl3.startEditing()) 646 self.assertTrue(vl3.addFeatures([f3])) 647 self.assertTrue(vl3.commitChanges()) 648 649 # reading back geography and checking values 650 vl4 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_geog_col"), "test_gen_geog_col", "postgres") 651 f4 = next(vl4.getFeatures(QgsFeatureRequest())) 652 generated_geometry = f4.geometry().asWkt() 653 expected_geometry = 'Point (-68.0477406158202 -0.904960604589168)' 654 expected_area = 43088884296.69713 655 656 assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry) 657 self.assertEqual(f4['poly_area'], expected_area) 658 659 # Restore test table (after editing it) 660 self.execSQLCommand('TRUNCATE TABLE qgis_test.test_gen_col') 661 self.execSQLCommand('INSERT INTO qgis_test.test_gen_col(id,name,geom) SELECT id,name,geom FROM qgis_test.test_gen_col_edit') 662 self.execSQLCommand('DROP TABLE qgis_test.test_gen_col_edit') 663 664 def testNonPkBigintField(self): 665 """Test if we can correctly insert, read and change attributes(fields) of type bigint and which are not PKs.""" 666 vl = QgsVectorLayer( 667 '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format( 668 self.dbconn, 'bigint_pk'), 669 "bigint_pk", "postgres") 670 self.assertTrue(vl.isValid()) 671 flds = vl.fields() 672 673 # Backup test table (will be edited) 674 scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk') 675 676 # check if default values are correctly read back 677 f = next(vl.getFeatures(QgsFeatureRequest())) 678 bigint_with_default_idx = vl.fields().lookupField('bigint_attribute_def') 679 self.assertEqual(f.attributes()[bigint_with_default_idx], 42) 680 681 # check if NULL values are correctly read 682 bigint_def_null_idx = vl.fields().lookupField('bigint_attribute') 683 self.assertEqual(f.attributes()[bigint_def_null_idx], NULL) 684 685 # check if we can overwrite a default value 686 vl.startEditing() 687 vl.changeAttributeValue(f.id(), bigint_with_default_idx, 43) 688 689 pkidx = vl.fields().lookupField('pk') 690 editedid = f.attributes()[pkidx] 691 692 self.assertTrue(vl.commitChanges()) 693 vl2 = QgsVectorLayer( 694 '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format( 695 self.dbconn, 'bigint_pk'), 696 "bigint_pk", "postgres") 697 flds = vl2.fields() 698 self.assertTrue(vl2.isValid()) 699 f = next(vl2.getFeatures( 700 QgsFeatureRequest().setFilterExpression('pk = ' + str(editedid)))) 701 bigint_with_default_idx = vl2.fields().lookupField('bigint_attribute_def') 702 self.assertEqual(f.attributes()[bigint_with_default_idx], 43) 703 704 # check if we can insert a new value 705 dp = vl2.dataProvider() 706 dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1) 707 pkidx = vl2.fields().lookupField('pk') 708 vl2.startEditing() 709 f = QgsFeature(vl2.fields()) 710 f['pk'] = NULL 711 f['value'] = 'The answer.' 712 f['bigint_attribute'] = 84 713 f.setAttribute(pkidx, vl2.dataProvider().defaultValue(pkidx)) 714 f.setAttribute(bigint_with_default_idx, 715 vl2.dataProvider().defaultValue(bigint_with_default_idx)) 716 r, f = vl2.dataProvider().addFeatures([f]) 717 self.assertTrue(r) 718 vl2.commitChanges() 719 inserted_id = f[0]['pk'] 720 721 f = next(vl2.getFeatures( 722 QgsFeatureRequest().setFilterExpression('pk = ' + str(inserted_id)))) 723 724 self.assertEqual(f['bigint_attribute'], 84) 725 self.assertEqual(f['bigint_attribute_def'], 42) 726 727 def testPktUpdateBigintPk(self): 728 """Test if we can update objects with positive, zero and negative bigint PKs.""" 729 vl = QgsVectorLayer( 730 '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format( 731 self.dbconn, 'bigint_pk'), 732 "bigint_pk", "postgres") 733 flds = vl.fields() 734 735 # Backup test table (will be edited) 736 scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk') 737 738 self.assertTrue(vl.isValid()) 739 740 vl.startEditing() 741 742 statuses = [-1, -1, -1, -1] 743 # changing values... 744 for ft in vl.getFeatures(): 745 if ft['value'] == 'first value': 746 vl.changeAttributeValue( 747 ft.id(), flds.indexOf('value'), '1st value') 748 statuses[0] = 0 749 elif ft['value'] == 'second value': 750 vl.changeAttributeValue( 751 ft.id(), flds.indexOf('value'), '2nd value') 752 statuses[1] = 0 753 elif ft['value'] == 'zero value': 754 vl.changeAttributeValue( 755 ft.id(), flds.indexOf('value'), '0th value') 756 statuses[2] = 0 757 elif ft['value'] == 'negative value': 758 vl.changeAttributeValue( 759 ft.id(), flds.indexOf('value'), '-1th value') 760 statuses[3] = 0 761 self.assertTrue(vl.commitChanges()) 762 self.assertTrue(all(x == 0 for x in statuses)) 763 764 # now, let's see if the values were changed 765 vl2 = QgsVectorLayer( 766 '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format( 767 self.dbconn, 'bigint_pk'), 768 "bigint_pk", "postgres") 769 self.assertTrue(vl2.isValid()) 770 for ft in vl2.getFeatures(): 771 if ft['value'] == '1st value': 772 statuses[0] = 1 773 elif ft['value'] == '2nd value': 774 statuses[1] = 1 775 elif ft['value'] == '0th value': 776 statuses[2] = 1 777 elif ft['value'] == '-1th value': 778 statuses[3] = 1 779 self.assertTrue(all(x == 1 for x in statuses)) 780 781 def testPktUpdateBigintPkNonFirst(self): 782 """Test if we can update objects with positive, zero and negative bigint PKs in tables whose PK is not the first field""" 783 vl = QgsVectorLayer('{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(self.dbconn, 784 'bigint_non_first_pk'), 785 "bigint_non_first_pk", "postgres") 786 flds = vl.fields() 787 788 self.assertTrue(vl.isValid()) 789 790 vl.startEditing() 791 792 # Backup test table (will be edited) 793 scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_non_first_pk') 794 795 statuses = [-1, -1, -1, -1] 796 values = ['first value', 'second value', 'zero value', 'negative value'] 797 newvalues = ['1st value', '2nd value', '0th value', '-1th value'] 798 # changing values... 799 for ft in vl.getFeatures(): 800 if ft['value'] == values[0]: 801 vl.changeAttributeValue( 802 ft.id(), flds.indexOf('value'), newvalues[0]) 803 statuses[0] = 0 804 elif ft['value'] == values[1]: 805 vl.changeAttributeValue( 806 ft.id(), flds.indexOf('value'), newvalues[1]) 807 statuses[1] = 0 808 elif ft['value'] == values[2]: 809 vl.changeAttributeValue( 810 ft.id(), flds.indexOf('value'), newvalues[2]) 811 statuses[2] = 0 812 elif ft['value'] == values[3]: 813 vl.changeAttributeValue( 814 ft.id(), flds.indexOf('value'), newvalues[3]) 815 statuses[3] = 0 816 self.assertTrue(vl.commitChanges()) 817 for i in range(len(statuses)): 818 self.assertEqual(statuses[i], 0, 'start value "{}" not found'.format(values[i])) 819 820 # now, let's see if the values were changed 821 vl2 = QgsVectorLayer( 822 '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format( 823 self.dbconn, 'bigint_non_first_pk'), 824 "bigint_pk_nonfirst", "postgres") 825 self.assertTrue(vl2.isValid()) 826 for ft in vl2.getFeatures(): 827 if ft['value'] == newvalues[0]: 828 statuses[0] = 1 829 elif ft['value'] == newvalues[1]: 830 statuses[1] = 1 831 elif ft['value'] == newvalues[2]: 832 statuses[2] = 1 833 elif ft['value'] == newvalues[3]: 834 statuses[3] = 1 835 for i in range(len(statuses)): 836 self.assertEqual(statuses[i], 1, 'changed value "{}" not found'.format(newvalues[i])) 837 838 def testPktComposite(self): 839 """ 840 Check that tables with PKs composed of many fields of different types are correctly read and written to 841 """ 842 vl = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2"\' table="qgis_test"."tb_test_compound_pk" (geom)'.format(self.dbconn), "test_compound", "postgres") 843 self.assertTrue(vl.isValid()) 844 845 fields = vl.fields() 846 847 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 1 AND pk2 = 2'))) 848 # first of all: we must be able to fetch a valid feature 849 self.assertTrue(f.isValid()) 850 self.assertEqual(f['pk1'], 1) 851 self.assertEqual(f['pk2'], 2) 852 self.assertEqual(f['value'], 'test 2') 853 854 # Backup test table (will be edited) 855 scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_compound_pk') 856 857 # can we edit a field? 858 vl.startEditing() 859 vl.changeAttributeValue(f.id(), fields.indexOf('value'), 'Edited Test 2') 860 self.assertTrue(vl.commitChanges()) 861 862 # Did we get it right? Let's create a new QgsVectorLayer and try to read back our changes: 863 vl2 = QgsVectorLayer('{} sslmode=disable srid=4326 table="qgis_test"."tb_test_compound_pk" (geom) key=\'"pk1","pk2"\' '.format(self.dbconn), "test_compound2", "postgres") 864 self.assertTrue(vl2.isValid()) 865 f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 1 AND pk2 = 2'))) 866 self.assertTrue(f2.isValid()) 867 868 # Then, making sure we really did change our value. 869 self.assertEqual(f2['value'], 'Edited Test 2') 870 871 # How about inserting a new field? 872 f3 = QgsFeature(vl2.fields()) 873 f3['pk1'] = 4 874 f3['pk2'] = -9223372036854775800 875 f3['value'] = 'other test' 876 vl.startEditing() 877 res, f3 = vl.dataProvider().addFeatures([f3]) 878 self.assertTrue(res) 879 self.assertTrue(vl.commitChanges()) 880 881 # can we catch it on another layer? 882 f4 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk2 = -9223372036854775800'))) 883 884 self.assertTrue(f4.isValid()) 885 expected_attrs = [4, -9223372036854775800, 'other test'] 886 self.assertEqual(f4.attributes(), expected_attrs) 887 888 # Finally, let's delete one of the features. 889 f5 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 2 AND pk2 = 1'))) 890 vl2.startEditing() 891 vl2.deleteFeatures([f5.id()]) 892 self.assertTrue(vl2.commitChanges()) 893 894 # did we really delete? Let's try to get the deleted feature from the first layer. 895 f_iterator = vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 2 AND pk2 = 1')) 896 got_feature = True 897 898 try: 899 f6 = next(f_iterator) 900 got_feature = f6.isValid() 901 except StopIteration: 902 got_feature = False 903 904 self.assertFalse(got_feature) 905 906 def testPktCompositeFloat(self): 907 """ 908 Check that tables with PKs composed of many fields of different types are correctly read and written to 909 """ 910 vl = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2","pk3"\' table="qgis_test"."tb_test_composite_float_pk" (geom)'.format(self.dbconn), "test_composite_float", "postgres") 911 self.assertTrue(vl.isValid()) 912 913 fields = vl.fields() 914 915 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '3.14159274'"))) 916 # first of all: we must be able to fetch a valid feature 917 self.assertTrue(f.isValid()) 918 self.assertEqual(f['pk1'], 1) 919 self.assertEqual(f['pk2'], 2) 920 921 self.assertAlmostEqual(f['pk3'], 3.14159274) 922 self.assertEqual(f['value'], 'test 2') 923 924 # Backup test table (will be edited) 925 scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_composite_float_pk') 926 927 # can we edit a field? 928 vl.startEditing() 929 vl.changeAttributeValue(f.id(), fields.indexOf('value'), 'Edited Test 2') 930 self.assertTrue(vl.commitChanges()) 931 932 # Did we get it right? Let's create a new QgsVectorLayer and try to read back our changes: 933 vl2 = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2","pk3"\' table="qgis_test"."tb_test_composite_float_pk" (geom)'.format(self.dbconn), "test_composite_float2", "postgres") 934 self.assertTrue(vl2.isValid()) 935 f2 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '3.14159274'"))) 936 self.assertTrue(f2.isValid()) 937 938 # just making sure we have the correct feature 939 self.assertAlmostEqual(f2['pk3'], 3.14159274) 940 941 # Then, making sure we really did change our value. 942 self.assertEqual(f2['value'], 'Edited Test 2') 943 944 # How about inserting a new field? 945 f3 = QgsFeature(vl2.fields()) 946 f3['pk1'] = 4 947 f3['pk2'] = -9223372036854775800 948 f3['pk3'] = 7.29154 949 f3['value'] = 'other test' 950 vl.startEditing() 951 res, f3 = vl.dataProvider().addFeatures([f3]) 952 self.assertTrue(res) 953 self.assertTrue(vl.commitChanges()) 954 955 # can we catch it on another layer? 956 f4 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk2 = '-9223372036854775800'"))) 957 958 self.assertTrue(f4.isValid()) 959 expected_attrs = [4, -9223372036854775800, 7.29154, 'other test'] 960 gotten_attrs = [f4['pk1'], f4['pk2'], f4['pk3'], f4['value']] 961 self.assertEqual(gotten_attrs[0], expected_attrs[0]) 962 self.assertEqual(gotten_attrs[1], expected_attrs[1]) 963 self.assertAlmostEqual(gotten_attrs[2], expected_attrs[2], places=4) 964 self.assertEqual(gotten_attrs[3], expected_attrs[3]) 965 966 # Finally, let's delete one of the features. 967 f5 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '7.29154'"))) 968 vl2.startEditing() 969 vl2.deleteFeatures([f5.id()]) 970 self.assertTrue(vl2.commitChanges()) 971 972 # did we really delete? 973 f_iterator = vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '7.29154'")) 974 got_feature = True 975 976 try: 977 f6 = next(f_iterator) 978 got_feature = f6.isValid() 979 except StopIteration: 980 got_feature = False 981 982 self.assertFalse(got_feature) 983 984 def testPktFloatingPoint(self): 985 """ 986 Check if we can handle floating point/numeric primary keys correctly 987 """ 988 989 # 0. Backup test table (will be edited) 990 scopedBackup1 = self.scopedTableBackup('qgis_test', 'tb_test_float_pk') 991 scopedBackup2 = self.scopedTableBackup('qgis_test', 'tb_test_double_pk') 992 993 # 1. 32 bit float (PostgreSQL "REAL" type) 994 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_float_pk" (geom)', "test_float_pk", "postgres") 995 self.assertTrue(vl.isValid()) 996 997 # 1.1. Retrieving 998 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'"))) 999 self.assertTrue(f.isValid()) 1000 self.assertEqual(f['value'], 'first teste') 1001 # 1.2. Editing 1002 self.assertTrue(vl.startEditing()) 1003 vl.changeAttributeValue(f.id(), vl.fields().indexOf('value'), 'Changed first') 1004 self.assertTrue(vl.commitChanges()) 1005 # 1.2.1. Checking edit from another vector layer 1006 vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk1" table="qgis_test"."tb_test_float_pk" (geom)', "test_float_pk2", "postgres") 1007 self.assertTrue(vl2.isValid()) 1008 f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'"))) 1009 self.assertTrue(f2.isValid()) 1010 self.assertEqual(f2['value'], 'Changed first') 1011 # 1.3. Deleting 1012 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'"))) 1013 vl.startEditing() 1014 vl.deleteFeatures([f.id()]) 1015 self.assertTrue(vl.commitChanges()) 1016 # 1.3.1. Checking deletion 1017 f_iterator = vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'")) 1018 got_feature = True 1019 1020 try: 1021 f2 = next(f_iterator) 1022 got_feature = f2.isValid() 1023 except StopIteration: 1024 got_feature = False 1025 self.assertFalse(got_feature) 1026 # 1.4. Inserting new feature 1027 newpointwkt = 'Point(-47.751 -15.644)' 1028 f = QgsFeature(vl.fields()) 1029 f['pk'] = 0.22222222222222222222222 1030 f['value'] = 'newly inserted' 1031 f.setGeometry(QgsGeometry.fromWkt(newpointwkt)) 1032 vl.startEditing() 1033 res, f = vl.dataProvider().addFeatures([f]) 1034 self.assertTrue(res) 1035 self.assertTrue(vl.commitChanges()) 1036 # 1.4.1. Checking insertion 1037 f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '0.22222222222222222222222'"))) 1038 self.assertTrue(f2.isValid()) 1039 self.assertAlmostEqual(f2['pk'], 0.2222222222222222) 1040 self.assertEqual(f2['value'], 'newly inserted') 1041 assert compareWkt(f2.geometry().asWkt(), newpointwkt), "Geometry mismatch. Expected: {} Got: {} \n".format(f2.geometry().asWkt(), newpointwkt) 1042 # One more check: can we retrieve the same row with the value that we got from this layer? 1043 floatpk = f2['pk'] 1044 f3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '{}'".format(floatpk)))) 1045 self.assertTrue(f3.isValid()) 1046 self.assertEqual(f3['value'], 'newly inserted') 1047 self.assertEqual(f3['pk'], floatpk) 1048 1049 # 2. 64 bit float (PostgreSQL "DOUBLE PRECISION" type) 1050 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_double_pk" (geom)', "test_double_pk", "postgres") 1051 self.assertTrue(vl.isValid()) 1052 1053 # 2.1. Retrieving 1054 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'"))) 1055 self.assertTrue(f.isValid()) 1056 self.assertEqual(f['value'], 'first teste') 1057 # 2.2. Editing 1058 self.assertTrue(vl.startEditing()) 1059 vl.changeAttributeValue(f.id(), vl.fields().indexOf('value'), 'Changed first') 1060 self.assertTrue(vl.commitChanges()) 1061 # 2.2.1. Checking edit from another vector layer 1062 vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_double_pk" (geom)', "test_double_pk2", "postgres") 1063 self.assertTrue(vl2.isValid()) 1064 f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'"))) 1065 self.assertTrue(f2.isValid()) 1066 self.assertEqual(f2['value'], 'Changed first') 1067 # 2.3. Deleting 1068 f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'"))) 1069 vl.startEditing() 1070 vl.deleteFeatures([f.id()]) 1071 self.assertTrue(vl.commitChanges()) 1072 # 2.3.1. Checking deletion 1073 f_iterator = vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'")) 1074 got_feature = True 1075 1076 try: 1077 f2 = next(f_iterator) 1078 got_feature = f2.isValid() 1079 except StopIteration: 1080 got_feature = False 1081 self.assertFalse(got_feature) 1082 # 2.4. Inserting new feature 1083 newpointwkt = 'Point(-47.751 -15.644)' 1084 f = QgsFeature(vl.fields()) 1085 f['pk'] = 0.22222222222222222222222 1086 f['value'] = 'newly inserted' 1087 f.setGeometry(QgsGeometry.fromWkt(newpointwkt)) 1088 vl.startEditing() 1089 res, f = vl.dataProvider().addFeatures([f]) 1090 self.assertTrue(res) 1091 self.assertTrue(vl.commitChanges()) 1092 # 2.4.1. Checking insertion 1093 f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '0.22222222222222222222222'"))) 1094 self.assertTrue(f2.isValid()) 1095 self.assertAlmostEqual(f2['pk'], 0.2222222222222222, places=15) 1096 self.assertEqual(f2['value'], 'newly inserted') 1097 assert compareWkt(f2.geometry().asWkt(), newpointwkt), "Geometry mismatch. Expected: {} Got: {} \n".format(f2.geometry().asWkt(), newpointwkt) 1098 # One more check: can we retrieve the same row with the value that we got from this layer? 1099 doublepk = f2['pk'] 1100 f3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '{}'".format(doublepk)))) 1101 self.assertTrue(f3.isValid()) 1102 self.assertEqual(f3['value'], 'newly inserted') 1103 self.assertEqual(f3['pk'], doublepk) 1104 1105 # no NUMERIC/DECIMAL checks here. NUMERIC primary keys are unsupported. 1106 # TODO: implement NUMERIC primary keys/arbitrary precision arithmethics/fixed point math in QGIS. 1107 1108 def testPktMapInsert(self): 1109 vl = QgsVectorLayer('{} table="qgis_test"."{}" key="obj_id" sql='.format(self.dbconn, 'oid_serial_table'), 1110 "oid_serial", "postgres") 1111 self.assertTrue(vl.isValid()) 1112 f = QgsFeature(vl.fields()) 1113 f['obj_id'] = vl.dataProvider().defaultValueClause(0) 1114 f['name'] = 'Test' 1115 r, f = vl.dataProvider().addFeatures([f]) 1116 self.assertTrue(r) 1117 self.assertNotEqual(f[0]['obj_id'], NULL, f[0].attributes()) 1118 vl.deleteFeatures([f[0].id()]) 1119 1120 def testNull(self): 1121 """ 1122 Asserts that 0, '' and NULL are treated as different values on insert 1123 """ 1124 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' table="qgis_test"."constraints" sql=', 'test1', 1125 'postgres') 1126 self.assertTrue(vl.isValid()) 1127 QgsProject.instance().addMapLayer(vl) 1128 tg = QgsTransactionGroup() 1129 tg.addLayer(vl) 1130 vl.startEditing() 1131 1132 def onError(message): 1133 """We should not get here. If we do, fail and say why""" 1134 self.assertFalse(True, message) 1135 1136 vl.raiseError.connect(onError) 1137 1138 f = QgsFeature(vl.fields()) 1139 f['gid'] = 100 1140 f['val'] = 0 1141 f['name'] = '' 1142 self.assertTrue(vl.addFeature(f)) 1143 feature = next(vl.getFeatures('"gid" = 100')) 1144 self.assertEqual(f['val'], feature['val']) 1145 self.assertEqual(f['name'], feature['name']) 1146 1147 def testNestedInsert(self): 1148 tg = QgsTransactionGroup() 1149 l = self.getEditableLayer() 1150 tg.addLayer(l) 1151 l.startEditing() 1152 it = l.getFeatures() 1153 f = next(it) 1154 f['pk'] = NULL 1155 self.assertTrue(l.addFeature(f)) # Should not deadlock during an active iteration 1156 f = next(it) 1157 l.commitChanges() 1158 1159 def testTimeout(self): 1160 """ 1161 Asserts that we will not deadlock if more iterators are opened in parallel than 1162 available in the connection pool 1163 """ 1164 request = QgsFeatureRequest() 1165 request.setTimeout(1) 1166 1167 iterators = list() 1168 for i in range(100): 1169 iterators.append(self.vl.getFeatures(request)) 1170 1171 def testTransactionDirtyName(self): 1172 # create a vector ayer based on postgres 1173 vl = QgsVectorLayer( 1174 self.dbconn + 1175 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 1176 'test', 'postgres') 1177 self.assertTrue(vl.isValid()) 1178 1179 # prepare a project with transactions enabled 1180 p = QgsProject() 1181 p.setAutoTransaction(True) 1182 p.addMapLayers([vl]) 1183 vl.startEditing() 1184 1185 # update the data within the transaction 1186 tr = vl.dataProvider().transaction() 1187 sql = "update qgis_test.some_poly_data set pk=1 where pk=1" 1188 name = "My Awesome Transaction!" 1189 self.assertTrue(tr.executeSql(sql, True, name)[0]) 1190 1191 # test name 1192 self.assertEqual(vl.undoStack().command(0).text(), name) 1193 1194 # rollback 1195 vl.rollBack() 1196 1197 def testTransactionDirty(self): 1198 # create a vector layer based on postgres 1199 vl = QgsVectorLayer( 1200 self.dbconn + 1201 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 1202 'test', 'postgres') 1203 self.assertTrue(vl.isValid()) 1204 1205 # prepare a project with transactions enabled 1206 p = QgsProject() 1207 p.setAutoTransaction(True) 1208 p.addMapLayers([vl]) 1209 vl.startEditing() 1210 1211 # check that the feature used for testing is ok 1212 ft0 = vl.getFeatures('pk=1') 1213 f = QgsFeature() 1214 self.assertTrue(ft0.nextFeature(f)) 1215 1216 # update the data within the transaction 1217 tr = vl.dataProvider().transaction() 1218 sql = "update qgis_test.some_poly_data set pk=33 where pk=1" 1219 self.assertTrue(tr.executeSql(sql, True)[0]) 1220 1221 # check that the pk of the feature has been changed 1222 ft = vl.getFeatures('pk=1') 1223 self.assertFalse(ft.nextFeature(f)) 1224 1225 ft = vl.getFeatures('pk=33') 1226 self.assertTrue(ft.nextFeature(f)) 1227 1228 # underlying data has been modified but the layer is not tagged as 1229 # modified 1230 self.assertTrue(vl.isModified()) 1231 1232 # undo sql query 1233 vl.undoStack().undo() 1234 1235 # check that the original feature with pk is back 1236 ft0 = vl.getFeatures('pk=1') 1237 self.assertTrue(ft0.nextFeature(f)) 1238 1239 # redo 1240 vl.undoStack().redo() 1241 1242 # check that the pk of the feature has been changed 1243 ft1 = vl.getFeatures('pk=1') 1244 self.assertFalse(ft1.nextFeature(f)) 1245 1246 def testTransactionConstraints(self): 1247 # create a vector layer based on postgres 1248 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' table="qgis_test"."check_constraints" sql=', 1249 'test', 'postgres') 1250 self.assertTrue(vl.isValid()) 1251 1252 # prepare a project with transactions enabled 1253 p = QgsProject() 1254 p.setAutoTransaction(True) 1255 p.addMapLayers([vl]) 1256 1257 # get feature 1258 f = QgsFeature() 1259 self.assertTrue(vl.getFeatures('id=1').nextFeature(f)) 1260 self.assertEqual(f.attributes(), [1, 4, 3]) 1261 1262 # start edition 1263 vl.startEditing() 1264 1265 # update attribute form with a failing constraints 1266 # coming from the database if attributes are updated 1267 # one at a time. 1268 # Current feature: a = 4 / b = 3 1269 # Update feature: a = 1 / b = 0 1270 # If updated one at a time, '(a = 1) < (b = 3)' => FAIL! 1271 form = QgsAttributeForm(vl, f) 1272 for w in form.findChildren(QLabel): 1273 if w.buddy(): 1274 spinBox = w.buddy() 1275 if w.text() == 'a': 1276 spinBox.setValue(1) 1277 elif w.text() == 'b': 1278 spinBox.setValue(0) 1279 1280 # save 1281 form.save() 1282 1283 # check new values 1284 self.assertTrue(vl.getFeatures('id=1').nextFeature(f)) 1285 self.assertEqual(f.attributes(), [1, 1, 0]) 1286 1287 def testTransactionTuple(self): 1288 # create a vector layer based on postgres 1289 vl = QgsVectorLayer( 1290 self.dbconn + 1291 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 1292 'test', 'postgres') 1293 self.assertTrue(vl.isValid()) 1294 1295 # prepare a project with transactions enabled 1296 p = QgsProject() 1297 p.setAutoTransaction(True) 1298 p.addMapLayers([vl]) 1299 vl.startEditing() 1300 1301 # execute a query which returns a tuple 1302 tr = vl.dataProvider().transaction() 1303 sql = "select * from qgis_test.some_poly_data" 1304 self.assertTrue(tr.executeSql(sql, False)[0]) 1305 1306 # underlying data has not been modified 1307 self.assertFalse(vl.isModified()) 1308 1309 def testDomainTypes(self): 1310 """Test that domain types are correctly mapped""" 1311 1312 vl = QgsVectorLayer('%s table="qgis_test"."domains" sql=' % 1313 (self.dbconn), "domains", "postgres") 1314 self.assertTrue(vl.isValid()) 1315 1316 fields = vl.dataProvider().fields() 1317 1318 expected = {} 1319 expected['fld_var_char_domain'] = {'type': QVariant.String, 'typeName': 'qgis_test.var_char_domain', 1320 'length': -1} 1321 expected['fld_var_char_domain_6'] = {'type': QVariant.String, 'typeName': 'qgis_test.var_char_domain_6', 1322 'length': 6} 1323 expected['fld_character_domain'] = {'type': QVariant.String, 'typeName': 'qgis_test.character_domain', 1324 'length': 1} 1325 expected['fld_character_domain_6'] = {'type': QVariant.String, 'typeName': 'qgis_test.character_domain_6', 1326 'length': 6} 1327 expected['fld_char_domain'] = { 1328 'type': QVariant.String, 'typeName': 'qgis_test.char_domain', 'length': 1} 1329 expected['fld_char_domain_6'] = { 1330 'type': QVariant.String, 'typeName': 'qgis_test.char_domain_6', 'length': 6} 1331 expected['fld_text_domain'] = { 1332 'type': QVariant.String, 'typeName': 'qgis_test.text_domain', 'length': -1} 1333 expected['fld_numeric_domain'] = {'type': QVariant.Double, 'typeName': 'qgis_test.numeric_domain', 'length': 10, 1334 'precision': 4} 1335 1336 for f, e in list(expected.items()): 1337 self.assertEqual( 1338 fields.at(fields.indexFromName(f)).type(), e['type']) 1339 self.assertEqual(fields.at(fields.indexFromName(f) 1340 ).typeName(), e['typeName']) 1341 self.assertEqual( 1342 fields.at(fields.indexFromName(f)).length(), e['length']) 1343 if 'precision' in e: 1344 self.assertEqual( 1345 fields.at(fields.indexFromName(f)).precision(), e['precision']) 1346 1347 def testRenameAttributes(self): 1348 ''' Test renameAttributes() ''' 1349 vl = QgsVectorLayer('%s table="qgis_test"."rename_table" sql=' % ( 1350 self.dbconn), "renames", "postgres") 1351 provider = vl.dataProvider() 1352 provider.renameAttributes({1: 'field1', 2: 'field2'}) 1353 1354 # bad rename 1355 self.assertFalse(provider.renameAttributes({-1: 'not_a_field'})) 1356 self.assertFalse(provider.renameAttributes({100: 'not_a_field'})) 1357 # already exists 1358 self.assertFalse(provider.renameAttributes({1: 'field2'})) 1359 1360 # rename one field 1361 self.assertTrue(provider.renameAttributes({1: 'newname'})) 1362 self.assertEqual(provider.fields().at(1).name(), 'newname') 1363 vl.updateFields() 1364 fet = next(vl.getFeatures()) 1365 self.assertEqual(fet.fields()[1].name(), 'newname') 1366 1367 # rename two fields 1368 self.assertTrue(provider.renameAttributes( 1369 {1: 'newname2', 2: 'another'})) 1370 self.assertEqual(provider.fields().at(1).name(), 'newname2') 1371 self.assertEqual(provider.fields().at(2).name(), 'another') 1372 vl.updateFields() 1373 fet = next(vl.getFeatures()) 1374 self.assertEqual(fet.fields()[1].name(), 'newname2') 1375 self.assertEqual(fet.fields()[2].name(), 'another') 1376 1377 # close layer and reopen, then recheck to confirm that changes were saved to db 1378 del vl 1379 vl = None 1380 vl = QgsVectorLayer('%s table="qgis_test"."rename_table" sql=' % ( 1381 self.dbconn), "renames", "postgres") 1382 provider = vl.dataProvider() 1383 self.assertEqual(provider.fields().at(1).name(), 'newname2') 1384 self.assertEqual(provider.fields().at(2).name(), 'another') 1385 fet = next(vl.getFeatures()) 1386 self.assertEqual(fet.fields()[1].name(), 'newname2') 1387 self.assertEqual(fet.fields()[2].name(), 'another') 1388 1389 def testEditorWidgetTypes(self): 1390 """Test that editor widget types can be fetched from the qgis_editor_widget_styles table""" 1391 1392 vl = QgsVectorLayer('%s table="qgis_test"."widget_styles" sql=' % ( 1393 self.dbconn), "widget_styles", "postgres") 1394 self.assertTrue(vl.isValid()) 1395 fields = vl.dataProvider().fields() 1396 1397 setup1 = fields.field("fld1").editorWidgetSetup() 1398 self.assertFalse(setup1.isNull()) 1399 self.assertEqual(setup1.type(), "FooEdit") 1400 self.assertEqual(setup1.config(), {"param1": "value1", "param2": "2"}) 1401 1402 best1 = QgsGui.editorWidgetRegistry().findBest(vl, "fld1") 1403 self.assertEqual(best1.type(), "FooEdit") 1404 self.assertEqual(best1.config(), setup1.config()) 1405 1406 self.assertTrue(fields.field("fld2").editorWidgetSetup().isNull()) 1407 1408 best2 = QgsGui.editorWidgetRegistry().findBest(vl, "fld2") 1409 self.assertEqual(best2.type(), "TextEdit") 1410 1411 def testHstore(self): 1412 vl = QgsVectorLayer('%s table="qgis_test"."dict" sql=' % 1413 (self.dbconn), "testhstore", "postgres") 1414 self.assertTrue(vl.isValid()) 1415 1416 fields = vl.dataProvider().fields() 1417 self.assertEqual( 1418 fields.at(fields.indexFromName('value')).type(), QVariant.Map) 1419 1420 f = next(vl.getFeatures(QgsFeatureRequest())) 1421 1422 value_idx = vl.fields().lookupField('value') 1423 self.assertIsInstance(f.attributes()[value_idx], dict) 1424 self.assertEqual(f.attributes()[value_idx], {'a': 'b', '1': '2'}) 1425 1426 new_f = QgsFeature(vl.fields()) 1427 new_f['pk'] = NULL 1428 new_f['value'] = {'simple': '1', 'doubleQuote': '"y"', 1429 'quote': "'q'", 'backslash': '\\'} 1430 r, fs = vl.dataProvider().addFeatures([new_f]) 1431 self.assertTrue(r) 1432 new_pk = fs[0]['pk'] 1433 self.assertNotEqual(new_pk, NULL, fs[0].attributes()) 1434 1435 try: 1436 read_back = vl.getFeature(new_pk) 1437 self.assertEqual(read_back['pk'], new_pk) 1438 self.assertEqual(read_back['value'], new_f['value']) 1439 finally: 1440 self.assertTrue(vl.startEditing()) 1441 self.assertTrue(vl.deleteFeatures([new_pk])) 1442 self.assertTrue(vl.commitChanges()) 1443 1444 def testJson(self): 1445 vl = QgsVectorLayer('%s table="qgis_test"."json" sql=' % 1446 (self.dbconn), "testjson", "postgres") 1447 self.assertTrue(vl.isValid()) 1448 1449 # Backup test table (will be edited) 1450 tableBackup = self.scopedTableBackup('qgis_test', 'json') 1451 1452 attrs = ( 1453 123, 1454 1233.45, 1455 None, 1456 True, 1457 False, 1458 r"String literal with \"quotes\" 'and' other funny chars []{};#/èé*", 1459 [1, 2, 3.4, None], 1460 [True, False], 1461 {'a': 123, 'b': 123.34, 'c': 'a string', 'd': [ 1462 1, 2, 3], 'e': {'a': 123, 'b': 123.45}} 1463 ) 1464 attrs2 = ( 1465 246, 1466 2466.91, 1467 None, 1468 True, 1469 False, 1470 r"Yet another string literal with \"quotes\" 'and' other funny chars: π []{};#/èé*", 1471 [2, 4, 3.14159, None], 1472 [True, False], 1473 {'a': 246, 'b': 246.68, 'c': 'a rounded area: π × r²', 'd': [ 1474 1, 2, 3], 'e': {'a': 246, 'b': 246.91}} 1475 ) 1476 json_idx = vl.fields().lookupField('jvalue') 1477 jsonb_idx = vl.fields().lookupField('jbvalue') 1478 1479 for attr in attrs: 1480 # Add a new feature 1481 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1482 self.dbconn), "testjson", "postgres") 1483 self.assertTrue(vl2.startEditing()) 1484 f = QgsFeature(vl2.fields()) 1485 f.setAttributes([None, attr, attr]) 1486 self.assertTrue(vl2.addFeatures([f])) 1487 self.assertTrue(vl2.commitChanges(), attr) 1488 # Read back 1489 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1490 self.dbconn), "testjson", "postgres") 1491 fid = [f.id() for f in vl2.getFeatures()][-1] 1492 f = vl2.getFeature(fid) 1493 self.assertEqual(f.attributes(), [fid, attr, attr]) 1494 # Change attribute values 1495 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1496 self.dbconn), "testjson", "postgres") 1497 fid = [f.id() for f in vl2.getFeatures()][-1] 1498 self.assertTrue(vl2.startEditing()) 1499 self.assertTrue(vl2.changeAttributeValues( 1500 fid, {json_idx: attr, jsonb_idx: attr})) 1501 self.assertTrue(vl2.commitChanges()) 1502 # Read back 1503 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1504 self.dbconn), "testjson", "postgres") 1505 f = vl2.getFeature(fid) 1506 self.assertEqual(f.attributes(), [fid, attr, attr]) 1507 1508 # Let's check changeFeatures: 1509 for attr in attrs2: 1510 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1511 self.dbconn), "testjson", "postgres") 1512 fid = [f.id() for f in vl2.getFeatures()][-1] 1513 self.assertTrue(vl2.startEditing()) 1514 self.assertTrue(vl2.dataProvider().changeFeatures({fid: {json_idx: attr, jsonb_idx: attr}}, {})) 1515 self.assertTrue(vl2.commitChanges()) 1516 1517 # Read back again 1518 vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % ( 1519 self.dbconn), "testjson", "postgres") 1520 f = vl2.getFeature(fid) 1521 self.assertEqual(f.attributes(), [fid, attr, attr]) 1522 1523 def testStringArray(self): 1524 vl = QgsVectorLayer('%s table="qgis_test"."string_array" sql=' % ( 1525 self.dbconn), "teststringarray", "postgres") 1526 self.assertTrue(vl.isValid()) 1527 1528 fields = vl.dataProvider().fields() 1529 self.assertEqual(fields.at(fields.indexFromName( 1530 'value')).type(), QVariant.StringList) 1531 self.assertEqual(fields.at(fields.indexFromName( 1532 'value')).subType(), QVariant.String) 1533 1534 f = next(vl.getFeatures(QgsFeatureRequest())) 1535 1536 value_idx = vl.fields().lookupField('value') 1537 self.assertIsInstance(f.attributes()[value_idx], list) 1538 self.assertEqual(f.attributes()[value_idx], ['a', 'b', 'c']) 1539 1540 new_f = QgsFeature(vl.fields()) 1541 new_f['pk'] = NULL 1542 new_f['value'] = ['simple', '"doubleQuote"', "'quote'", 'back\\slash'] 1543 r, fs = vl.dataProvider().addFeatures([new_f]) 1544 self.assertTrue(r) 1545 new_pk = fs[0]['pk'] 1546 self.assertNotEqual(new_pk, NULL, fs[0].attributes()) 1547 1548 try: 1549 read_back = vl.getFeature(new_pk) 1550 self.assertEqual(read_back['pk'], new_pk) 1551 self.assertEqual(read_back['value'], new_f['value']) 1552 finally: 1553 self.assertTrue(vl.startEditing()) 1554 self.assertTrue(vl.deleteFeatures([new_pk])) 1555 self.assertTrue(vl.commitChanges()) 1556 1557 def testIntArray(self): 1558 vl = QgsVectorLayer('%s table="qgis_test"."int_array" sql=' % ( 1559 self.dbconn), "testintarray", "postgres") 1560 self.assertTrue(vl.isValid()) 1561 1562 fields = vl.dataProvider().fields() 1563 self.assertEqual( 1564 fields.at(fields.indexFromName('value')).type(), QVariant.List) 1565 self.assertEqual(fields.at(fields.indexFromName( 1566 'value')).subType(), QVariant.Int) 1567 1568 f = next(vl.getFeatures(QgsFeatureRequest())) 1569 1570 value_idx = vl.fields().lookupField('value') 1571 self.assertIsInstance(f.attributes()[value_idx], list) 1572 self.assertEqual(f.attributes()[value_idx], [1, 2, -5]) 1573 1574 def testDoubleArray(self): 1575 vl = QgsVectorLayer('%s table="qgis_test"."double_array" sql=' % ( 1576 self.dbconn), "testdoublearray", "postgres") 1577 self.assertTrue(vl.isValid()) 1578 1579 fields = vl.dataProvider().fields() 1580 self.assertEqual( 1581 fields.at(fields.indexFromName('value')).type(), QVariant.List) 1582 self.assertEqual(fields.at(fields.indexFromName( 1583 'value')).subType(), QVariant.Double) 1584 1585 f = next(vl.getFeatures(QgsFeatureRequest())) 1586 1587 value_idx = vl.fields().lookupField('value') 1588 self.assertIsInstance(f.attributes()[value_idx], list) 1589 self.assertEqual(f.attributes()[value_idx], [1.1, 2, -5.12345]) 1590 1591 def testNotNullConstraint(self): 1592 vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % ( 1593 self.dbconn), "constraints", "postgres") 1594 self.assertTrue(vl.isValid()) 1595 self.assertEqual(len(vl.fields()), 4) 1596 1597 # test some bad field indexes 1598 self.assertEqual(vl.dataProvider().fieldConstraints(-1), 1599 QgsFieldConstraints.Constraints()) 1600 self.assertEqual(vl.dataProvider().fieldConstraints( 1601 1001), QgsFieldConstraints.Constraints()) 1602 1603 self.assertTrue(vl.dataProvider().fieldConstraints(0) & 1604 QgsFieldConstraints.ConstraintNotNull) 1605 self.assertFalse(vl.dataProvider().fieldConstraints(1) 1606 & QgsFieldConstraints.ConstraintNotNull) 1607 self.assertTrue(vl.dataProvider().fieldConstraints(2) & 1608 QgsFieldConstraints.ConstraintNotNull) 1609 self.assertFalse(vl.dataProvider().fieldConstraints(3) 1610 & QgsFieldConstraints.ConstraintNotNull) 1611 1612 # test that constraints have been saved to fields correctly 1613 fields = vl.fields() 1614 self.assertTrue(fields.at(0).constraints().constraints() 1615 & QgsFieldConstraints.ConstraintNotNull) 1616 self.assertEqual(fields.at(0).constraints().constraintOrigin(QgsFieldConstraints.ConstraintNotNull), 1617 QgsFieldConstraints.ConstraintOriginProvider) 1618 self.assertFalse(fields.at(1).constraints().constraints() 1619 & QgsFieldConstraints.ConstraintNotNull) 1620 self.assertTrue(fields.at(2).constraints().constraints() 1621 & QgsFieldConstraints.ConstraintNotNull) 1622 self.assertEqual(fields.at(2).constraints().constraintOrigin(QgsFieldConstraints.ConstraintNotNull), 1623 QgsFieldConstraints.ConstraintOriginProvider) 1624 self.assertFalse(fields.at(3).constraints().constraints() 1625 & QgsFieldConstraints.ConstraintNotNull) 1626 1627 def testUniqueConstraint(self): 1628 vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % ( 1629 self.dbconn), "constraints", "postgres") 1630 self.assertTrue(vl.isValid()) 1631 self.assertEqual(len(vl.fields()), 4) 1632 1633 # test some bad field indexes 1634 self.assertEqual(vl.dataProvider().fieldConstraints(-1), 1635 QgsFieldConstraints.Constraints()) 1636 self.assertEqual(vl.dataProvider().fieldConstraints( 1637 1001), QgsFieldConstraints.Constraints()) 1638 1639 self.assertTrue(vl.dataProvider().fieldConstraints(0) 1640 & QgsFieldConstraints.ConstraintUnique) 1641 self.assertTrue(vl.dataProvider().fieldConstraints(1) 1642 & QgsFieldConstraints.ConstraintUnique) 1643 self.assertTrue(vl.dataProvider().fieldConstraints(2) 1644 & QgsFieldConstraints.ConstraintUnique) 1645 self.assertFalse(vl.dataProvider().fieldConstraints(3) 1646 & QgsFieldConstraints.ConstraintUnique) 1647 1648 # test that constraints have been saved to fields correctly 1649 fields = vl.fields() 1650 self.assertTrue(fields.at(0).constraints().constraints() 1651 & QgsFieldConstraints.ConstraintUnique) 1652 self.assertEqual(fields.at(0).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique), 1653 QgsFieldConstraints.ConstraintOriginProvider) 1654 self.assertTrue(fields.at(1).constraints().constraints() 1655 & QgsFieldConstraints.ConstraintUnique) 1656 self.assertEqual(fields.at(1).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique), 1657 QgsFieldConstraints.ConstraintOriginProvider) 1658 self.assertTrue(fields.at(2).constraints().constraints() 1659 & QgsFieldConstraints.ConstraintUnique) 1660 self.assertEqual(fields.at(2).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique), 1661 QgsFieldConstraints.ConstraintOriginProvider) 1662 self.assertFalse(fields.at(3).constraints().constraints() 1663 & QgsFieldConstraints.ConstraintUnique) 1664 1665 def testConstraintOverwrite(self): 1666 """ test that Postgres provider constraints can't be overwritten by vector layer method """ 1667 vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % ( 1668 self.dbconn), "constraints", "postgres") 1669 self.assertTrue(vl.isValid()) 1670 1671 self.assertTrue(vl.dataProvider().fieldConstraints(0) & 1672 QgsFieldConstraints.ConstraintNotNull) 1673 self.assertTrue(vl.fields().at(0).constraints().constraints() 1674 & QgsFieldConstraints.ConstraintNotNull) 1675 1676 # add a constraint at the layer level 1677 vl.setFieldConstraint(0, QgsFieldConstraints.ConstraintUnique) 1678 1679 # should be no change at provider level 1680 self.assertTrue(vl.dataProvider().fieldConstraints(0) & 1681 QgsFieldConstraints.ConstraintNotNull) 1682 1683 # but layer should still keep provider constraints... 1684 self.assertTrue(vl.fields().at(0).constraints().constraints() 1685 & QgsFieldConstraints.ConstraintNotNull) 1686 self.assertTrue(vl.fieldConstraints( 1687 0) & QgsFieldConstraints.ConstraintNotNull) 1688 # ...in addition to layer level constraint 1689 self.assertTrue(vl.fields().at(0).constraints( 1690 ).constraints() & QgsFieldConstraints.ConstraintUnique) 1691 self.assertTrue(vl.fieldConstraints( 1692 0) & QgsFieldConstraints.ConstraintUnique) 1693 1694 def testVectorLayerUtilsUniqueWithProviderDefault(self): 1695 vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' % 1696 (self.dbconn), "someData", "postgres") 1697 default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)' 1698 vl.dataProvider().setProviderProperty( 1699 QgsDataProvider.EvaluateDefaultValues, False) 1700 self.assertEqual( 1701 vl.dataProvider().defaultValueClause(0), default_clause) 1702 self.assertTrue(QgsVectorLayerUtils.valueExists(vl, 0, 4)) 1703 1704 vl.startEditing() 1705 f = QgsFeature(vl.fields()) 1706 f.setAttribute(0, default_clause) 1707 self.assertFalse( 1708 QgsVectorLayerUtils.valueExists(vl, 0, default_clause)) 1709 self.assertTrue(vl.addFeatures([f])) 1710 1711 # the default value clause should exist... 1712 self.assertTrue(QgsVectorLayerUtils.valueExists(vl, 0, default_clause)) 1713 # but it should not prevent the attribute being validated 1714 self.assertTrue(QgsVectorLayerUtils.validateAttribute(vl, f, 0)) 1715 vl.rollBack() 1716 1717 def testSkipConstraintCheck(self): 1718 vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' % 1719 (self.dbconn), "someData", "postgres") 1720 default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)' 1721 vl.dataProvider().setProviderProperty( 1722 QgsDataProvider.EvaluateDefaultValues, False) 1723 self.assertTrue(vl.dataProvider().skipConstraintCheck( 1724 0, QgsFieldConstraints.ConstraintUnique, default_clause)) 1725 self.assertFalse(vl.dataProvider().skipConstraintCheck( 1726 0, QgsFieldConstraints.ConstraintUnique, 59)) 1727 1728 def testVectorLayerUtilsCreateFeatureWithProviderDefault(self): 1729 vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' % 1730 (self.dbconn), "someData", "postgres") 1731 default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)' 1732 self.assertEqual( 1733 vl.dataProvider().defaultValueClause(0), default_clause) 1734 1735 # If an attribute map is provided, QgsVectorLayerUtils.createFeature must 1736 # respect it, otherwise default values from provider are checked. 1737 # User's choice will not be respected if the value violates unique constraints. 1738 # See https://github.com/qgis/QGIS/issues/27758 1739 f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5, 3: 'map'}) 1740 # changed so that createFeature respects user choice 1741 self.assertEqual(f.attributes(), [ 1742 default_clause, 5, "'qgis'::text", 'map', None, None, None, None, None]) 1743 1744 vl.setDefaultValueDefinition(3, QgsDefaultValue("'mappy'")) 1745 # test ignore vector layer default value expression overrides postgres provider default clause, 1746 # due to user's choice 1747 f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5, 3: 'map'}) 1748 self.assertEqual(f.attributes(), [ 1749 default_clause, 5, "'qgis'::text", 'map', None, None, None, None, None]) 1750 # Since user did not enter a default for field 3, test must return the default value chosen 1751 f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5}) 1752 self.assertEqual(f.attributes(), [ 1753 default_clause, 5, "'qgis'::text", 'mappy', None, None, None, None, None]) 1754 1755 # See https://github.com/qgis/QGIS/issues/23127 1756 def testNumericPrecision(self): 1757 uri = 'point?field=f1:int' 1758 uri += '&field=f2:double(6,4)' 1759 uri += '&field=f3:string(20)' 1760 lyr = QgsVectorLayer(uri, "x", "memory") 1761 self.assertTrue(lyr.isValid()) 1762 f = QgsFeature(lyr.fields()) 1763 f['f1'] = 1 1764 f['f2'] = 123.456 1765 f['f3'] = '12345678.90123456789' 1766 lyr.dataProvider().addFeatures([f]) 1767 uri = '%s table="qgis_test"."b18155" (g) key=\'f1\'' % (self.dbconn) 1768 self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.b18155') 1769 err = QgsVectorLayerExporter.exportLayer( 1770 lyr, uri, "postgres", lyr.crs()) 1771 self.assertEqual(err[0], QgsVectorLayerExporter.NoError, 1772 'unexpected import error {0}'.format(err)) 1773 lyr = QgsVectorLayer(uri, "y", "postgres") 1774 self.assertTrue(lyr.isValid()) 1775 f = next(lyr.getFeatures()) 1776 self.assertEqual(f['f1'], 1) 1777 self.assertEqual(f['f2'], 123.456) 1778 self.assertEqual(f['f3'], '12345678.90123456789') 1779 1780 # See https://github.com/qgis/QGIS/issues/23163 1781 def testImportKey(self): 1782 uri = 'point?field=f1:int' 1783 uri += '&field=F2:double(6,4)' 1784 uri += '&field=f3:string(20)' 1785 lyr = QgsVectorLayer(uri, "x", "memory") 1786 self.assertTrue(lyr.isValid()) 1787 1788 def testKey(lyr, key, kfnames): 1789 self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.import_test') 1790 uri = '%s table="qgis_test"."import_test" (g)' % self.dbconn 1791 if key is not None: 1792 uri += ' key=\'%s\'' % key 1793 err = QgsVectorLayerExporter.exportLayer( 1794 lyr, uri, "postgres", lyr.crs()) 1795 self.assertEqual(err[0], QgsVectorLayerExporter.NoError, 1796 'unexpected import error {0}'.format(err)) 1797 olyr = QgsVectorLayer(uri, "y", "postgres") 1798 self.assertTrue(olyr.isValid()) 1799 flds = lyr.fields() 1800 oflds = olyr.fields() 1801 if key is None: 1802 # if the pkey was not given, it will create a pkey 1803 self.assertEqual(oflds.size(), flds.size() + 1) 1804 self.assertEqual(oflds[0].name(), kfnames[0]) 1805 for i in range(flds.size()): 1806 self.assertEqual(oflds[i + 1].name(), flds[i].name()) 1807 else: 1808 # pkey was given, no extra field generated 1809 self.assertEqual(oflds.size(), flds.size()) 1810 for i in range(oflds.size()): 1811 self.assertEqual(oflds[i].name(), flds[i].name()) 1812 pks = olyr.primaryKeyAttributes() 1813 self.assertEqual(len(pks), len(kfnames)) 1814 for i in range(0, len(kfnames)): 1815 self.assertEqual(oflds[pks[i]].name(), kfnames[i]) 1816 1817 testKey(lyr, 'f1', ['f1']) 1818 testKey(lyr, '"f1"', ['f1']) 1819 testKey(lyr, '"f1","F2"', ['f1', 'F2']) 1820 testKey(lyr, '"f1","F2","f3"', ['f1', 'F2', 'f3']) 1821 testKey(lyr, None, ['id']) 1822 1823 # See https://github.com/qgis/QGIS/issues/25415 1824 def testImportWithoutSchema(self): 1825 1826 def _test(table, schema=None): 1827 self.execSQLCommand('DROP TABLE IF EXISTS %s CASCADE' % table) 1828 uri = 'point?field=f1:int' 1829 uri += '&field=F2:double(6,4)' 1830 uri += '&field=f3:string(20)' 1831 lyr = QgsVectorLayer(uri, "x", "memory") 1832 self.assertTrue(lyr.isValid()) 1833 1834 table = ("%s" % table) if schema is None else ( 1835 "\"%s\".\"%s\"" % (schema, table)) 1836 dest_uri = "%s sslmode=disable table=%s (geom) sql" % ( 1837 self.dbconn, table) 1838 QgsVectorLayerExporter.exportLayer( 1839 lyr, dest_uri, "postgres", lyr.crs()) 1840 olyr = QgsVectorLayer(dest_uri, "y", "postgres") 1841 self.assertTrue(olyr.isValid(), "Failed URI: %s" % dest_uri) 1842 1843 # Test bug 17518 1844 _test('b17518') 1845 1846 # Test fully qualified table (with schema) 1847 _test("b17518", "qgis_test") 1848 1849 # Test empty schema 1850 _test("b17518", "") 1851 1852 # Test public schema 1853 _test("b17518", "public") 1854 1855 # Test fully qualified table (with wrong schema) 1856 with self.assertRaises(AssertionError): 1857 _test("b17518", "qgis_test_wrong") 1858 1859 def testStyle(self): 1860 self.execSQLCommand('DROP TABLE IF EXISTS layer_styles CASCADE') 1861 1862 vl = self.getEditableLayer() 1863 self.assertTrue(vl.isValid()) 1864 self.assertTrue( 1865 vl.dataProvider().isSaveAndLoadStyleToDatabaseSupported()) 1866 self.assertTrue(vl.dataProvider().isDeleteStyleFromDatabaseSupported()) 1867 1868 # table layer_styles does not exit 1869 related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase() 1870 self.assertEqual(related_count, -1) 1871 self.assertEqual(idlist, []) 1872 self.assertEqual(namelist, []) 1873 self.assertEqual(desclist, []) 1874 self.assertNotEqual(errmsg, "") 1875 1876 qml, errmsg = vl.getStyleFromDatabase("1") 1877 self.assertEqual(qml, "") 1878 self.assertNotEqual(errmsg, "") 1879 1880 mFilePath = QDir.toNativeSeparators( 1881 '%s/symbol_layer/%s.qml' % (unitTestDataPath(), "singleSymbol")) 1882 status = vl.loadNamedStyle(mFilePath) 1883 self.assertTrue(status) 1884 1885 # The style is saved as non-default 1886 errorMsg = vl.saveStyleToDatabase( 1887 "by day", "faded greens and elegant patterns", False, "") 1888 self.assertEqual(errorMsg, "") 1889 1890 # the style id should be "1", not "by day" 1891 qml, errmsg = vl.getStyleFromDatabase("by day") 1892 self.assertEqual(qml, "") 1893 self.assertNotEqual(errmsg, "") 1894 1895 related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase() 1896 self.assertEqual(related_count, 1) 1897 self.assertEqual(errmsg, "") 1898 self.assertEqual(idlist, ["1"]) 1899 self.assertEqual(namelist, ["by day"]) 1900 self.assertEqual(desclist, ["faded greens and elegant patterns"]) 1901 1902 qml, errmsg = vl.getStyleFromDatabase("100") 1903 self.assertEqual(qml, "") 1904 self.assertNotEqual(errmsg, "") 1905 1906 qml, errmsg = vl.getStyleFromDatabase("1") 1907 self.assertTrue(qml.startswith('<!DOCTYPE qgis'), qml) 1908 self.assertEqual(errmsg, "") 1909 1910 res, errmsg = vl.deleteStyleFromDatabase("100") 1911 self.assertTrue(res) 1912 self.assertEqual(errmsg, "") 1913 1914 res, errmsg = vl.deleteStyleFromDatabase("1") 1915 self.assertTrue(res) 1916 self.assertEqual(errmsg, "") 1917 1918 # We save now the style again twice but with one as default 1919 errorMsg = vl.saveStyleToDatabase( 1920 "related style", "faded greens and elegant patterns", False, "") 1921 self.assertEqual(errorMsg, "") 1922 errorMsg = vl.saveStyleToDatabase( 1923 "default style", "faded greens and elegant patterns", True, "") 1924 self.assertEqual(errorMsg, "") 1925 1926 related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase() 1927 self.assertEqual(related_count, 2) 1928 self.assertEqual(errmsg, "") 1929 self.assertEqual(idlist, ["3", "2"]) # Ids must be reversed. 1930 self.assertEqual(namelist, ["default style", "related style"]) 1931 self.assertEqual(desclist, ["faded greens and elegant patterns"] * 2) 1932 1933 # We remove these 2 styles 1934 res, errmsg = vl.deleteStyleFromDatabase("2") 1935 self.assertTrue(res) 1936 self.assertEqual(errmsg, "") 1937 res, errmsg = vl.deleteStyleFromDatabase("3") 1938 self.assertTrue(res) 1939 self.assertEqual(errmsg, "") 1940 1941 # table layer_styles does exit, but is now empty 1942 related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase() 1943 self.assertEqual(related_count, 0) 1944 self.assertEqual(idlist, []) 1945 self.assertEqual(namelist, []) 1946 self.assertEqual(desclist, []) 1947 self.assertEqual(errmsg, "") 1948 1949 def testStyleWithGeometryType(self): 1950 """Test saving styles with the additional geometry type 1951 Layers are created from geometries_table 1952 """ 1953 1954 myconn = 'service=\'qgis_test\'' 1955 if 'QGIS_PGTEST_DB' in os.environ: 1956 myconn = os.environ['QGIS_PGTEST_DB'] 1957 1958 # point layer 1959 myPoint = QgsVectorLayer( 1960 myconn + 1961 ' sslmode=disable srid=4326 type=POINT table="qgis_test"."geometries_table" (geom) sql=', 'Point', 1962 'postgres') 1963 self.assertTrue(myPoint.isValid()) 1964 myPoint.saveStyleToDatabase('myPointStyle', '', False, '') 1965 1966 # polygon layer 1967 myPolygon = QgsVectorLayer( 1968 myconn + 1969 ' sslmode=disable srid=4326 type=POLYGON table="qgis_test"."geometries_table" (geom) sql=', 'Poly', 1970 'postgres') 1971 self.assertTrue(myPoint.isValid()) 1972 myPolygon.saveStyleToDatabase('myPolygonStyle', '', False, '') 1973 1974 # how many 1975 related_count, idlist, namelist, desclist, errmsg = myPolygon.listStylesInDatabase() 1976 self.assertEqual(len(idlist), 2) 1977 self.assertEqual(namelist, ['myPolygonStyle', 'myPointStyle']) 1978 1979 # raw psycopg2 query 1980 self.assertTrue(self.con) 1981 cur = self.con.cursor() 1982 self.assertTrue(cur) 1983 cur.execute("select stylename, type from layer_styles order by type") 1984 self.assertEqual(cur.fetchall(), [ 1985 ('myPointStyle', 'Point'), ('myPolygonStyle', 'Polygon')]) 1986 cur.close() 1987 1988 # delete them 1989 myPolygon.deleteStyleFromDatabase(idlist[1]) 1990 myPolygon.deleteStyleFromDatabase(idlist[0]) 1991 styles = myPolygon.listStylesInDatabase() 1992 ids = styles[1] 1993 self.assertEqual(len(ids), 0) 1994 1995 def testSaveStyleInvalidXML(self): 1996 1997 self.execSQLCommand('DROP TABLE IF EXISTS layer_styles CASCADE') 1998 1999 vl = self.getEditableLayer() 2000 self.assertTrue(vl.isValid()) 2001 self.assertTrue( 2002 vl.dataProvider().isSaveAndLoadStyleToDatabaseSupported()) 2003 self.assertTrue(vl.dataProvider().isDeleteStyleFromDatabaseSupported()) 2004 2005 mFilePath = QDir.toNativeSeparators( 2006 '%s/symbol_layer/%s.qml' % (unitTestDataPath(), "fontSymbol")) 2007 status = vl.loadNamedStyle(mFilePath) 2008 self.assertTrue(status) 2009 2010 errorMsg = vl.saveStyleToDatabase( 2011 "fontSymbol", "font with invalid utf8 char", False, "") 2012 self.assertEqual(errorMsg, "") 2013 2014 qml, errmsg = vl.getStyleFromDatabase("1") 2015 self.assertTrue('v="\u001E"' in qml) 2016 self.assertEqual(errmsg, "") 2017 2018 # Test loadStyle from metadata 2019 md = QgsProviderRegistry.instance().providerMetadata('postgres') 2020 qml = md.loadStyle(self.dbconn + " type=POINT table=\"qgis_test\".\"editData\" (geom)", 'fontSymbol') 2021 self.assertTrue(qml.startswith('<!DOCTYPE qgi'), qml) 2022 self.assertTrue('v="\u001E"' in qml) 2023 2024 def testHasMetadata(self): 2025 # views don't have metadata 2026 vl = QgsVectorLayer('{} table="qgis_test"."{}" key="pk" sql='.format(self.dbconn, 'bikes_view'), "bikes_view", 2027 "postgres") 2028 self.assertTrue(vl.isValid()) 2029 self.assertFalse(vl.dataProvider().hasMetadata()) 2030 2031 # ordinary tables have metadata 2032 vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' % 2033 (self.dbconn), "someData", "postgres") 2034 self.assertTrue(vl.isValid()) 2035 self.assertTrue(vl.dataProvider().hasMetadata()) 2036 2037 def testReadExtentOnView(self): 2038 # vector layer based on view 2039 vl0 = QgsVectorLayer( 2040 self.dbconn + 2041 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data_view" (geom) sql=', 2042 'test', 'postgres') 2043 self.assertTrue(vl0.isValid()) 2044 self.assertFalse(vl0.dataProvider().hasMetadata()) 2045 2046 # set a custom extent 2047 originalExtent = vl0.extent() 2048 2049 customExtent = QgsRectangle(-80, 80, -70, 90) 2050 vl0.setExtent(customExtent) 2051 2052 # write xml 2053 doc = QDomDocument("testdoc") 2054 elem = doc.createElement("maplayer") 2055 self.assertTrue(vl0.writeLayerXml(elem, doc, QgsReadWriteContext())) 2056 2057 # read xml with the custom extent. It should not be used by default 2058 vl1 = QgsVectorLayer() 2059 vl1.readLayerXml(elem, QgsReadWriteContext()) 2060 self.assertTrue(vl1.isValid()) 2061 2062 self.assertEqual(vl1.extent(), originalExtent) 2063 2064 # read xml with custom extent with readExtent option. Extent read from 2065 # xml document should be used because we have a view 2066 vl2 = QgsVectorLayer() 2067 vl2.setReadExtentFromXml(True) 2068 vl2.readLayerXml(elem, QgsReadWriteContext()) 2069 self.assertTrue(vl2.isValid()) 2070 2071 self.assertEqual(vl2.extent(), customExtent) 2072 2073 # but a force update on extent should allow retrieveing the data 2074 # provider extent 2075 vl2.updateExtents() 2076 vl2.readLayerXml(elem, QgsReadWriteContext()) 2077 self.assertEqual(vl2.extent(), customExtent) 2078 2079 vl2.updateExtents(force=True) 2080 vl2.readLayerXml(elem, QgsReadWriteContext()) 2081 self.assertEqual(vl2.extent(), originalExtent) 2082 2083 def testReadExtentOnTable(self): 2084 # vector layer based on a standard table 2085 vl0 = QgsVectorLayer( 2086 self.dbconn + 2087 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 2088 'test', 'postgres') 2089 self.assertTrue(vl0.isValid()) 2090 self.assertTrue(vl0.dataProvider().hasMetadata()) 2091 2092 # set a custom extent 2093 originalExtent = vl0.extent() 2094 2095 customExtent = QgsRectangle(-80, 80, -70, 90) 2096 vl0.setExtent(customExtent) 2097 2098 # write xml 2099 doc = QDomDocument("testdoc") 2100 elem = doc.createElement("maplayer") 2101 self.assertTrue(vl0.writeLayerXml(elem, doc, QgsReadWriteContext())) 2102 2103 # read xml with the custom extent. It should not be used by default 2104 vl1 = QgsVectorLayer() 2105 vl1.readLayerXml(elem, QgsReadWriteContext()) 2106 self.assertTrue(vl1.isValid()) 2107 2108 self.assertEqual(vl1.extent(), originalExtent) 2109 2110 # read xml with custom extent with readExtent option. Extent read from 2111 # xml document should be used even if we don't have a view or a 2112 # materialized view 2113 vl2 = QgsVectorLayer() 2114 vl2.setReadExtentFromXml(True) 2115 vl2.readLayerXml(elem, QgsReadWriteContext()) 2116 self.assertTrue(vl2.isValid()) 2117 2118 self.assertEqual(vl2.extent(), customExtent) 2119 2120 # but a force update on extent should allow retrieveing the data 2121 # provider extent 2122 vl2.updateExtents() 2123 vl2.readLayerXml(elem, QgsReadWriteContext()) 2124 self.assertEqual(vl2.extent(), customExtent) 2125 2126 vl2.updateExtents(force=True) 2127 vl2.readLayerXml(elem, QgsReadWriteContext()) 2128 self.assertEqual(vl2.extent(), originalExtent) 2129 2130 def testPreparedFailure(self): 2131 """Test error from issue GH #45100""" 2132 2133 layer = self.getEditableLayerWithCheckConstraint() 2134 self.assertTrue(layer.startEditing()) 2135 old_value = layer.getFeature(1).attribute('i_will_fail_on_no_name') 2136 layer.changeAttributeValue(1, 1, 'no name') 2137 layer.changeGeometry(1, QgsGeometry.fromWkt('point(7 45)')) 2138 self.assertFalse(layer.commitChanges()) 2139 layer.changeAttributeValue(1, 1, old_value) 2140 self.assertTrue(layer.commitChanges()) 2141 2142 def testDeterminePkey(self): 2143 """Test primary key auto-determination""" 2144 2145 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 type=POLYGON table="qgis_test"."authors" sql=', 2146 'test', 'postgres') 2147 self.assertTrue(vl.isValid()) 2148 self.assertTrue(vl.dataProvider().hasMetadata()) 2149 self.assertTrue("key='pk'" in vl.source()) 2150 2151 def testCheckPkUnicityOnView(self): 2152 # vector layer based on view 2153 2154 # This is valid 2155 vl0 = QgsVectorLayer( 2156 self.dbconn + 2157 ' checkPrimaryKeyUnicity=\'0\' sslmode=disable key=\'pk\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2158 'test', 'postgres') 2159 self.assertTrue(vl0.isValid()) 2160 2161 geom = vl0.getFeature(1).geometry().asWkt() 2162 2163 # This is NOT valid 2164 vl0 = QgsVectorLayer( 2165 self.dbconn + 2166 ' checkPrimaryKeyUnicity=\'1\' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2167 'test', 'postgres') 2168 self.assertFalse(vl0.isValid()) 2169 2170 # This is NOT valid because the default is to check unicity 2171 vl0 = QgsVectorLayer( 2172 self.dbconn + 2173 ' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2174 'test', 'postgres') 2175 self.assertFalse(vl0.isValid()) 2176 2177 # This is valid because the readExtentFromXml option is set 2178 # loadDefaultStyle, readExtentFromXml 2179 options = QgsVectorLayer.LayerOptions(True, True) 2180 vl0 = QgsVectorLayer( 2181 self.dbconn + 2182 ' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2183 'test', 'postgres', options) 2184 self.assertTrue(vl0.isValid()) 2185 2186 # Valid because a_unique_int is unique and default is to check unicity 2187 vl0 = QgsVectorLayer( 2188 self.dbconn + 2189 ' sslmode=disable key=\'a_unique_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2190 'test', 'postgres') 2191 self.assertEqual(vl0.getFeature(1).geometry().asWkt(), geom) 2192 2193 # Valid because a_unique_int is unique 2194 vl0 = QgsVectorLayer( 2195 self.dbconn + 2196 ' checkPrimaryKeyUnicity=\'1\' sslmode=disable key=\'a_unique_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=', 2197 'test', 'postgres') 2198 self.assertTrue(vl0.isValid()) 2199 self.assertEqual(vl0.getFeature(1).geometry().asWkt(), geom) 2200 2201 def testNotify(self): 2202 vl0 = QgsVectorLayer( 2203 self.dbconn + 2204 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=', 2205 'test', 'postgres') 2206 vl0.dataProvider().setListening(True) 2207 2208 class Notified(QObject): 2209 2210 def __init__(self): 2211 super(Notified, self).__init__() 2212 self.received = "" 2213 2214 def receive(self, msg): 2215 self.received = msg 2216 2217 notified = Notified() 2218 vl0.dataProvider().notify.connect(notified.receive) 2219 2220 vl0.dataProvider().setListening(True) 2221 2222 cur = self.con.cursor() 2223 ok = False 2224 start = time.time() 2225 while True: 2226 cur.execute("NOTIFY qgis, 'my message'") 2227 self.con.commit() 2228 QGISAPP.processEvents() 2229 if notified.received == "my message": 2230 ok = True 2231 break 2232 if (time.time() - start) > 5: # timeout 2233 break 2234 2235 vl0.dataProvider().notify.disconnect(notified.receive) 2236 vl0.dataProvider().setListening(False) 2237 2238 self.assertTrue(ok) 2239 2240 def testStyleDatabaseWithService(self): 2241 """Test saving style in DB using a service file. 2242 2243 To run this test, you first need to setup the test 2244 database with tests/testdata/provider/testdata_pg.sh 2245 """ 2246 myconn = 'service=\'qgis_test\'' 2247 if 'QGIS_PGTEST_DB' in os.environ: 2248 myconn = os.environ['QGIS_PGTEST_DB'] 2249 myvl = QgsVectorLayer( 2250 myconn + 2251 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=', 2252 'test', 'postgres') 2253 2254 styles = myvl.listStylesInDatabase() 2255 ids = styles[1] 2256 self.assertEqual(len(ids), 0) 2257 2258 myvl.saveStyleToDatabase('mystyle', '', False, '') 2259 styles = myvl.listStylesInDatabase() 2260 ids = styles[1] 2261 self.assertEqual(len(ids), 1) 2262 2263 myvl.deleteStyleFromDatabase(ids[0]) 2264 styles = myvl.listStylesInDatabase() 2265 ids = styles[1] 2266 self.assertEqual(len(ids), 0) 2267 2268 # try with a layer which doesn't have geom 2269 myvl = QgsVectorLayer( 2270 myconn + 2271 ' sslmode=disable key=\'pk\' table="qgis_test"."bikes" sql=', 'test', 'postgres') 2272 self.assertTrue(myvl.isValid()) 2273 2274 myvl.saveStyleToDatabase('mystyle_wo_geom', '', False, '') 2275 styles = myvl.listStylesInDatabase() 2276 ids = styles[1] 2277 self.assertEqual(len(ids), 1) 2278 2279 myvl.deleteStyleFromDatabase(ids[0]) 2280 styles = myvl.listStylesInDatabase() 2281 ids = styles[1] 2282 self.assertEqual(len(ids), 0) 2283 2284 def testCurveToMultipolygon(self): 2285 self.execSQLCommand( 2286 'CREATE TABLE IF NOT EXISTS multicurve(pk SERIAL NOT NULL PRIMARY KEY, geom public.geometry(MultiPolygon, 4326))') 2287 self.execSQLCommand('TRUNCATE multicurve') 2288 2289 vl = QgsVectorLayer( 2290 self.dbconn + 2291 ' sslmode=disable key=\'pk\' srid=4326 type=MULTIPOLYGON table="multicurve" (geom) sql=', 2292 'test', 'postgres') 2293 2294 f = QgsFeature(vl.fields()) 2295 f.setGeometry(QgsGeometry.fromWkt( 2296 'CurvePolygon(CircularString (20 30, 50 30, 50 90, 10 50, 20 30))')) 2297 self.assertTrue(vl.startEditing()) 2298 self.assertTrue(vl.addFeatures([f])) 2299 self.assertTrue(vl.commitChanges()) 2300 2301 f = next(vl.getFeatures(QgsFeatureRequest())) 2302 2303 g = f.geometry().constGet() 2304 self.assertTrue(g) 2305 self.assertEqual(g.wkbType(), QgsWkbTypes.MultiPolygon) 2306 self.assertEqual(g.childCount(), 1) 2307 self.assertTrue(g.childGeometry(0).vertexCount() > 3) 2308 2309 def testMassivePaste(self): 2310 """Speed test to compare createFeature and createFeatures, for regression #21303""" 2311 2312 import time 2313 2314 self.execSQLCommand( 2315 'CREATE TABLE IF NOT EXISTS massive_paste(pk SERIAL NOT NULL PRIMARY KEY, geom public.geometry(Polygon, 4326))') 2316 self.execSQLCommand('TRUNCATE massive_paste') 2317 2318 start_time = time.time() 2319 vl = QgsVectorLayer( 2320 self.dbconn + 2321 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="massive_paste" (geom) sql=', 2322 'test_massive_paste', 'postgres') 2323 self.assertTrue(vl.startEditing()) 2324 features = [] 2325 context = vl.createExpressionContext() 2326 for i in range(4000): 2327 features.append( 2328 QgsVectorLayerUtils.createFeature(vl, QgsGeometry.fromWkt('Polygon ((7 44, 8 45, 8 46, 7 46, 7 44))'), 2329 {0: i}, context)) 2330 self.assertTrue(vl.addFeatures(features)) 2331 self.assertTrue(vl.commitChanges()) 2332 self.assertEqual(vl.featureCount(), 4000) 2333 print("--- %s seconds ---" % (time.time() - start_time)) 2334 2335 self.execSQLCommand('TRUNCATE massive_paste') 2336 start_time = time.time() 2337 vl = QgsVectorLayer( 2338 self.dbconn + 2339 ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="massive_paste" (geom) sql=', 2340 'test_massive_paste', 'postgres') 2341 self.assertTrue(vl.startEditing()) 2342 features_data = [] 2343 context = vl.createExpressionContext() 2344 for i in range(4000): 2345 features_data.append( 2346 QgsVectorLayerUtils.QgsFeatureData(QgsGeometry.fromWkt('Polygon ((7 44, 8 45, 8 46, 7 46, 7 44))'), 2347 {0: i})) 2348 features = QgsVectorLayerUtils.createFeatures( 2349 vl, features_data, context) 2350 self.assertTrue(vl.addFeatures(features)) 2351 self.assertTrue(vl.commitChanges()) 2352 self.assertEqual(vl.featureCount(), 4000) 2353 print("--- %s seconds ---" % (time.time() - start_time)) 2354 2355 def testFilterOnCustomBbox(self): 2356 extent = QgsRectangle(-68, 70, -67, 80) 2357 request = QgsFeatureRequest().setFilterRect(extent) 2358 dbconn = 'service=qgis_test' 2359 uri = '%s srid=4326 key="pk" sslmode=disable table="qgis_test"."some_poly_data_shift_bbox" (geom)' % ( 2360 dbconn) 2361 2362 def _test(vl, ids): 2363 values = {feat['pk']: 'x' for feat in vl.getFeatures(request)} 2364 expected = {x: 'x' for x in ids} 2365 self.assertEqual(values, expected) 2366 2367 vl = QgsVectorLayer(uri, "testgeom", "postgres") 2368 self.assertTrue(vl.isValid()) 2369 _test(vl, [2, 3]) 2370 2371 vl = QgsVectorLayer(uri + ' bbox=shiftbox', "testgeom", "postgres") 2372 self.assertTrue(vl.isValid()) 2373 _test(vl, [1, 3]) 2374 2375 def testValidLayerDiscoverRelationsNone(self): 2376 """ 2377 Test checks that discover relation feature can be used on a layer that has no relation. 2378 """ 2379 vl = QgsVectorLayer( 2380 self.dbconn + 2381 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=', 2382 'test', 'postgres') 2383 self.assertTrue(vl.isValid()) 2384 self.assertEqual(vl.dataProvider().discoverRelations(vl, []), []) 2385 2386 def testInvalidLayerDiscoverRelations(self): 2387 """ 2388 Test that discover relations feature can be used on invalid layer. 2389 """ 2390 vl = QgsVectorLayer('{} table="qgis_test"."invalid_layer" sql='.format(self.dbconn), "invalid_layer", 2391 "postgres") 2392 self.assertFalse(vl.isValid()) 2393 self.assertEqual(vl.dataProvider().discoverRelations(vl, []), []) 2394 2395 def testValidLayerDiscoverRelations(self): 2396 """ 2397 Test implicit relations that can be discovers between tables, based on declared foreign keys. 2398 The test also checks that two distinct relations can be discovered when two foreign keys are declared (see #41138). 2399 """ 2400 vl = QgsVectorLayer( 2401 self.dbconn + 2402 ' sslmode=disable key=\'pk\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referencing_layer"', 2403 'referencing_layer', 'postgres') 2404 vls = [ 2405 QgsVectorLayer( 2406 self.dbconn + 2407 ' sslmode=disable key=\'pk_ref_1\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referenced_layer_1"', 2408 'referenced_layer_1', 'postgres'), 2409 QgsVectorLayer( 2410 self.dbconn + 2411 ' sslmode=disable key=\'pk_ref_2\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referenced_layer_2"', 2412 'referenced_layer_2', 'postgres'), 2413 vl 2414 ] 2415 2416 for lyr in vls: 2417 self.assertTrue(lyr.isValid()) 2418 QgsProject.instance().addMapLayer(lyr) 2419 relations = vl.dataProvider().discoverRelations(vl, vls) 2420 self.assertEqual(len(relations), 2) 2421 for i, r in enumerate(relations): 2422 self.assertEqual(r.referencedLayer(), vls[i]) 2423 2424 def testCheckTidPkOnViews(self): 2425 """Test vector layer based on a view with `ctid` as a key""" 2426 2427 # This is valid 2428 vl0 = QgsVectorLayer( 2429 self.dbconn + 2430 ' checkPrimaryKeyUnicity=\'0\' sslmode=disable key=\'ctid\' srid=4326 type=POINT table="qgis_test"."b31799_test_view_ctid" (geom) sql=', 2431 'test', 'postgres') 2432 self.assertTrue(vl0.isValid()) 2433 self.assertEqual(vl0.featureCount(), 10) 2434 for f in vl0.getFeatures(): 2435 self.assertNotEqual(f.attribute(0), NULL) 2436 2437 def testFeatureCountEstimatedOnTable(self): 2438 """ 2439 Test feature count on table when estimated data is enabled 2440 """ 2441 vl = QgsVectorLayer( 2442 self.dbconn + 2443 ' sslmode=disable key=\'pk\' estimatedmetadata=true srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=', 2444 'test', 'postgres') 2445 self.assertTrue(vl.isValid()) 2446 self.assertTrue(vl.featureCount() > 0) 2447 2448 def testFeatureCountEstimatedOnView(self): 2449 """ 2450 Test feature count on view when estimated data is enabled 2451 """ 2452 self.execSQLCommand('DROP VIEW IF EXISTS qgis_test.somedataview') 2453 self.execSQLCommand( 2454 'CREATE VIEW qgis_test.somedataview AS SELECT * FROM qgis_test."someData"') 2455 vl = QgsVectorLayer( 2456 self.dbconn + 2457 ' sslmode=disable key=\'pk\' estimatedmetadata=true srid=4326 type=POINT table="qgis_test"."somedataview" (geom) sql=', 2458 'test', 'postgres') 2459 self.assertTrue(vl.isValid()) 2460 self.assertTrue(vl.featureCount() > 0) 2461 2462 def testIdentityPk(self): 2463 """Test a table with identity pk, see GH #29560""" 2464 2465 vl = QgsVectorLayer( 2466 self.dbconn + 2467 ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."b29560"(geom) sql=', 2468 'testb29560', 'postgres') 2469 self.assertTrue(vl.isValid()) 2470 2471 feature = QgsFeature(vl.fields()) 2472 geom = QgsGeometry.fromWkt('POLYGON EMPTY') 2473 feature.setGeometry(geom) 2474 self.assertTrue(vl.dataProvider().addFeature(feature)) 2475 2476 self.assertEqual(vl.dataProvider().defaultValueClause(0), "nextval('b29560_gid_seq'::regclass)") 2477 2478 del (vl) 2479 2480 # Verify 2481 vl = QgsVectorLayer( 2482 self.dbconn + 2483 ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."b29560"(geom) sql=', 2484 'testb29560', 'postgres') 2485 self.assertTrue(vl.isValid()) 2486 feature = next(vl.getFeatures()) 2487 self.assertIsNotNone(feature.id()) 2488 2489 @unittest.skipIf(os.environ.get('QGIS_CONTINUOUS_INTEGRATION_RUN', 'true'), 'Test flaky') 2490 def testDefaultValuesAndClauses(self): 2491 """Test whether default values like CURRENT_TIMESTAMP or 2492 now() they are respected. See GH #33383""" 2493 2494 # Create the test table 2495 2496 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable table="public"."test_table_default_values" sql=', 'test', 2497 'postgres') 2498 self.assertTrue(vl.isValid()) 2499 2500 dp = vl.dataProvider() 2501 2502 # Clean the table 2503 dp.deleteFeatures(dp.allFeatureIds()) 2504 2505 # Save it for the test 2506 now = datetime.now() 2507 2508 # Test default values 2509 dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1) 2510 # FIXME: spatialite provider (and OGR) return a NULL here and the following passes 2511 # self.assertTrue(dp.defaultValue(0).isNull()) 2512 self.assertIsNotNone(dp.defaultValue(0)) 2513 self.assertIsNone(dp.defaultValue(1)) 2514 self.assertTrue(dp.defaultValue( 2515 2).startswith(now.strftime('%Y-%m-%d'))) 2516 self.assertTrue(dp.defaultValue( 2517 3).startswith(now.strftime('%Y-%m-%d'))) 2518 self.assertEqual(dp.defaultValue(4), 123) 2519 self.assertEqual(dp.defaultValue(5), 'My default') 2520 2521 # FIXME: the provider should return the clause definition 2522 # regardless of the EvaluateDefaultValues setting 2523 dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 0) 2524 self.assertEqual(dp.defaultValueClause( 2525 0), "nextval('test_table_default_values_id_seq'::regclass)") 2526 self.assertEqual(dp.defaultValueClause(1), '') 2527 self.assertEqual(dp.defaultValueClause(2), "now()") 2528 self.assertEqual(dp.defaultValueClause(3), "CURRENT_TIMESTAMP") 2529 self.assertEqual(dp.defaultValueClause(4), '123') 2530 self.assertEqual(dp.defaultValueClause(5), "'My default'::text") 2531 # FIXME: the test fails if the value is not reset to 1 2532 dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1) 2533 2534 feature = QgsFeature(vl.fields()) 2535 for idx in range(vl.fields().count()): 2536 default = vl.dataProvider().defaultValue(idx) 2537 if default is not None: 2538 feature.setAttribute(idx, default) 2539 else: 2540 feature.setAttribute(idx, 'A comment') 2541 2542 self.assertTrue(vl.dataProvider().addFeature(feature)) 2543 del (vl) 2544 2545 # Verify 2546 vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable table="public"."test_table_default_values" sql=', 'test', 2547 'postgres') 2548 self.assertTrue(vl2.isValid()) 2549 feature = next(vl2.getFeatures()) 2550 self.assertEqual(feature.attribute(1), 'A comment') 2551 self.assertTrue(feature.attribute( 2552 2).startswith(now.strftime('%Y-%m-%d'))) 2553 self.assertTrue(feature.attribute( 2554 3).startswith(now.strftime('%Y-%m-%d'))) 2555 self.assertEqual(feature.attribute(4), 123) 2556 self.assertEqual(feature.attribute(5), 'My default') 2557 2558 def testEncodeDecodeUri(self): 2559 """Test PG encode/decode URI""" 2560 2561 md = QgsProviderRegistry.instance().providerMetadata('postgres') 2562 self.assertEqual(md.decodeUri( 2563 'dbname=\'qgis_tests\' host=localhost port=5432 user=\'myuser\' sslmode=disable estimatedmetadata=true srid=3067 table="public"."basic_map_tiled" (rast)'), 2564 {'dbname': 'qgis_tests', 2565 'estimatedmetadata': True, 2566 'geometrycolumn': 'rast', 2567 'host': 'localhost', 2568 'port': '5432', 2569 'schema': 'public', 2570 'srid': '3067', 2571 'sslmode': 1, 2572 'table': 'basic_map_tiled', 2573 'username': 'myuser'}) 2574 2575 self.assertEqual(md.decodeUri( 2576 'dbname=\'qgis_tests\' host=localhost port=5432 user=\'myuser\' sslmode=disable key=\'id\' estimatedmetadata=true srid=3763 type=MultiPolygon checkPrimaryKeyUnicity=\'1\' table="public"."copas1" (geom)'), 2577 {'dbname': 'qgis_tests', 2578 'estimatedmetadata': True, 2579 'geometrycolumn': 'geom', 2580 'host': 'localhost', 2581 'key': 'id', 2582 'port': '5432', 2583 'schema': 'public', 2584 'srid': '3763', 2585 'sslmode': 1, 2586 'table': 'copas1', 2587 'type': 6, 2588 'username': 'myuser'}) 2589 2590 self.assertEqual(md.encodeUri({'dbname': 'qgis_tests', 2591 'estimatedmetadata': True, 2592 'geometrycolumn': 'geom', 2593 'host': 'localhost', 2594 'key': 'id', 2595 'port': '5432', 2596 'schema': 'public', 2597 'srid': '3763', 2598 'sslmode': 1, 2599 'table': 'copas1', 2600 'type': 6, 2601 'username': 'myuser'}), 2602 "dbname='qgis_tests' user='myuser' srid=3763 estimatedmetadata='true' host='localhost' key='id' port='5432' sslmode='disable' type='MultiPolygon' table=\"public\".\"copas1\" (geom)") 2603 2604 self.assertEqual(md.encodeUri({'dbname': 'qgis_tests', 2605 'estimatedmetadata': True, 2606 'geometrycolumn': 'rast', 2607 'host': 'localhost', 2608 'port': '5432', 2609 'schema': 'public', 2610 'srid': '3067', 2611 'sslmode': 1, 2612 'table': 'basic_map_tiled', 2613 'username': 'myuser'}), 2614 "dbname='qgis_tests' user='myuser' srid=3067 estimatedmetadata='true' host='localhost' port='5432' sslmode='disable' table=\"public\".\"basic_map_tiled\" (rast)") 2615 2616 def _round_trip(uri): 2617 decoded = md.decodeUri(uri) 2618 self.assertEqual(decoded, md.decodeUri(md.encodeUri(decoded))) 2619 2620 uri = self.dbconn + \ 2621 ' sslmode=disable key=\'gid\' srid=3035 table="public"."my_pg_vector" sql=' 2622 decoded = md.decodeUri(uri) 2623 self.assertEqual(decoded, { 2624 'key': 'gid', 2625 'schema': 'public', 2626 'service': 'qgis_test', 2627 'srid': '3035', 2628 'sslmode': QgsDataSourceUri.SslDisable, 2629 'table': 'my_pg_vector', 2630 }) 2631 2632 _round_trip(uri) 2633 2634 uri = self.dbconn + \ 2635 ' sslmode=prefer key=\'gid\' srid=3035 temporalFieldIndex=2 ' + \ 2636 'authcfg=afebeff username=\'my username\' password=\'my secret password=\' ' + \ 2637 'table="public"."my_pg_vector" (the_geom) sql="a_field" != 1223223' 2638 2639 _round_trip(uri) 2640 2641 decoded = md.decodeUri(uri) 2642 self.assertEqual(decoded, { 2643 'authcfg': 'afebeff', 2644 'geometrycolumn': 'the_geom', 2645 'key': 'gid', 2646 'password': 'my secret password=', 2647 'schema': 'public', 2648 'service': 'qgis_test', 2649 'sql': '"a_field" != 1223223', 2650 'srid': '3035', 2651 'sslmode': QgsDataSourceUri.SslPrefer, 2652 'table': 'my_pg_vector', 2653 'username': 'my username', 2654 }) 2655 2656 def testHasSpatialIndex(self): 2657 for layer_name in ('hspi_table', 'hspi_materialized_view'): 2658 columns = {'geom_without_index': QgsFeatureSource.SpatialIndexNotPresent, 'geom_with_index': QgsFeatureSource.SpatialIndexPresent} 2659 for (geometry_column, spatial_index) in columns.items(): 2660 conn = 'service=\'qgis_test\'' 2661 if 'QGIS_PGTEST_DB' in os.environ: 2662 conn = os.environ['QGIS_PGTEST_DB'] 2663 vl = QgsVectorLayer( 2664 conn + 2665 ' sslmode=disable key=\'id\' srid=4326 type=\'Polygon\' table="qgis_test"."{n}" ({c}) sql='.format(n=layer_name, c=geometry_column), 2666 'test', 'postgres') 2667 self.assertTrue(vl.isValid()) 2668 self.assertEqual(vl.hasSpatialIndex(), spatial_index) 2669 2670 def testBBoxFilterOnGeographyType(self): 2671 """Test bounding box filter on geography type""" 2672 2673 vl = QgsVectorLayer( 2674 self.dbconn + 2675 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."testgeog" (geog) sql=', 2676 'test', 'postgres') 2677 2678 self.assertTrue(vl.isValid()) 2679 2680 def _test(vl, extent, ids): 2681 request = QgsFeatureRequest().setFilterRect(extent) 2682 values = {feat['pk']: 'x' for feat in vl.getFeatures(request)} 2683 expected = {x: 'x' for x in ids} 2684 self.assertEqual(values, expected) 2685 2686 _test(vl, QgsRectangle(40 - 0.01, -0.01, 40 + 0.01, 0.01), [1]) 2687 _test(vl, QgsRectangle(40 - 5, -5, 40 + 5, 5), [1]) 2688 _test(vl, QgsRectangle(40 - 5, 0, 40 + 5, 5), [1]) 2689 _test(vl, QgsRectangle(40 - 10, -10, 40 + 10, 10), [1]) # no use of spatial index currently 2690 _test(vl, QgsRectangle(40 - 5, 0.01, 40 + 5, 5), []) # no match 2691 2692 _test(vl, QgsRectangle(40 - 0.01, 60 - 0.01, 40 + 0.01, 60 + 0.01), [2]) 2693 _test(vl, QgsRectangle(40 - 5, 60 - 5, 40 + 5, 60 + 5), [2]) 2694 _test(vl, QgsRectangle(40 - 5, 60 - 0.01, 40 + 5, 60 + 9.99), [2]) 2695 2696 _test(vl, QgsRectangle(40 - 0.01, -60 - 0.01, 40 + 0.01, -60 + 0.01), [3]) 2697 _test(vl, QgsRectangle(40 - 5, -60 - 5, 40 + 5, -60 + 5), [3]) 2698 _test(vl, QgsRectangle(40 - 5, -60 - 9.99, 40 + 5, -60 + 0.01), [3]) 2699 2700 _test(vl, QgsRectangle(-181, -90, 181, 90), [1, 2, 3]) # no use of spatial index currently 2701 2702 def testReadCustomSRID(self): 2703 """Test that we can correctly read the SRS from a custom SRID""" 2704 2705 md = QgsProviderRegistry.instance().providerMetadata("postgres") 2706 conn = md.createConnection(self.dbconn, {}) 2707 2708 # Cleanup if needed 2709 try: 2710 conn.dropVectorTable('qgis_test', 'test_custom_srid') 2711 except QgsProviderConnectionException: 2712 pass 2713 2714 conn.executeSql("DELETE FROM spatial_ref_sys WHERE srid = 543210 AND auth_name='FOO' AND auth_srid=32600;") 2715 conn.executeSql("""INSERT INTO spatial_ref_sys (srid, auth_name, auth_srid, srtext, proj4text) VALUES (543210, 'FOO', 32600, 'PROJCS["my_projection",GEOGCS["WGS 84",DATUM["WGS_1984",SPHEROID["WGS 84",6378137,298.257223563,AUTHORITY["EPSG","7030"]],AUTHORITY["EPSG","6326"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]]],PROJECTION["Transverse_Mercator"],PARAMETER["latitude_of_origin",0],PARAMETER["central_meridian",0],PARAMETER["scale_factor",1],PARAMETER["false_easting",0],PARAMETER["false_northing",0],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]','+proj=tmerc +lat_0=0 +lon_0=0 +k=1 +x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs');""") 2716 2717 conn.executeSql(''' 2718 CREATE TABLE "qgis_test"."test_custom_srid" ( 2719 gid serial primary key, 2720 geom geometry(Point, 543210) 2721 );''') 2722 2723 layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\'table="qgis_test"."test_custom_srid" (geom) sql=', 'test', 'postgres') 2724 2725 conn.executeSql("DELETE FROM spatial_ref_sys WHERE srid = 543210 AND auth_name='FOO' AND auth_srid=32600;") 2726 2727 self.assertTrue(layer.isValid()) 2728 self.assertEqual(layer.crs().description(), 'my_projection') 2729 2730 def testSingleMultiColumnPkSmallData(self): 2731 """Test Single and Multi Column PK, `Small` Data""" 2732 from itertools import combinations 2733 2734 def test_for_pk_combinations(test_type_list, pk_column_name_list, fids_get_count): 2735 pk_column_name = ','.join(pk_column_name_list) 2736 set_new_pk = ''' 2737 ALTER TABLE qgis_test.multi_column_pk_small_data_table DROP CONSTRAINT multi_column_pk_small_data_pk; 2738 ALTER TABLE qgis_test.multi_column_pk_small_data_table 2739 ADD CONSTRAINT multi_column_pk_small_data_pk PRIMARY KEY ({});''' 2740 set_new_layer = ' sslmode=disable key=\'{}\' srid=3857 type=POLYGON table="qgis_test"."multi_column_pk_small_data_{}" (geom) sql=' 2741 error_string = 'from {} with PK - {} : expected {}, got {}' 2742 2743 if 'table' in test_type_list: 2744 self.execSQLCommand(set_new_pk.format(pk_column_name)) 2745 for test_type in test_type_list: 2746 vl = QgsVectorLayer(self.dbconn + set_new_layer.format(pk_column_name, test_type), 'test_multi_column_pk_small_data', 'postgres') 2747 fids = [f.id() for f in vl.getFeatures(QgsFeatureRequest().setLimit(fids_get_count))] 2748 fids2 = [f.id() for f in vl.getFeatures(fids)] 2749 self.assertEqual(fids_get_count, len(fids), "Get with limit " + 2750 error_string.format(test_type, pk_column_name, fids_get_count, len(fids))) 2751 self.assertEqual(fids_get_count, len(fids2), "Get by fids " + 2752 error_string.format(test_type, pk_column_name, fids_get_count, len(fids2))) 2753 2754 self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.multi_column_pk_small_data_table CASCADE;') 2755 self.execSQLCommand(''' 2756 CREATE TABLE qgis_test.multi_column_pk_small_data_table ( 2757 id_serial serial NOT NULL, 2758 id_uuid uuid NOT NULL, 2759 id_int int NOT NULL, 2760 id_bigint bigint NOT NULL, 2761 id_str character varying(20) NOT NULL, 2762 id_inet4 inet NOT NULL, 2763 id_inet6 inet NOT NULL, 2764 id_cidr4 cidr NOT NULL, 2765 id_cidr6 cidr NOT NULL, 2766 id_macaddr macaddr NOT NULL, 2767 id_macaddr8 macaddr8 NOT NULL, 2768 id_timestamp timestamp with time zone NOT NULL, 2769 id_half_null_uuid uuid, 2770 id_all_null_uuid uuid, 2771 geom geometry(Polygon,3857), 2772 CONSTRAINT multi_column_pk_small_data_pk 2773 PRIMARY KEY (id_serial, id_uuid, id_int, id_bigint, id_str) );''') 2774 self.execSQLCommand(''' 2775 CREATE OR REPLACE VIEW qgis_test.multi_column_pk_small_data_view AS 2776 SELECT * FROM qgis_test.multi_column_pk_small_data_table; 2777 DROP MATERIALIZED VIEW IF EXISTS qgis_test.multi_column_pk_small_data_mat_view; 2778 CREATE MATERIALIZED VIEW qgis_test.multi_column_pk_small_data_mat_view AS 2779 SELECT * FROM qgis_test.multi_column_pk_small_data_table;''') 2780 self.execSQLCommand(''' 2781 TRUNCATE qgis_test.multi_column_pk_small_data_table; 2782 INSERT INTO qgis_test.multi_column_pk_small_data_table( 2783 id_uuid, id_int, id_bigint, id_str, id_inet4, id_inet6, id_cidr4, id_cidr6, 2784 id_macaddr, id_macaddr8, id_timestamp, id_half_null_uuid, id_all_null_uuid, geom) 2785 SELECT 2786 ( (10000000)::text || (100000000000 + dy)::text || (100000000000 + dx)::text )::uuid, 2787 dx + 1000000 * dy, --id_int 2788 dx + 1000000 * dy, --id_bigint 2789 dx || E\' ot\\'her \' || dy, --id_str 2790 (\'192.168.0.1\'::inet + dx + 100 * dy )::inet, --id_inet4 2791 (\'2001:4f8:3:ba:2e0:81ff:fe22:d1f1\'::inet + dx + 100 * dy )::inet, --id_inet6 2792 (\'192.168.0.1\'::cidr + dx + 100 * dy )::cidr, --id_cidr4 2793 (\'2001:4f8:3:ba:2e0:81ff:fe22:d1f1\'::cidr + dx + 100 * dy )::cidr, --id_cidr6 2794 ((112233445566 + dx + 100 * dy)::text)::macaddr, --id_macaddr 2795 ((1122334455667788 + dx + 100 * dy)::text)::macaddr8, --id_macaddr8 2796 now() - ((dx||\' hour\')::text)::interval - ((dy||\' day\')::text)::interval, 2797 NULLIF( ( (10000000)::text || (100000000000 + dy)::text || (100000000000 + dx)::text )::uuid, 2798 ( (10000000)::text || (100000000000 + dy + dy%2)::text || (100000000000 + dx)::text )::uuid ), 2799 NULL, 2800 ST_Translate( 2801 ST_GeomFromText(\'POLYGON((3396900.0 6521800.0,3396900.0 6521870.0, 2802 3396830.0 6521870.0,3396830.0 6521800.0,3396900.0 6521800.0))\', 3857 ), 2803 100.0 * dx, 2804 100.0 * dy ) 2805 FROM generate_series(1,3) dx, generate_series(1,3) dy; 2806 REFRESH MATERIALIZED VIEW qgis_test.multi_column_pk_small_data_mat_view;''') 2807 2808 pk_col_list = ("id_serial", "id_uuid", "id_int", "id_bigint", "id_str", "id_inet4", "id_inet6", "id_cidr4", "id_cidr6", "id_macaddr", "id_macaddr8") 2809 test_type_list = ["table", "view", "mat_view"] 2810 for n in [1, 2, len(pk_col_list)]: 2811 pk_col_set_list = list(combinations(pk_col_list, n)) 2812 for pk_col_set in pk_col_set_list: 2813 test_for_pk_combinations(test_type_list, pk_col_set, 7) 2814 2815 for col_name in ["id_serial", "id_uuid", "id_int", "id_bigint", "id_str", "id_inet4"]: 2816 test_for_pk_combinations(["view", "mat_view"], ["id_half_null_uuid", col_name], 7) 2817 test_for_pk_combinations(["view", "mat_view"], ["id_all_null_uuid", col_name], 7) 2818 2819 def testChangeAttributeWithDefaultValue(self): 2820 """Test that we can change an attribute value with its default value""" 2821 2822 md = QgsProviderRegistry.instance().providerMetadata("postgres") 2823 conn = md.createConnection(self.dbconn, {}) 2824 2825 # Cleanup 2826 try: 2827 conn.dropVectorTable('qgis_test', 'test_change_att_w_default_value') 2828 except QgsProviderConnectionException: 2829 pass 2830 2831 conn.executeSql(''' 2832 CREATE TABLE "qgis_test"."test_change_att_w_default_value" ( 2833 id serial primary key, 2834 thetext1 character varying(8) DEFAULT NULL::character varying, 2835 thetext2 character varying(8) DEFAULT NULL, 2836 thetext3 character varying(8) DEFAULT 'blabla', 2837 thenumber integer DEFAULT 2+2 2838 );''') 2839 2840 conn.executeSql(''' 2841 INSERT INTO "qgis_test"."test_change_att_w_default_value" (thetext1,thetext2,thetext3,thenumber) VALUES ('test1','test2','test3',6);''') 2842 2843 layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\'table="qgis_test"."test_change_att_w_default_value" sql=', 'test', 'postgres') 2844 self.assertTrue(layer.isValid()) 2845 self.assertEqual(layer.featureCount(), 1) 2846 feat = next(layer.getFeatures()) 2847 self.assertTrue(feat["thetext1"], "test1") 2848 self.assertTrue(feat["thetext2"], "test2") 2849 self.assertTrue(feat["thetext3"], "test3") 2850 self.assertTrue(feat["thenumber"], 6) 2851 2852 self.assertEqual(layer.dataProvider().defaultValueClause(1), "NULL::character varying") 2853 self.assertEqual(layer.dataProvider().defaultValueClause(2), "NULL::character varying") 2854 self.assertEqual(layer.dataProvider().defaultValueClause(3), "'blabla'::character varying") 2855 self.assertEqual(layer.dataProvider().defaultValueClause(4), "(2 + 2)") 2856 2857 layer.startEditing() 2858 self.assertTrue(layer.changeAttributeValues(1, {1: "NULL::character varying", 2: "NULL::character varying", 2859 3: "'blabla'::character varying", 4: "(2 + 2)"})) 2860 self.assertTrue(layer.commitChanges()) 2861 2862 feat = next(layer.getFeatures()) 2863 2864 # print( "hein |{}| expected=|{}| bool={}".format( feat["thetext"], expected, (feat["thetext"] expected) ) ) 2865 self.assertEqual(feat["thetext1"], NULL) 2866 self.assertEqual(feat["thetext2"], NULL) 2867 self.assertEqual(feat["thetext3"], "blabla") 2868 self.assertEqual(feat["thenumber"], 4) 2869 2870 2871class TestPyQgsPostgresProviderCompoundKey(unittest.TestCase, ProviderTestCase): 2872 2873 @classmethod 2874 def setUpClass(cls): 2875 """Run before all tests""" 2876 cls.dbconn = 'service=qgis_test' 2877 if 'QGIS_PGTEST_DB' in os.environ: 2878 cls.dbconn = os.environ['QGIS_PGTEST_DB'] 2879 # Create test layers 2880 cls.vl = QgsVectorLayer( 2881 cls.dbconn + 2882 ' sslmode=disable key=\'"key1","key2"\' srid=4326 type=POINT table="qgis_test"."someDataCompound" (geom) sql=', 2883 'test', 'postgres') 2884 assert cls.vl.isValid() 2885 cls.source = cls.vl.dataProvider() 2886 2887 @classmethod 2888 def tearDownClass(cls): 2889 """Run after all tests""" 2890 2891 def enableCompiler(self): 2892 QgsSettings().setValue('/qgis/compileExpressions', True) 2893 return True 2894 2895 def disableCompiler(self): 2896 QgsSettings().setValue('/qgis/compileExpressions', False) 2897 2898 def uncompiledFilters(self): 2899 return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')', 2900 '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')', 2901 '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')']) 2902 2903 def partiallyCompiledFilters(self): 2904 return set([]) 2905 2906 def testConstraints(self): 2907 for key in ["key1", "key2"]: 2908 idx = self.vl.dataProvider().fieldNameIndex(key) 2909 self.assertTrue(idx >= 0) 2910 self.assertFalse(self.vl.dataProvider().fieldConstraints( 2911 idx) & QgsFieldConstraints.ConstraintUnique) 2912 2913 def testCompoundPkChanges(self): 2914 """ Check if fields with compound primary keys can be changed """ 2915 vl = self.vl 2916 2917 self.assertTrue(vl.isValid()) 2918 idx_key1 = vl.fields().lookupField('key1') 2919 idx_key2 = vl.fields().lookupField('key2') 2920 # the name "pk" for this datasource is misleading; 2921 # the primary key is actually composed by the fields key1 and key2 2922 idx_pk = vl.fields().lookupField('pk') 2923 idx_name = vl.fields().lookupField('name') 2924 idx_name2 = vl.fields().lookupField('name2') 2925 2926 geomwkt = 'Point(-47.945 -15.812)' 2927 2928 # start editing ordinary attribute. 2929 ft1 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("key1 = 2 AND key2 = 2"))) 2930 self.assertTrue(ft1.isValid()) 2931 original_geometry = ft1.geometry().asWkt() 2932 2933 vl.startEditing() 2934 self.assertTrue(vl.changeAttributeValues(ft1.id(), {idx_name: 'Rose'})) 2935 self.assertTrue(vl.commitChanges()) 2936 2937 # check change 2938 ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("key1 = 2 AND key2 = 2"))) 2939 self.assertEqual(ft2['name'], 'Rose') 2940 self.assertEqual(ft2['name2'], 'Apple') 2941 self.assertEqual(ft2['pk'], 2) 2942 2943 # now, start editing one of the PK field components 2944 vl.startEditing() 2945 2946 self.assertTrue(vl.dataProvider().changeFeatures({ft2.id(): {idx_key2: 42, idx_name: 'Orchid', idx_name2: 'Daisy'}}, {ft2.id(): QgsGeometry.fromWkt(geomwkt)})) 2947 self.assertTrue(vl.commitChanges()) 2948 2949 # let's check if we still have the same fid... 2950 ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterFid(ft2.id()))) 2951 self.assertEqual(ft2['key2'], 42) 2952 self.assertEqual(ft2['name'], 'Orchid') 2953 self.assertEqual(ft2['name2'], 'Daisy') 2954 self.assertTrue(vl.startEditing()) 2955 vl.changeAttributeValues(ft2.id(), {idx_key1: 21, idx_name2: 'Hibiscus'}) 2956 self.assertTrue(vl.commitChanges()) 2957 ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterFid(ft2.id()))) 2958 self.assertEqual(ft2['key1'], 21) 2959 self.assertEqual(ft2['name2'], 'Hibiscus') 2960 2961 # lets get a brand new feature and check how it went... 2962 ft3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 2'))) 2963 self.assertEqual(ft3['name'], 'Orchid') 2964 self.assertEqual(ft3['key1'], 21) 2965 self.assertEqual(ft3['key2'], 42) 2966 2967 assert compareWkt(ft3.geometry().asWkt(), geomwkt), "Geometry mismatch. Expected: {} Got: {}\n".format(ft3.geometry().asWkt(), geomwkt) 2968 2969 # Now, we leave the record as we found it, so further tests can proceed 2970 vl.startEditing() 2971 self.assertTrue(vl.dataProvider().changeFeatures({ft3.id(): {idx_key1: 2, idx_key2: 2, idx_pk: 2, idx_name: 'Apple', idx_name2: 'Apple'}}, {ft3.id(): QgsGeometry.fromWkt(original_geometry)})) 2972 self.assertTrue(vl.commitChanges()) 2973 2974 2975class TestPyQgsPostgresProviderBigintSinglePk(unittest.TestCase, ProviderTestCase): 2976 2977 @classmethod 2978 def setUpClass(cls): 2979 """Run before all tests""" 2980 cls.dbconn = 'service=qgis_test' 2981 if 'QGIS_PGTEST_DB' in os.environ: 2982 cls.dbconn = os.environ['QGIS_PGTEST_DB'] 2983 # Create test layers 2984 cls.vl = QgsVectorLayer( 2985 cls.dbconn + 2986 ' sslmode=disable key=\'"pk"\' srid=4326 type=POINT table="qgis_test"."provider_bigint_single_pk" (geom) sql=', 2987 'bigint_pk', 'postgres') 2988 assert cls.vl.isValid() 2989 cls.source = cls.vl.dataProvider() 2990 cls.con = psycopg2.connect(cls.dbconn) 2991 2992 @classmethod 2993 def tearDownClass(cls): 2994 """Run after all tests""" 2995 2996 def getSource(self): 2997 """ drops/recreates the test data anew, like TestPyQgsPostgresProvider::getSource above. """ 2998 self.execSqlCommand( 2999 "DROP TABLE IF EXISTS qgis_test.provider_edit_bigint_single_pk") 3000 self.execSqlCommand( 3001 "CREATE TABLE qgis_test.provider_edit_bigint_single_pk ( pk bigserial PRIMARY KEY, cnt integer, name text DEFAULT 'qgis', name2 text DEFAULT 'qgis', num_char text, dt timestamp without time zone, \"date\" date, \"time\" time without time zone, geom public.geometry(Point,4326), key1 integer, key2 integer)") 3002 self.execSqlCommand( 3003 "INSERT INTO qgis_test.provider_edit_bigint_single_pk ( key1, key2, pk, cnt, name, name2, num_char, dt, \"date\", \"time\", geom) VALUES" 3004 "(1, 1, 5, -200, NULL, 'NuLl', '5', TIMESTAMP '2020-05-04 12:13:14', '2020-05-02', '12:13:01', '0101000020E61000001D5A643BDFC751C01F85EB51B88E5340')," 3005 "(1, 2, 3, 300, 'Pear', 'PEaR', '3', NULL, NULL, NULL, NULL)," 3006 "(2, 1, 1, 100, 'Orange', 'oranGe', '1', TIMESTAMP '2020-05-03 12:13:14', '2020-05-03', '12:13:14', '0101000020E61000006891ED7C3F9551C085EB51B81E955040')," 3007 "(2, 2, 2, 200, 'Apple', 'Apple', '2', TIMESTAMP '2020-05-04 12:14:14', '2020-05-04', '12:14:14', '0101000020E6100000CDCCCCCCCC0C51C03333333333B35140')," 3008 "(2, 3, 4, 400, 'Honey', 'Honey', '4', TIMESTAMP '2021-05-04 13:13:14', '2021-05-04', '13:13:14', '0101000020E610000014AE47E17A5450C03333333333935340')") 3009 vl = QgsVectorLayer( 3010 self.dbconn + 3011 ' sslmode=disable key=\'"pk"\' srid=4326 type=POINT table="qgis_test"."provider_edit_bigint_single_pk" (geom) sql=', 3012 'edit_bigint_pk', 'postgres') 3013 return vl 3014 3015 def getEditableLayer(self): 3016 return self.getSource() 3017 3018 def execSqlCommand(self, sql): 3019 self.assertTrue(self.con) 3020 cur = self.con.cursor() 3021 self.assertTrue(cur) 3022 cur.execute(sql) 3023 cur.close() 3024 self.con.commit() 3025 3026 def enableCompiler(self): 3027 QgsSettings().setValue('/qgis/compileExpressions', True) 3028 return True 3029 3030 def disableCompiler(self): 3031 QgsSettings().setValue('/qgis/compileExpressions', False) 3032 3033 def uncompiledFilters(self): 3034 return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')', 3035 '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')', 3036 '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')']) 3037 3038 def partiallyCompiledFilters(self): 3039 return set([]) 3040 3041 def testConstraints(self): 3042 idx = self.vl.dataProvider().fieldNameIndex("pk") 3043 self.assertTrue(idx >= 0) 3044 3045 def testGetFeaturesFidTests(self): 3046 fids = [f.id() for f in self.source.getFeatures()] 3047 assert len(fids) == 5, 'Expected 5 features, got {} instead'.format( 3048 len(fids)) 3049 for id in fids: 3050 features = [f for f in self.source.getFeatures( 3051 QgsFeatureRequest().setFilterFid(id))] 3052 self.assertEqual(len(features), 1) 3053 feature = features[0] 3054 self.assertTrue(feature.isValid()) 3055 3056 result = [feature.id()] 3057 expected = [id] 3058 assert result == expected, 'Expected {} and got {} when testing for feature ID filter'.format(expected, 3059 result) 3060 3061 # test that results match QgsFeatureRequest.acceptFeature 3062 request = QgsFeatureRequest().setFilterFid(id) 3063 for f in self.source.getFeatures(): 3064 self.assertEqual(request.acceptFeature(f), f.id() == id) 3065 3066 # TODO: bad features are not tested because the PostgreSQL provider 3067 # doesn't mark explicitly set invalid features as such. 3068 3069 def testGetFeatures(self, source=None, extra_features=[], skip_features=[], changed_attributes={}, 3070 changed_geometries={}): 3071 """ Test that expected results are returned when fetching all features """ 3072 3073 # IMPORTANT - we do not use `for f in source.getFeatures()` as we are also 3074 # testing that existing attributes & geometry in f are overwritten correctly 3075 # (for f in ... uses a new QgsFeature for every iteration) 3076 3077 if not source: 3078 source = self.source 3079 3080 it = source.getFeatures() 3081 f = QgsFeature() 3082 attributes = {} 3083 geometries = {} 3084 while it.nextFeature(f): 3085 # expect feature to be valid 3086 self.assertTrue(f.isValid()) 3087 # some source test datasets will include additional attributes which we ignore, 3088 # so cherry pick desired attributes 3089 attrs = [f['pk'], f['cnt'], f['name'], f['name2'], f['num_char']] 3090 # DON'T force the num_char attribute to be text - some sources (e.g., delimited text) will 3091 # automatically detect that this attribute contains numbers and set it as a numeric 3092 # field 3093 # TODO: PostgreSQL 12 won't accept conversion from integer to text. 3094 # attrs[4] = str(attrs[4]) 3095 attributes[f['pk']] = attrs 3096 geometries[f['pk']] = f.hasGeometry() and f.geometry().asWkt() 3097 3098 expected_attributes = {5: [5, -200, NULL, 'NuLl', '5'], 3099 3: [3, 300, 'Pear', 'PEaR', '3'], 3100 1: [1, 100, 'Orange', 'oranGe', '1'], 3101 2: [2, 200, 'Apple', 'Apple', '2'], 3102 4: [4, 400, 'Honey', 'Honey', '4']} 3103 3104 expected_geometries = {1: 'Point (-70.332 66.33)', 3105 2: 'Point (-68.2 70.8)', 3106 3: None, 3107 4: 'Point(-65.32 78.3)', 3108 5: 'Point(-71.123 78.23)'} 3109 for f in extra_features: 3110 expected_attributes[f[0]] = f.attributes() 3111 if f.hasGeometry(): 3112 expected_geometries[f[0]] = f.geometry().asWkt() 3113 else: 3114 expected_geometries[f[0]] = None 3115 3116 for i in skip_features: 3117 del expected_attributes[i] 3118 del expected_geometries[i] 3119 for i, a in changed_attributes.items(): 3120 for attr_idx, v in a.items(): 3121 expected_attributes[i][attr_idx] = v 3122 for i, g, in changed_geometries.items(): 3123 if g: 3124 expected_geometries[i] = g.asWkt() 3125 else: 3126 expected_geometries[i] = None 3127 3128 self.assertEqual(attributes, expected_attributes, 'Expected {}, got {}'.format( 3129 expected_attributes, attributes)) 3130 3131 self.assertEqual(len(expected_geometries), len(geometries)) 3132 3133 for pk, geom in list(expected_geometries.items()): 3134 if geom: 3135 assert compareWkt(geom, geometries[pk]), "Geometry {} mismatch Expected:\n{}\nGot:\n{}\n".format(pk, 3136 geom, 3137 geometries[ 3138 pk]) 3139 else: 3140 self.assertFalse( 3141 geometries[pk], 'Expected null geometry for {}'.format(pk)) 3142 3143 def testAddFeatureExtraAttributes(self): 3144 if not getattr(self, 'getEditableLayer', None): 3145 return 3146 3147 l = self.getEditableLayer() 3148 self.assertTrue(l.isValid()) 3149 3150 if not l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures: 3151 return 3152 3153 # test that adding features with too many attributes drops these attributes 3154 # we be more tricky and also add a valid feature to stress test the provider 3155 f1 = QgsFeature() 3156 f1.setAttributes([6, -220, 'qgis', 'String', '15']) 3157 f2 = QgsFeature() 3158 f2.setAttributes([7, -230, 'qgis', 'String', '15', 15, 16, 17]) 3159 3160 result, added = l.dataProvider().addFeatures([f1, f2]) 3161 self.assertTrue(result, 3162 'Provider returned False to addFeatures with extra attributes. Providers should accept these features but truncate the extra attributes.') 3163 3164 # make sure feature was added correctly 3165 added = [f for f in l.dataProvider().getFeatures() if f['pk'] == 7][0] 3166 # TODO: The PostgreSQL provider doesn't truncate extra attributes! 3167 self.assertNotEqual(added.attributes(), [7, -230, 'qgis', 'String', '15'], 3168 'The PostgreSQL provider doesn\'t truncate extra attributes.') 3169 3170 def testAddFeatureMissingAttributes(self): 3171 if not getattr(self, 'getEditableLayer', None): 3172 return 3173 3174 l = self.getEditableLayer() 3175 self.assertTrue(l.isValid()) 3176 3177 if not l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures: 3178 return 3179 3180 # test that adding features with missing attributes pads out these 3181 # attributes with NULL values to the correct length. 3182 # changed from ProviderTestBase.testAddFeatureMissingAttributes: we use 3183 # 'qgis' instead of NULL below. 3184 # TODO: Only unmentioned attributes get filled with the DEFAULT table 3185 # value; if the attribute is present, the saved value will be NULL if 3186 # that is indicated, or the value mentioned by the user; there is no 3187 # implicit conversion of PyQGIS::NULL to PostgreSQL DEFAULT. 3188 f1 = QgsFeature() 3189 f1.setAttributes([6, -220, 'qgis', 'String']) 3190 f2 = QgsFeature() 3191 f2.setAttributes([7, 330]) 3192 3193 result, added = l.dataProvider().addFeatures([f1, f2]) 3194 self.assertTrue(result, 3195 'Provider returned False to addFeatures with missing attributes. Providers should accept these features but add NULL attributes to the end of the existing attributes to the required field length.') 3196 f1.setId(added[0].id()) 3197 f2.setId(added[1].id()) 3198 3199 # check result - feature attributes MUST be padded out to required number of fields 3200 f1.setAttributes([6, -220, 'qgis', 'String', NULL]) 3201 f2.setAttributes([7, 330, 'qgis', 'qgis', NULL]) 3202 self.testGetFeatures(l.dataProvider(), [f1, f2]) 3203 3204 def testAddFeature(self): 3205 if not getattr(self, 'getEditableLayer', None): 3206 return 3207 3208 l = self.getEditableLayer() 3209 self.assertTrue(l.isValid()) 3210 3211 f1 = QgsFeature() 3212 # changed from ProviderTestBase.testAddFeature: we use 'qgis' instead 3213 # of NULL below. 3214 # TODO: Only unmentioned attributes get filled with the DEFAULT table 3215 # value; if the attribute is present, the saved value will be NULL if 3216 # that is indicated, or the value mentioned by the user; there is no 3217 # implicit conversion of PyQGIS::NULL to PostgreSQL DEFAULT. 3218 f1.setAttributes([6, -220, 'qgis', 'String', '15']) 3219 f1.setGeometry(QgsGeometry.fromWkt('Point (-72.345 71.987)')) 3220 3221 f2 = QgsFeature() 3222 f2.setAttributes([7, 330, 'Coconut', 'CoCoNut', '13']) 3223 3224 if l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures: 3225 # expect success 3226 result, added = l.dataProvider().addFeatures([f1, f2]) 3227 self.assertTrue( 3228 result, 'Provider reported AddFeatures capability, but returned False to addFeatures') 3229 f1.setId(added[0].id()) 3230 f2.setId(added[1].id()) 3231 3232 # check result 3233 self.testGetFeatures(l.dataProvider(), [f1, f2]) 3234 3235 # add empty list, should return true for consistency 3236 self.assertTrue(l.dataProvider().addFeatures([])) 3237 3238 # ensure that returned features have been given the correct id 3239 f = next(l.getFeatures( 3240 QgsFeatureRequest().setFilterFid(added[0].id()))) 3241 self.assertTrue(f.isValid()) 3242 self.assertEqual(f['cnt'], -220) 3243 3244 f = next(l.getFeatures( 3245 QgsFeatureRequest().setFilterFid(added[1].id()))) 3246 self.assertTrue(f.isValid()) 3247 self.assertEqual(f['cnt'], 330) 3248 else: 3249 # expect fail 3250 self.assertFalse(l.dataProvider().addFeatures([f1, f2]), 3251 'Provider reported no AddFeatures capability, but returned true to addFeatures') 3252 3253 def testModifyPk(self): 3254 """ Check if we can modify a primary key value. Since this PK is bigint, we also exercise the mapping between fid and values """ 3255 3256 vl = self.getEditableLayer() 3257 self.assertTrue(vl.isValid()) 3258 geomwkt = 'Point(-47.945 -15.812)' 3259 3260 feature = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 4'))) 3261 self.assertTrue(feature.isValid()) 3262 3263 self.assertTrue(vl.startEditing()) 3264 idxpk = vl.fields().lookupField('pk') 3265 3266 self.assertTrue(vl.dataProvider().changeFeatures({feature.id(): {idxpk: 42}}, {feature.id(): QgsGeometry.fromWkt(geomwkt)})) 3267 self.assertTrue(vl.commitChanges()) 3268 3269 # read back 3270 ft = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 42'))) 3271 self.assertTrue(ft.isValid()) 3272 self.assertEqual(ft['name'], 'Honey') 3273 assert compareWkt(ft.geometry().asWkt(), geomwkt), "Geometry mismatch. Expected: {} Got: {}\n".format(ft.geometry().asWkt(), geomwkt) 3274 3275 def testDuplicatedFieldNamesInQueryLayers(self): 3276 """Test regresssion GH #36205""" 3277 3278 vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'__rid__\' table="(SELECT row_number() OVER () AS __rid__, * FROM (SELECT * from qgis_test.some_poly_data a, qgis_test.some_poly_data b where ST_Intersects(a.geom,b.geom)) as foo)" sql=', 'test_36205', 'postgres') 3279 self.assertTrue(vl.isValid()) 3280 self.assertEqual(vl.featureCount(), 3) 3281 3282 # This fails because the "geom" field and "pk" fields are ambiguous 3283 # There is no easy fix: all duplicated fields should be explicitly aliased 3284 # and the query internally rewritten 3285 # feature = next(vl.getFeatures()) 3286 # self.assertTrue(vl.isValid()) 3287 3288 def testUnrestrictedGeometryType(self): 3289 """Test geometry column with no explicit geometry type, regression GH #38565""" 3290 3291 md = QgsProviderRegistry.instance().providerMetadata("postgres") 3292 conn = md.createConnection(self.dbconn, {}) 3293 3294 # Cleanup if needed 3295 try: 3296 conn.dropVectorTable('qgis_test', 'test_unrestricted_geometry') 3297 except QgsProviderConnectionException: 3298 pass 3299 3300 conn.executeSql(''' 3301 CREATE TABLE "qgis_test"."test_unrestricted_geometry" ( 3302 gid serial primary key, 3303 geom geometry(Geometry, 4326) 3304 );''') 3305 3306 points = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=POINT table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_points', 'postgres') 3307 lines = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=LINESTRING table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_lines', 'postgres') 3308 polygons = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_polygons', 'postgres') 3309 3310 self.assertTrue(points.isValid()) 3311 self.assertTrue(lines.isValid()) 3312 self.assertTrue(polygons.isValid()) 3313 3314 f = QgsFeature(points.fields()) 3315 f.setGeometry(QgsGeometry.fromWkt('point(9 45)')) 3316 self.assertTrue(points.dataProvider().addFeatures([f])) 3317 self.assertEqual(points.featureCount(), 1) 3318 self.assertEqual(lines.featureCount(), 0) 3319 self.assertEqual(polygons.featureCount(), 0) 3320 3321 # Fetch from iterator 3322 self.assertTrue(compareWkt(next(points.getFeatures()).geometry().asWkt(), 'point(9 45)')) 3323 with self.assertRaises(StopIteration): 3324 next(lines.getFeatures()) 3325 with self.assertRaises(StopIteration): 3326 next(polygons.getFeatures()) 3327 3328 f.setGeometry(QgsGeometry.fromWkt('linestring(9 45, 10 46)')) 3329 self.assertTrue(lines.dataProvider().addFeatures([f])) 3330 self.assertEqual(points.featureCount(), 1) 3331 self.assertEqual(lines.featureCount(), 1) 3332 self.assertEqual(polygons.featureCount(), 0) 3333 3334 # Fetch from iterator 3335 self.assertTrue(compareWkt(next(points.getFeatures()).geometry().asWkt(), 'point(9 45)')) 3336 self.assertTrue(compareWkt(next(lines.getFeatures()).geometry().asWkt(), 'linestring(9 45, 10 46)')) 3337 with self.assertRaises(StopIteration): 3338 next(polygons.getFeatures()) 3339 3340 # Test regression GH #38567 (no SRID requested in the data source URI) 3341 # Cleanup if needed 3342 conn.executeSql('DELETE FROM "qgis_test"."test_unrestricted_geometry" WHERE \'t\'') 3343 3344 points = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=POINT table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_points', 'postgres') 3345 lines = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=LINESTRING table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_lines', 'postgres') 3346 polygons = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=POLYGON table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_polygons', 'postgres') 3347 3348 self.assertTrue(points.isValid()) 3349 self.assertTrue(lines.isValid()) 3350 self.assertTrue(polygons.isValid()) 3351 3352 def test_read_wkb(self): 3353 """ Test to read WKB from Postgis. """ 3354 md = QgsProviderRegistry.instance().providerMetadata("postgres") 3355 conn = md.createConnection(self.dbconn, {}) 3356 results = conn.executeSql("SELECT ST_AsBinary(ST_MakePoint(5, 10));") 3357 wkb = results[0][0] 3358 geom = QgsGeometry() 3359 import binascii 3360 geom.fromWkb(binascii.unhexlify(wkb[2:])) 3361 self.assertEqual(geom.asWkt(), "Point (5 10)") 3362 3363 def testTrustFlag(self): 3364 """Test regression https://github.com/qgis/QGIS/issues/38809""" 3365 3366 vl = QgsVectorLayer( 3367 self.dbconn + 3368 ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."editData" (geom) sql=', 3369 'testTrustFlag', 'postgres') 3370 3371 self.assertTrue(vl.isValid()) 3372 3373 p = QgsProject.instance() 3374 d = QTemporaryDir() 3375 dir_path = d.path() 3376 3377 self.assertTrue(p.addMapLayers([vl])) 3378 project_path = os.path.join(dir_path, 'testTrustFlag.qgs') 3379 self.assertTrue(p.write(project_path)) 3380 3381 del vl 3382 3383 p.clear() 3384 self.assertTrue(p.read(project_path)) 3385 vl = p.mapLayersByName('testTrustFlag')[0] 3386 self.assertTrue(vl.isValid()) 3387 self.assertFalse(p.trustLayerMetadata()) 3388 3389 # Set the trust flag 3390 p.setTrustLayerMetadata(True) 3391 self.assertTrue(p.write(project_path)) 3392 3393 # Re-read 3394 p.clear() 3395 self.assertTrue(p.read(project_path)) 3396 self.assertTrue(p.trustLayerMetadata()) 3397 vl = p.mapLayersByName('testTrustFlag')[0] 3398 self.assertTrue(vl.isValid()) 3399 3400 def testQueryLayerDuplicatedFields(self): 3401 """Test that duplicated fields from a query layer are returned""" 3402 3403 def _get_layer(sql): 3404 return QgsVectorLayer( 3405 self.dbconn + 3406 ' sslmode=disable key=\'__rid__\' table=\'(SELECT row_number() OVER () AS __rid__, * FROM (' + sql + ') as foo)\' sql=', 3407 'test', 'postgres') 3408 3409 l = _get_layer('SELECT 1, 2') 3410 self.assertEqual(l.fields().count(), 3) 3411 self.assertEqual([f.name() for f in l.fields()], ['__rid__', '?column?', '?column? (2)']) 3412 3413 l = _get_layer('SELECT 1 as id, 2 as id') 3414 self.assertEqual(l.fields().count(), 3) 3415 self.assertEqual([f.name() for f in l.fields()], ['__rid__', 'id', 'id (2)']) 3416 3417 def testInsertOnlyFieldIsEditable(self): 3418 """Test issue #40922 when an INSERT only use cannot insert a new feature""" 3419 3420 md = QgsProviderRegistry.instance().providerMetadata("postgres") 3421 conn = md.createConnection(self.dbconn, {}) 3422 conn.executeSql('DROP TABLE IF EXISTS public.insert_only_points') 3423 conn.executeSql('DROP USER IF EXISTS insert_only_user') 3424 conn.executeSql('CREATE USER insert_only_user WITH PASSWORD \'insert_only_user\'') 3425 conn.executeSql('CREATE TABLE insert_only_points (id SERIAL PRIMARY KEY, name VARCHAR(64))') 3426 conn.executeSql("SELECT AddGeometryColumn('public', 'insert_only_points', 'geom', 4326, 'POINT', 2 )") 3427 conn.executeSql('GRANT SELECT ON "public"."insert_only_points" TO insert_only_user') 3428 3429 uri = QgsDataSourceUri(self.dbconn + 3430 ' sslmode=disable key=\'id\'srid=4326 type=POINT table="public"."insert_only_points" (geom) sql=') 3431 uri.setUsername('insert_only_user') 3432 uri.setPassword('insert_only_user') 3433 vl = QgsVectorLayer(uri.uri(), 'test', 'postgres') 3434 self.assertTrue(vl.isValid()) 3435 self.assertFalse(vl.startEditing()) 3436 3437 feature = QgsFeature(vl.fields()) 3438 self.assertFalse(QgsVectorLayerUtils.fieldIsEditable(vl, 0, feature)) 3439 self.assertFalse(QgsVectorLayerUtils.fieldIsEditable(vl, 1, feature)) 3440 3441 conn.executeSql('GRANT INSERT ON "public"."insert_only_points" TO insert_only_user') 3442 vl = QgsVectorLayer(uri.uri(), 'test', 'postgres') 3443 3444 feature = QgsFeature(vl.fields()) 3445 self.assertTrue(vl.startEditing()) 3446 self.assertTrue(QgsVectorLayerUtils.fieldIsEditable(vl, 0, feature)) 3447 self.assertTrue(QgsVectorLayerUtils.fieldIsEditable(vl, 1, feature)) 3448 3449 def testPkeyIntArray(self): 3450 """ 3451 Test issue #42778 when pkey is an int array 3452 """ 3453 md = QgsProviderRegistry.instance().providerMetadata("postgres") 3454 conn = md.createConnection(self.dbconn, {}) 3455 conn.executeSql('DROP TABLE IF EXISTS public.test_pkey_intarray') 3456 conn.executeSql('CREATE TABLE public.test_pkey_intarray (id _int8 PRIMARY KEY, name VARCHAR(64))') 3457 conn.executeSql("""INSERT INTO public.test_pkey_intarray (id, name) VALUES('{0,0,19111815}', 'test')""") 3458 3459 uri = QgsDataSourceUri(self.dbconn + 3460 ' sslmode=disable key=\'id\' table="public"."test_pkey_intarray" sql=') 3461 vl = QgsVectorLayer(uri.uri(), 'test', 'postgres') 3462 self.assertTrue(vl.isValid()) 3463 3464 feat = next(vl.getFeatures()) 3465 self.assertTrue(feat.isValid()) 3466 self.assertEqual(feat["name"], "test") 3467 3468 fid = feat.id() 3469 self.assertTrue(fid > 0) 3470 3471 feat = vl.getFeature(fid) 3472 self.assertTrue(feat.isValid()) 3473 self.assertEqual(feat["name"], "test") 3474 3475 def testExportPkGuessLogic(self): 3476 """Test that when creating an empty layer a NOT NULL UNIQUE numeric field is identified as a PK""" 3477 3478 md = QgsProviderRegistry.instance().providerMetadata("postgres") 3479 conn = md.createConnection(self.dbconn, {}) 3480 conn.executeSql( 3481 'DROP TABLE IF EXISTS qgis_test."testExportPkGuessLogic_source" CASCADE') 3482 conn.executeSql( 3483 'DROP TABLE IF EXISTS qgis_test."testExportPkGuessLogic_exported" CASCADE') 3484 conn.executeSql( 3485 """CREATE TABLE qgis_test."testExportPkGuessLogic_source" ( id bigint generated always as identity primary key, 3486 geom geometry(Point, 4326) check (st_isvalid(geom)), 3487 name text unique, author text not null)""") 3488 3489 source_layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_source" (geom) sql=', 'testExportPkGuessLogic_source', 'postgres') 3490 3491 md = QgsProviderRegistry.instance().providerMetadata('postgres') 3492 conn = md.createConnection(self.dbconn, {}) 3493 table = conn.table("qgis_test", "testExportPkGuessLogic_source") 3494 self.assertEqual(table.primaryKeyColumns(), ['id']) 3495 3496 self.assertTrue(source_layer.isValid()) 3497 3498 # Create the URI as the browser does (no PK information) 3499 uri = self.dbconn + ' sslmode=disable srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_exported" (geom) sql=' 3500 3501 exporter = QgsVectorLayerExporter(uri, 'postgres', source_layer.fields(), source_layer.wkbType(), source_layer.crs(), True, {}) 3502 self.assertTrue(exporter.lastError() == '') 3503 3504 exported_layer = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_exported" (geom) sql=', 'testExportPkGuessLogic_exported', 'postgres') 3505 self.assertTrue(exported_layer.isValid()) 3506 3507 table = conn.table("qgis_test", "testExportPkGuessLogic_exported") 3508 self.assertEqual(table.primaryKeyColumns(), ['id']) 3509 3510 self.assertEqual(exported_layer.fields().names(), ['id', 'name', 'author']) 3511 3512 3513if __name__ == '__main__': 3514 unittest.main() 3515