1# -*- coding: utf-8 -*-
2"""QGIS Unit tests for the postgres provider.
3
4Note: to prepare the DB, you need to run the sql files specified in
5tests/testdata/provider/testdata_pg.sh
6
7Read tests/README.md about writing/launching tests with PostgreSQL.
8
9Run with ctest -V -R PyQgsPostgresProvider
10
11.. note:: This program is free software; you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation; either version 2 of the License, or
14(at your option) any later version.
15
16"""
17from builtins import next
18
19__author__ = 'Matthias Kuhn'
20__date__ = '2015-04-23'
21__copyright__ = 'Copyright 2015, The QGIS Project'
22
23import qgis  # NOQA
24import psycopg2
25
26import os
27import time
28from datetime import datetime
29
30from qgis.core import (
31    QgsVectorLayer,
32    QgsVectorLayerExporter,
33    QgsFeatureRequest,
34    QgsFeatureSource,
35    QgsFeature,
36    QgsFieldConstraints,
37    QgsDataProvider,
38    NULL,
39    QgsVectorLayerUtils,
40    QgsSettings,
41    QgsTransactionGroup,
42    QgsReadWriteContext,
43    QgsRectangle,
44    QgsDefaultValue,
45    QgsCoordinateReferenceSystem,
46    QgsProject,
47    QgsWkbTypes,
48    QgsGeometry,
49    QgsProviderRegistry,
50    QgsVectorDataProvider,
51    QgsDataSourceUri,
52    QgsProviderConnectionException,
53)
54from qgis.gui import QgsGui, QgsAttributeForm
55from qgis.PyQt.QtCore import QDate, QTime, QDateTime, QVariant, QDir, QObject, QByteArray, QTemporaryDir
56from qgis.PyQt.QtWidgets import QLabel
57from qgis.testing import start_app, unittest
58from qgis.PyQt.QtXml import QDomDocument
59from utilities import unitTestDataPath, compareWkt
60from providertestbase import ProviderTestCase
61
# Start the QGIS test application once, at import time (qgis.testing.start_app).
QGISAPP = start_app()
# Path returned by utilities.unitTestDataPath(); presumably the root of the
# unit-test data directory -- confirm against the utilities module.
TEST_DATA_DIR = unitTestDataPath()
64
65
66class TestPyQgsPostgresProvider(unittest.TestCase, ProviderTestCase):
67
68    @classmethod
69    def setUpClass(cls):
70        """Run before all tests"""
71        cls.dbconn = 'service=qgis_test'
72        if 'QGIS_PGTEST_DB' in os.environ:
73            cls.dbconn = os.environ['QGIS_PGTEST_DB']
74        # Create test layers
75        cls.vl = QgsVectorLayer(
76            cls.dbconn +
77            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=',
78            'test', 'postgres')
79        assert cls.vl.isValid()
80        cls.source = cls.vl.dataProvider()
81        cls.poly_vl = QgsVectorLayer(
82            cls.dbconn +
83            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
84            'test', 'postgres')
85        assert cls.poly_vl.isValid()
86        cls.poly_provider = cls.poly_vl.dataProvider()
87        QgsGui.editorWidgetRegistry().initEditors()
88        cls.con = psycopg2.connect(cls.dbconn)
89
90    @classmethod
91    def tearDownClass(cls):
92        """Run after all tests"""
93
94    def execSQLCommand(self, sql):
95        self.assertTrue(self.con)
96        cur = self.con.cursor()
97        self.assertTrue(cur)
98        cur.execute(sql)
99        cur.close()
100        self.con.commit()
101
102    # Create instances of this class for scoped backups,
103    # example:
104    #
105    #  backup1 = self.scopedTableBackup('qgis_test', 'datatable1')
106    #  backup2 = self.scopedTableBackup('qgis_test', 'datatable2')
107    #
108    def scopedTableBackup(self, schema, table):
109
110        class ScopedBackup():
111            def __init__(self, tester, schema, table):
112                self.schema = schema
113                self.table = table
114                self.tester = tester
115                tester.execSQLCommand('DROP TABLE IF EXISTS {s}.{t}_edit CASCADE'.format(s=schema, t=table))
116                tester.execSQLCommand('CREATE TABLE {s}.{t}_edit AS SELECT * FROM {s}.{t}'.format(s=schema, t=table))
117
118            def __del__(self):
119                self.tester.execSQLCommand('TRUNCATE TABLE {s}.{t}'.format(s=self.schema, t=self.table))
120                self.tester.execSQLCommand('INSERT INTO {s}.{t} SELECT * FROM {s}.{t}_edit'.format(s=self.schema, t=self.table))
121                self.tester.execSQLCommand('DROP TABLE {s}.{t}_edit'.format(s=self.schema, t=self.table))
122
123        return ScopedBackup(self, schema, table)
124
125    def getSource(self):
126        # create temporary table for edit tests
127        self.execSQLCommand(
128            'DROP TABLE IF EXISTS qgis_test."editData" CASCADE')
129        self.execSQLCommand(
130            'CREATE TABLE qgis_test."editData" ( pk SERIAL NOT NULL PRIMARY KEY, cnt integer, name text, name2 text, num_char text, dt timestamp without time zone, "date" date,  "time" time without time zone, geom public.geometry(Point, 4326))')
131        self.execSQLCommand("INSERT INTO qgis_test.\"editData\" (pk, cnt, name, name2, num_char, dt, \"date\", \"time\", geom) VALUES "
132                            "(5, -200, NULL, 'NuLl', '5', TIMESTAMP '2020-05-04 12:13:14', '2020-05-02', '12:13:01', '0101000020E61000001D5A643BDFC751C01F85EB51B88E5340'),"
133                            "(3, 300, 'Pear', 'PEaR', '3', NULL, NULL, NULL, NULL),"
134                            "(1, 100, 'Orange', 'oranGe', '1', TIMESTAMP '2020-05-03 12:13:14', '2020-05-03', '12:13:14', '0101000020E61000006891ED7C3F9551C085EB51B81E955040'),"
135                            "(2, 200, 'Apple', 'Apple', '2', TIMESTAMP '2020-05-04 12:14:14', '2020-05-04', '12:14:14', '0101000020E6100000CDCCCCCCCC0C51C03333333333B35140'),"
136                            "(4, 400, 'Honey', 'Honey', '4', TIMESTAMP '2021-05-04 13:13:14', '2021-05-04', '13:13:14', '0101000020E610000014AE47E17A5450C03333333333935340')")
137        self.execSQLCommand("SELECT setval('qgis_test.\"editData_pk_seq\"', 5, true)")
138        vl = QgsVectorLayer(
139            self.dbconn +
140            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."editData" (geom) sql=',
141            'test', 'postgres')
142        return vl
143
144    def getEditableLayer(self):
145        return self.getSource()
146
147    def getEditableLayerWithCheckConstraint(self):
148        """Returns the layer for attribute change CHECK constraint violation"""
149
150        return QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' srid=4326 type=POINT table="public"."test_check_constraint" (geom) sql=', 'test_check_constraint', 'postgres')
151
152    def enableCompiler(self):
153        QgsSettings().setValue('/qgis/compileExpressions', True)
154        return True
155
156    def disableCompiler(self):
157        QgsSettings().setValue('/qgis/compileExpressions', False)
158
159    def uncompiledFilters(self):
160        return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')',
161                    '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')',
162                    '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')'])
163
164    def partiallyCompiledFilters(self):
165        return set([])
166
167    def getGeneratedColumnsData(self):
168        """
169        return a tuple with the generated column test layer and the expected generated value
170        """
171        cur = self.con.cursor()
172        cur.execute("SHOW server_version_num")
173        pgversion = int(cur.fetchone()[0])
174
175        # don't trigger this test when PostgreSQL versions earlier than 12.
176        if pgversion < 120000:
177            return (None, None)
178        else:
179            return (QgsVectorLayer(self.dbconn + ' sslmode=disable table="qgis_test"."generated_columns"', 'test', 'postgres'),
180                    """('test:'::text || ((pk)::character varying)::text)""")
181
182    # HERE GO THE PROVIDER SPECIFIC TESTS
183    def testDefaultValue(self):
184        self.source.setProviderProperty(
185            QgsDataProvider.EvaluateDefaultValues, True)
186        self.assertIsInstance(self.source.defaultValue(0), int)
187        self.assertEqual(self.source.defaultValue(1), NULL)
188        self.assertEqual(self.source.defaultValue(2), 'qgis')
189        self.source.setProviderProperty(
190            QgsDataProvider.EvaluateDefaultValues, False)
191
192    def testDefaultValueClause(self):
193        self.source.setProviderProperty(
194            QgsDataProvider.EvaluateDefaultValues, False)
195        self.assertEqual(self.source.defaultValueClause(
196            0), 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)')
197        self.assertFalse(self.source.defaultValueClause(1))
198        self.assertEqual(self.source.defaultValueClause(2), '\'qgis\'::text')
199
200    def testDateTimeTypes(self):
201        vl = QgsVectorLayer('%s table="qgis_test"."date_times" sql=' % (
202            self.dbconn), "testdatetimes", "postgres")
203        self.assertTrue(vl.isValid())
204
205        fields = vl.dataProvider().fields()
206        self.assertEqual(fields.at(fields.indexFromName(
207            'date_field')).type(), QVariant.Date)
208        self.assertEqual(fields.at(fields.indexFromName(
209            'time_field')).type(), QVariant.Time)
210        self.assertEqual(fields.at(fields.indexFromName(
211            'datetime_field')).type(), QVariant.DateTime)
212
213        f = next(vl.getFeatures(QgsFeatureRequest()))
214
215        date_idx = vl.fields().lookupField('date_field')
216        self.assertIsInstance(f.attributes()[date_idx], QDate)
217        self.assertEqual(f.attributes()[date_idx], QDate(2004, 3, 4))
218        time_idx = vl.fields().lookupField('time_field')
219        self.assertIsInstance(f.attributes()[time_idx], QTime)
220        self.assertEqual(f.attributes()[time_idx], QTime(13, 41, 52))
221        datetime_idx = vl.fields().lookupField('datetime_field')
222        self.assertIsInstance(f.attributes()[datetime_idx], QDateTime)
223        self.assertEqual(f.attributes()[datetime_idx], QDateTime(
224            QDate(2004, 3, 4), QTime(13, 41, 52)))
225
226    def testBooleanType(self):
227        vl = QgsVectorLayer('{} table="qgis_test"."boolean_table" sql='.format(
228            self.dbconn), "testbool", "postgres")
229        self.assertTrue(vl.isValid())
230
231        fields = vl.dataProvider().fields()
232        self.assertEqual(
233            fields.at(fields.indexFromName('fld1')).type(), QVariant.Bool)
234
235        values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()}
236        expected = {
237            1: True,
238            2: False,
239            3: NULL
240        }
241        self.assertEqual(values, expected)
242
243    def testByteaType(self):
244        vl = QgsVectorLayer('{} table="qgis_test"."byte_a_table" sql='.format(
245            self.dbconn), "testbytea", "postgres")
246        self.assertTrue(vl.isValid())
247
248        fields = vl.dataProvider().fields()
249        self.assertEqual(fields.at(fields.indexFromName(
250            'fld1')).type(), QVariant.ByteArray)
251
252        values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()}
253        expected = {
254            1: QByteArray(b'YmludmFsdWU='),
255            2: QByteArray()
256        }
257        self.assertEqual(values, expected)
258
259        # editing binary values
260        self.execSQLCommand(
261            'DROP TABLE IF EXISTS qgis_test."byte_a_table_edit" CASCADE')
262        self.execSQLCommand(
263            'CREATE TABLE qgis_test."byte_a_table_edit" ( pk SERIAL NOT NULL PRIMARY KEY, blobby bytea)')
264        self.execSQLCommand("INSERT INTO qgis_test.\"byte_a_table_edit\" (pk, blobby) VALUES "
265                            "(1, encode('bbb', 'base64')::bytea)")
266        vl = QgsVectorLayer(
267            self.dbconn + ' sslmode=disable table="qgis_test"."byte_a_table_edit" sql=',
268            'test', 'postgres')
269        self.assertTrue(vl.isValid())
270        values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()}
271        expected = {
272            1: QByteArray(b'YmJi')
273        }
274        self.assertEqual(values, expected)
275
276        # change attribute value
277        self.assertTrue(vl.dataProvider().changeAttributeValues(
278            {1: {1: QByteArray(b'bbbvx')}}))
279        values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()}
280        expected = {
281            1: QByteArray(b'bbbvx')
282        }
283        self.assertEqual(values, expected)
284
285        # add feature
286        f = QgsFeature()
287        f.setAttributes([2, QByteArray(b'cccc')])
288        self.assertTrue(vl.dataProvider().addFeature(f))
289        values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()}
290        expected = {
291            1: QByteArray(b'bbbvx'),
292            2: QByteArray(b'cccc')
293        }
294        self.assertEqual(values, expected)
295
296        # change feature
297        self.assertTrue(vl.dataProvider().changeFeatures(
298            {2: {1: QByteArray(b'dddd')}}, {}))
299        values = {feat['pk']: feat['blobby'] for feat in vl.getFeatures()}
300        expected = {
301            1: QByteArray(b'bbbvx'),
302            2: QByteArray(b'dddd')
303        }
304        self.assertEqual(values, expected)
305
306    def testCitextType(self):
307        vl = QgsVectorLayer('{} table="qgis_test"."citext_table" sql='.format(
308            self.dbconn), "testbytea", "postgres")
309        self.assertTrue(vl.isValid())
310
311        fields = vl.dataProvider().fields()
312        self.assertEqual(
313            fields.at(fields.indexFromName('fld1')).type(), QVariant.String)
314
315        values = {feat['id']: feat['fld1'] for feat in vl.getFeatures()}
316        expected = {
317            1: 'test val',
318            2: NULL
319        }
320        self.assertEqual(values, expected)
321
322        # editing citext values
323        self.execSQLCommand(
324            'DROP TABLE IF EXISTS qgis_test."citext_table_edit" CASCADE')
325        self.execSQLCommand(
326            'CREATE TABLE qgis_test."citext_table_edit" ( pk SERIAL NOT NULL PRIMARY KEY, txt citext)')
327        self.execSQLCommand("INSERT INTO qgis_test.\"citext_table_edit\" (pk, txt) VALUES "
328                            "(1, 'text')")
329        vl = QgsVectorLayer(
330            self.dbconn + ' sslmode=disable table="qgis_test"."citext_table_edit" sql=',
331            'test', 'postgres')
332        self.assertTrue(vl.isValid())
333        values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()}
334        expected = {
335            1: 'text'
336        }
337        self.assertEqual(values, expected)
338
339        # change attribute value
340        self.assertTrue(
341            vl.dataProvider().changeAttributeValues({1: {1: 'teeeext'}}))
342        values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()}
343        expected = {
344            1: 'teeeext'
345        }
346        self.assertEqual(values, expected)
347
348        # add feature
349        f = QgsFeature()
350        f.setAttributes([2, 'teeeeeeeeeext'])
351        self.assertTrue(vl.dataProvider().addFeature(f))
352        values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()}
353        expected = {
354            1: 'teeeext',
355            2: 'teeeeeeeeeext'
356        }
357        self.assertEqual(values, expected)
358
359        # change feature
360        self.assertTrue(vl.dataProvider().changeFeatures(
361            {2: {1: 'teeeeeeeeeeeeeeeeeeeeeeext'}}, {}))
362        values = {feat['pk']: feat['txt'] for feat in vl.getFeatures()}
363        expected = {
364            1: 'teeeext',
365            2: 'teeeeeeeeeeeeeeeeeeeeeeext'
366        }
367        self.assertEqual(values, expected)
368
369    def testQueryLayers(self):
370        def test_query(dbconn, query, key):
371            ql = QgsVectorLayer(
372                '%s srid=4326 table="%s" (geom) key=\'%s\' sql=' % (
373                    dbconn, query.replace('"', '\\"'), key), "testgeom",
374                "postgres")
375            self.assertTrue(ql.isValid(), '{} ({})'.format(query, key))
376
377        test_query(self.dbconn,
378                   '(SELECT NULL::integer "Id1", NULL::integer "Id2", NULL::geometry(Point, 4326) geom LIMIT 0)',
379                   '"Id1","Id2"')
380
381    def testWkbTypes(self):
382        def test_table(dbconn, table_name, wkt):
383            vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (dbconn, table_name), "testgeom",
384                                "postgres")
385            self.assertTrue(vl.isValid())
386            for f in vl.getFeatures():
387                self.assertEqual(f.geometry().asWkt(), wkt)
388
389        test_table(self.dbconn, 'p2d', 'Polygon ((0 0, 1 0, 1 1, 0 1, 0 0))')
390        test_table(self.dbconn, 'p3d',
391                   'PolygonZ ((0 0 0, 1 0 0, 1 1 0, 0 1 0, 0 0 0))')
392        test_table(self.dbconn, 'triangle2d', 'Polygon ((0 0, 1 0, 1 1, 0 0))')
393        test_table(self.dbconn, 'triangle3d',
394                   'PolygonZ ((0 0 0, 1 0 0, 1 1 0, 0 0 0))')
395        test_table(self.dbconn, 'tin2d',
396                   'MultiPolygon (((0 0, 1 0, 1 1, 0 0)),((0 0, 0 1, 1 1, 0 0)))')
397        test_table(self.dbconn, 'tin3d',
398                   'MultiPolygonZ (((0 0 0, 1 0 0, 1 1 0, 0 0 0)),((0 0 0, 0 1 0, 1 1 0, 0 0 0)))')
399        test_table(self.dbconn, 'ps2d',
400                   'MultiPolygon (((0 0, 1 0, 1 1, 0 1, 0 0)))')
401        test_table(self.dbconn, 'ps3d',
402                   'MultiPolygonZ (((0 0 0, 0 1 0, 1 1 0, 1 0 0, 0 0 0)),((0 0 1, 1 0 1, 1 1 1, 0 1 1, 0 0 1)),((0 0 0, 0 0 1, 0 1 1, 0 1 0, 0 0 0)),((0 1 0, 0 1 1, 1 1 1, 1 1 0, 0 1 0)),((1 1 0, 1 1 1, 1 0 1, 1 0 0, 1 1 0)),((1 0 0, 1 0 1, 0 0 1, 0 0 0, 1 0 0)))')
403        test_table(self.dbconn, 'mp3d',
404                   'MultiPolygonZ (((0 0 0, 0 1 0, 1 1 0, 1 0 0, 0 0 0)),((0 0 1, 1 0 1, 1 1 1, 0 1 1, 0 0 1)),((0 0 0, 0 0 1, 0 1 1, 0 1 0, 0 0 0)),((0 1 0, 0 1 1, 1 1 1, 1 1 0, 0 1 0)),((1 1 0, 1 1 1, 1 0 1, 1 0 0, 1 1 0)),((1 0 0, 1 0 1, 0 0 1, 0 0 0, 1 0 0)))')
405        test_table(self.dbconn, 'pt2d', 'Point (0 0)')
406        test_table(self.dbconn, 'pt3d', 'PointZ (0 0 0)')
407        test_table(self.dbconn, 'ls2d', 'LineString (0 0, 1 1)')
408        test_table(self.dbconn, 'ls3d', 'LineStringZ (0 0 0, 1 1 1)')
409        test_table(self.dbconn, 'mpt2d', 'MultiPoint ((0 0),(1 1))')
410        test_table(self.dbconn, 'mpt3d', 'MultiPointZ ((0 0 0),(1 1 1))')
411        test_table(self.dbconn, 'mls2d',
412                   'MultiLineString ((0 0, 1 1),(2 2, 3 3))')
413        test_table(self.dbconn, 'mls3d',
414                   'MultiLineStringZ ((0 0 0, 1 1 1),(2 2 2, 3 3 3))')
415
416        test_table(self.dbconn, 'pt4d', 'PointZM (1 2 3 4)')
417
418    def testMetadata(self):
419        """ Test that metadata is correctly acquired from provider """
420        metadata = self.vl.metadata()
421        self.assertEqual(
422            metadata.crs(), QgsCoordinateReferenceSystem.fromEpsgId(4326))
423        self.assertEqual(metadata.type(), 'dataset')
424        self.assertEqual(metadata.abstract(), 'QGIS Test Table')
425
426    def testGetFeaturesUniqueId(self):
427        """
428        Test tables with inheritance for unique ids
429        """
430
431        def test_unique(features, num_features):
432            featureids = []
433            for f in features:
434                self.assertFalse(f.id() in featureids)
435                featureids.append(f.id())
436            self.assertEqual(len(features), num_features)
437
438        vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'someData'), "testgeom",
439                            "postgres")
440        self.assertTrue(vl.isValid())
441        # Test someData
442        test_unique([f for f in vl.getFeatures()], 5)
443
444        # Test base_table_bad: layer is invalid
445        vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'base_table_bad'),
446                            "testgeom", "postgres")
447        self.assertFalse(vl.isValid())
448        # Test base_table_bad with use estimated metadata: layer is valid because the unique test is skipped
449        vl = QgsVectorLayer(
450            '%s srid=4326 estimatedmetadata="true" table="qgis_test".%s (geom) sql=' % (
451                self.dbconn, 'base_table_bad'),
452            "testgeom", "postgres")
453        self.assertTrue(vl.isValid())
454
455        # Test base_table_good: layer is valid
456        vl = QgsVectorLayer('%s srid=4326 table="qgis_test".%s (geom) sql=' % (self.dbconn, 'base_table_good'),
457                            "testgeom", "postgres")
458        self.assertTrue(vl.isValid())
459        test_unique([f for f in vl.getFeatures()], 4)
460        # Test base_table_good with use estimated metadata: layer is valid
461        vl = QgsVectorLayer(
462            '%s srid=4326 estimatedmetadata="true" table="qgis_test".%s (geom) sql=' % (
463                self.dbconn, 'base_table_good'),
464            "testgeom", "postgres")
465        self.assertTrue(vl.isValid())
466        test_unique([f for f in vl.getFeatures()], 4)
467
468    # See https://github.com/qgis/QGIS/issues/22258
469    # TODO: accept multi-featured layers, and an array of values/fids
470    def testSignedIdentifiers(self):
471
472        def test_layer(ql, att, val, fidval):
473            self.assertTrue(ql.isValid())
474            features = ql.getFeatures()
475            att_idx = ql.fields().lookupField(att)
476            count = 0
477            for f in features:
478                count += 1
479                self.assertEqual(f.attributes()[att_idx], val)
480                self.assertEqual(f.id(), fidval)
481            self.assertEqual(count, 1)
482
483        def test(dbconn, query, att, val, fidval):
484            table = query.replace('"', '\\"')
485            uri = '%s table="%s" (g) key=\'%s\'' % (dbconn, table, att)
486            ql = QgsVectorLayer(uri, "t", "postgres")
487            test_layer(ql, att, val, fidval)
488            # now with estimated metadata
489            uri += ' estimatedmetadata="true"'
490            test_layer(ql, att, val, fidval)
491
492        # --- INT16 ----
493        # zero
494        test(self.dbconn, '(SELECT 0::int2 i, NULL::geometry(Point) g)', 'i', 0, 0)
495        # low positive
496        test(self.dbconn, '(SELECT 1::int2 i, NULL::geometry(Point) g)', 'i', 1, 1)
497        # low negative
498        test(self.dbconn, '(SELECT -1::int2 i, NULL::geometry(Point) g)',
499             'i', -1, 4294967295)
500        # max positive signed 16bit integer
501        test(self.dbconn, '(SELECT 32767::int2 i, NULL::geometry(Point) g)',
502             'i', 32767, 32767)
503        # max negative signed 16bit integer
504        test(self.dbconn, '(SELECT (-32768)::int2 i, NULL::geometry(Point) g)',
505             'i', -32768, 4294934528)
506
507        # --- INT32 ----
508        # zero
509        test(self.dbconn, '(SELECT 0::int4 i, NULL::geometry(Point) g)', 'i', 0, 0)
510        # low positive
511        test(self.dbconn, '(SELECT 2::int4 i, NULL::geometry(Point) g)', 'i', 2, 2)
512        # low negative
513        test(self.dbconn, '(SELECT -2::int4 i, NULL::geometry(Point) g)',
514             'i', -2, 4294967294)
515        # max positive signed 32bit integer
516        test(self.dbconn, '(SELECT 2147483647::int4 i, NULL::geometry(Point) g)',
517             'i', 2147483647, 2147483647)
518        # max negative signed 32bit integer
519        test(self.dbconn, '(SELECT (-2147483648)::int4 i, NULL::geometry(Point) g)',
520             'i', -2147483648, 2147483648)
521
522        # --- INT64 (FIDs are always 1 because assigned ex-novo) ----
523        # zero
524        test(self.dbconn, '(SELECT 0::int8 i, NULL::geometry(Point) g)', 'i', 0, 1)
525        # low positive
526        test(self.dbconn, '(SELECT 3::int8 i, NULL::geometry(Point) g)', 'i', 3, 1)
527        # low negative
528        test(self.dbconn, '(SELECT -3::int8 i, NULL::geometry(Point) g)', 'i', -3, 1)
529        # max positive signed 64bit integer
530        test(self.dbconn, '(SELECT 9223372036854775807::int8 i, NULL::geometry(Point) g)',
531             'i', 9223372036854775807, 1)
532        # max negative signed 32bit integer
533        test(self.dbconn, '(SELECT (-9223372036854775808)::int8 i, NULL::geometry(Point) g)', 'i', -9223372036854775808,
534             1)
535
536    def testPktIntInsert(self):
537        vl = QgsVectorLayer('{} table="qgis_test"."{}" key="pk" sql='.format(self.dbconn, 'bikes_view'), "bikes_view",
538                            "postgres")
539        self.assertTrue(vl.isValid())
540        f = QgsFeature(vl.fields())
541        f['pk'] = NULL
542        f['name'] = 'Cilo'
543        r, f = vl.dataProvider().addFeatures([f])
544        self.assertTrue(r)
545        self.assertNotEqual(f[0]['pk'], NULL, f[0].attributes())
546        vl.deleteFeatures([f[0].id()])
547
548    def testGeneratedFields(self):
549        """Test if GENERATED geometry/geography columns are correctly handled by the provider."""
550        cur = self.con.cursor()
551        cur.execute("SHOW server_version_num")
552        pgversion = int(cur.fetchone()[0])
553
554        # GENERATED columns are unsupported by PostgreSQL versions earlier than 12.
555        if pgversion < 120000:
556            return
557
558        # Backup test table (will be edited)
559        self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.test_gen_col_edit CASCADE')
560        self.execSQLCommand('CREATE TABLE qgis_test.test_gen_col_edit AS SELECT id,name,geom FROM qgis_test.test_gen_col')
561
562        # Geometry columns
563        vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
564        self.assertTrue(vl.isValid())
565
566        # writing geometry...
567        f = QgsFeature(vl.fields())
568
569        ix_name = f.fieldNameIndex('name')
570
571        f.setGeometry(QgsGeometry.fromWkt('Polygon ((-67 -2, -67 0, -68 0, -70 -1, -67 -2))'))
572        f.setAttribute(ix_name, 'QGIS-3')
573
574        self.assertTrue(vl.startEditing())
575        self.assertTrue(vl.addFeatures([f]))
576        self.assertTrue(vl.commitChanges())
577
578        # reading back to see if we saved the centroid correctly.
579        vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
580        f2 = next(vl2.getFeatures(QgsFeatureRequest()))
581        generated_geometry = f2.geometry().asWkt()
582        expected_geometry = 'Point (-68.047619047619051 -0.90476190476190477)'
583        expected_area = 43069568296.34387
584
585        assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry)
586        self.assertAlmostEqual(f2['poly_area'], expected_area, places=4)
587        self.assertEqual(f2['name'], 'QGIS-3')
588
589        # Checking if we can correctly change values of an existing feature.
590        self.assertTrue(vl2.startEditing())
591        ix2_name = f2.fieldNameIndex('name')
592        fid2 = f2.id()
593        vl2.changeAttributeValue(fid2, ix2_name, 'New')
594        self.assertTrue(vl2.commitChanges())
595
596        # getting a brand new QgsVectorLayer
597        vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
598        self.assertTrue(vl.isValid())
599
600        # checking if the name field was correctly updated
601        f = next(vl.getFeatures(QgsFeatureRequest()))
602        self.assertEqual(f['name'], 'New')
603
604        # Now, check if we can change the value of a GENERATED field (we shouldn't)
605        self.assertTrue(vl.startEditing())
606        ix_area = f.fieldNameIndex('poly_area')
607        fid = f.id()
608        vl.changeAttributeValue(fid, ix_area, 42)
609        self.assertTrue(vl.commitChanges())
610
611        # reading back
612        vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
613        f2 = next(vl2.getFeatures(QgsFeatureRequest()))
614        self.assertAlmostEqual(f2['poly_area'], expected_area, places=4)
615
616        # now, getting a brand new QgsVectorLayer to check if changes (UPDATE) in the geometry are reflected in the generated fields
617        vl = QgsVectorLayer('{} table="qgis_test"."{}" (geom) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
618        self.assertTrue(vl.isValid())
619        f = next(vl.getFeatures(QgsFeatureRequest()))
620        vl.startEditing()
621        fid = f.id()
622        vl.changeGeometry(fid, QgsGeometry.fromWkt('Polygon ((-67 -2, -65 0, -68 0, -70 -1, -67 -2))'))
623        vl.commitChanges()
624
625        # reading back...
626        vl2 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_col"), "test_gen_col", "postgres")
627        f2 = next(vl2.getFeatures(QgsFeatureRequest()))
628        generated_geometry = f2.geometry().asWkt()
629
630        generated_geometry = f2.geometry().asWkt()
631        expected_geometry = 'Point (-67.42424242424242209 -0.81818181818181823)'
632        expected_area = 67718478405.28429
633
634        assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry)
635        self.assertAlmostEqual(f2['poly_area'], expected_area, places=4)
636        self.assertEqual(f2['name'], 'New')
637
638        # Geography columns
639        vl3 = QgsVectorLayer('{} table="qgis_test"."{}" (geog) srid=4326 type=POLYGON key="id" sql='.format(self.dbconn, "test_gen_geog_col"), "test_gen_geog_col", "postgres")
640        self.assertTrue(vl3.isValid())
641
642        # writing geography...
643        f3 = QgsFeature(vl3.fields())
644        f3.setGeometry(QgsGeometry.fromWkt('Polygon ((-67 -2, -67 0, -68 0, -70 -1, -67 -2))'))
645        self.assertTrue(vl3.startEditing())
646        self.assertTrue(vl3.addFeatures([f3]))
647        self.assertTrue(vl3.commitChanges())
648
649        # reading back geography and checking values
650        vl4 = QgsVectorLayer('{} table="qgis_test"."{}" (cent) srid=4326 type=POINT key="id" sql='.format(self.dbconn, "test_gen_geog_col"), "test_gen_geog_col", "postgres")
651        f4 = next(vl4.getFeatures(QgsFeatureRequest()))
652        generated_geometry = f4.geometry().asWkt()
653        expected_geometry = 'Point (-68.0477406158202 -0.904960604589168)'
654        expected_area = 43088884296.69713
655
656        assert compareWkt(generated_geometry, expected_geometry), "Geometry mismatch! Expected:\n{}\nGot:\n{}\n".format(expected_geometry, generated_geometry)
657        self.assertEqual(f4['poly_area'], expected_area)
658
659        # Restore test table (after editing it)
660        self.execSQLCommand('TRUNCATE TABLE qgis_test.test_gen_col')
661        self.execSQLCommand('INSERT INTO qgis_test.test_gen_col(id,name,geom) SELECT id,name,geom FROM qgis_test.test_gen_col_edit')
662        self.execSQLCommand('DROP TABLE qgis_test.test_gen_col_edit')
663
664    def testNonPkBigintField(self):
665        """Test if we can correctly insert, read and change attributes(fields) of type bigint and which are not PKs."""
666        vl = QgsVectorLayer(
667            '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(
668                self.dbconn, 'bigint_pk'),
669            "bigint_pk", "postgres")
670        self.assertTrue(vl.isValid())
671        flds = vl.fields()
672
673        # Backup test table (will be edited)
674        scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk')
675
676        # check if default values are correctly read back
677        f = next(vl.getFeatures(QgsFeatureRequest()))
678        bigint_with_default_idx = vl.fields().lookupField('bigint_attribute_def')
679        self.assertEqual(f.attributes()[bigint_with_default_idx], 42)
680
681        # check if NULL values are correctly read
682        bigint_def_null_idx = vl.fields().lookupField('bigint_attribute')
683        self.assertEqual(f.attributes()[bigint_def_null_idx], NULL)
684
685        # check if we can overwrite a default value
686        vl.startEditing()
687        vl.changeAttributeValue(f.id(), bigint_with_default_idx, 43)
688
689        pkidx = vl.fields().lookupField('pk')
690        editedid = f.attributes()[pkidx]
691
692        self.assertTrue(vl.commitChanges())
693        vl2 = QgsVectorLayer(
694            '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(
695                self.dbconn, 'bigint_pk'),
696            "bigint_pk", "postgres")
697        flds = vl2.fields()
698        self.assertTrue(vl2.isValid())
699        f = next(vl2.getFeatures(
700            QgsFeatureRequest().setFilterExpression('pk = ' + str(editedid))))
701        bigint_with_default_idx = vl2.fields().lookupField('bigint_attribute_def')
702        self.assertEqual(f.attributes()[bigint_with_default_idx], 43)
703
704        # check if we can insert a new value
705        dp = vl2.dataProvider()
706        dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1)
707        pkidx = vl2.fields().lookupField('pk')
708        vl2.startEditing()
709        f = QgsFeature(vl2.fields())
710        f['pk'] = NULL
711        f['value'] = 'The answer.'
712        f['bigint_attribute'] = 84
713        f.setAttribute(pkidx, vl2.dataProvider().defaultValue(pkidx))
714        f.setAttribute(bigint_with_default_idx,
715                       vl2.dataProvider().defaultValue(bigint_with_default_idx))
716        r, f = vl2.dataProvider().addFeatures([f])
717        self.assertTrue(r)
718        vl2.commitChanges()
719        inserted_id = f[0]['pk']
720
721        f = next(vl2.getFeatures(
722            QgsFeatureRequest().setFilterExpression('pk = ' + str(inserted_id))))
723
724        self.assertEqual(f['bigint_attribute'], 84)
725        self.assertEqual(f['bigint_attribute_def'], 42)
726
727    def testPktUpdateBigintPk(self):
728        """Test if we can update objects with positive, zero and negative bigint PKs."""
729        vl = QgsVectorLayer(
730            '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(
731                self.dbconn, 'bigint_pk'),
732            "bigint_pk", "postgres")
733        flds = vl.fields()
734
735        # Backup test table (will be edited)
736        scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk')
737
738        self.assertTrue(vl.isValid())
739
740        vl.startEditing()
741
742        statuses = [-1, -1, -1, -1]
743        # changing values...
744        for ft in vl.getFeatures():
745            if ft['value'] == 'first value':
746                vl.changeAttributeValue(
747                    ft.id(), flds.indexOf('value'), '1st value')
748                statuses[0] = 0
749            elif ft['value'] == 'second value':
750                vl.changeAttributeValue(
751                    ft.id(), flds.indexOf('value'), '2nd value')
752                statuses[1] = 0
753            elif ft['value'] == 'zero value':
754                vl.changeAttributeValue(
755                    ft.id(), flds.indexOf('value'), '0th value')
756                statuses[2] = 0
757            elif ft['value'] == 'negative value':
758                vl.changeAttributeValue(
759                    ft.id(), flds.indexOf('value'), '-1th value')
760                statuses[3] = 0
761        self.assertTrue(vl.commitChanges())
762        self.assertTrue(all(x == 0 for x in statuses))
763
764        # now, let's see if the values were changed
765        vl2 = QgsVectorLayer(
766            '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(
767                self.dbconn, 'bigint_pk'),
768            "bigint_pk", "postgres")
769        self.assertTrue(vl2.isValid())
770        for ft in vl2.getFeatures():
771            if ft['value'] == '1st value':
772                statuses[0] = 1
773            elif ft['value'] == '2nd value':
774                statuses[1] = 1
775            elif ft['value'] == '0th value':
776                statuses[2] = 1
777            elif ft['value'] == '-1th value':
778                statuses[3] = 1
779        self.assertTrue(all(x == 1 for x in statuses))
780
781    def testPktUpdateBigintPkNonFirst(self):
782        """Test if we can update objects with positive, zero and negative bigint PKs in tables whose PK is not the first field"""
783        vl = QgsVectorLayer('{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(self.dbconn,
784                                                                                                       'bigint_non_first_pk'),
785                            "bigint_non_first_pk", "postgres")
786        flds = vl.fields()
787
788        self.assertTrue(vl.isValid())
789
790        vl.startEditing()
791
792        # Backup test table (will be edited)
793        scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_non_first_pk')
794
795        statuses = [-1, -1, -1, -1]
796        values = ['first value', 'second value', 'zero value', 'negative value']
797        newvalues = ['1st value', '2nd value', '0th value', '-1th value']
798        # changing values...
799        for ft in vl.getFeatures():
800            if ft['value'] == values[0]:
801                vl.changeAttributeValue(
802                    ft.id(), flds.indexOf('value'), newvalues[0])
803                statuses[0] = 0
804            elif ft['value'] == values[1]:
805                vl.changeAttributeValue(
806                    ft.id(), flds.indexOf('value'), newvalues[1])
807                statuses[1] = 0
808            elif ft['value'] == values[2]:
809                vl.changeAttributeValue(
810                    ft.id(), flds.indexOf('value'), newvalues[2])
811                statuses[2] = 0
812            elif ft['value'] == values[3]:
813                vl.changeAttributeValue(
814                    ft.id(), flds.indexOf('value'), newvalues[3])
815                statuses[3] = 0
816        self.assertTrue(vl.commitChanges())
817        for i in range(len(statuses)):
818            self.assertEqual(statuses[i], 0, 'start value "{}" not found'.format(values[i]))
819
820        # now, let's see if the values were changed
821        vl2 = QgsVectorLayer(
822            '{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(
823                self.dbconn, 'bigint_non_first_pk'),
824            "bigint_pk_nonfirst", "postgres")
825        self.assertTrue(vl2.isValid())
826        for ft in vl2.getFeatures():
827            if ft['value'] == newvalues[0]:
828                statuses[0] = 1
829            elif ft['value'] == newvalues[1]:
830                statuses[1] = 1
831            elif ft['value'] == newvalues[2]:
832                statuses[2] = 1
833            elif ft['value'] == newvalues[3]:
834                statuses[3] = 1
835        for i in range(len(statuses)):
836            self.assertEqual(statuses[i], 1, 'changed value "{}" not found'.format(newvalues[i]))
837
838    def testPktComposite(self):
839        """
840        Check that tables with PKs composed of many fields of different types are correctly read and written to
841        """
842        vl = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2"\' table="qgis_test"."tb_test_compound_pk" (geom)'.format(self.dbconn), "test_compound", "postgres")
843        self.assertTrue(vl.isValid())
844
845        fields = vl.fields()
846
847        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 1 AND pk2 = 2')))
848        # first of all: we must be able to fetch a valid feature
849        self.assertTrue(f.isValid())
850        self.assertEqual(f['pk1'], 1)
851        self.assertEqual(f['pk2'], 2)
852        self.assertEqual(f['value'], 'test 2')
853
854        # Backup test table (will be edited)
855        scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_compound_pk')
856
857        # can we edit a field?
858        vl.startEditing()
859        vl.changeAttributeValue(f.id(), fields.indexOf('value'), 'Edited Test 2')
860        self.assertTrue(vl.commitChanges())
861
862        # Did we get it right? Let's create a new QgsVectorLayer and try to read back our changes:
863        vl2 = QgsVectorLayer('{} sslmode=disable srid=4326 table="qgis_test"."tb_test_compound_pk" (geom) key=\'"pk1","pk2"\' '.format(self.dbconn), "test_compound2", "postgres")
864        self.assertTrue(vl2.isValid())
865        f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 1 AND pk2 = 2')))
866        self.assertTrue(f2.isValid())
867
868        # Then, making sure we really did change our value.
869        self.assertEqual(f2['value'], 'Edited Test 2')
870
871        # How about inserting a new field?
872        f3 = QgsFeature(vl2.fields())
873        f3['pk1'] = 4
874        f3['pk2'] = -9223372036854775800
875        f3['value'] = 'other test'
876        vl.startEditing()
877        res, f3 = vl.dataProvider().addFeatures([f3])
878        self.assertTrue(res)
879        self.assertTrue(vl.commitChanges())
880
881        # can we catch it on another layer?
882        f4 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk2 = -9223372036854775800')))
883
884        self.assertTrue(f4.isValid())
885        expected_attrs = [4, -9223372036854775800, 'other test']
886        self.assertEqual(f4.attributes(), expected_attrs)
887
888        # Finally, let's delete one of the features.
889        f5 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 2 AND pk2 = 1')))
890        vl2.startEditing()
891        vl2.deleteFeatures([f5.id()])
892        self.assertTrue(vl2.commitChanges())
893
894        # did we really delete? Let's try to get the deleted feature from the first layer.
895        f_iterator = vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk1 = 2 AND pk2 = 1'))
896        got_feature = True
897
898        try:
899            f6 = next(f_iterator)
900            got_feature = f6.isValid()
901        except StopIteration:
902            got_feature = False
903
904        self.assertFalse(got_feature)
905
906    def testPktCompositeFloat(self):
907        """
908        Check that tables with PKs composed of many fields of different types are correctly read and written to
909        """
910        vl = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2","pk3"\' table="qgis_test"."tb_test_composite_float_pk" (geom)'.format(self.dbconn), "test_composite_float", "postgres")
911        self.assertTrue(vl.isValid())
912
913        fields = vl.fields()
914
915        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '3.14159274'")))
916        # first of all: we must be able to fetch a valid feature
917        self.assertTrue(f.isValid())
918        self.assertEqual(f['pk1'], 1)
919        self.assertEqual(f['pk2'], 2)
920
921        self.assertAlmostEqual(f['pk3'], 3.14159274)
922        self.assertEqual(f['value'], 'test 2')
923
924        # Backup test table (will be edited)
925        scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_composite_float_pk')
926
927        # can we edit a field?
928        vl.startEditing()
929        vl.changeAttributeValue(f.id(), fields.indexOf('value'), 'Edited Test 2')
930        self.assertTrue(vl.commitChanges())
931
932        # Did we get it right? Let's create a new QgsVectorLayer and try to read back our changes:
933        vl2 = QgsVectorLayer('{} sslmode=disable srid=4326 key=\'"pk1","pk2","pk3"\' table="qgis_test"."tb_test_composite_float_pk" (geom)'.format(self.dbconn), "test_composite_float2", "postgres")
934        self.assertTrue(vl2.isValid())
935        f2 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '3.14159274'")))
936        self.assertTrue(f2.isValid())
937
938        # just making sure we have the correct feature
939        self.assertAlmostEqual(f2['pk3'], 3.14159274)
940
941        # Then, making sure we really did change our value.
942        self.assertEqual(f2['value'], 'Edited Test 2')
943
944        # How about inserting a new field?
945        f3 = QgsFeature(vl2.fields())
946        f3['pk1'] = 4
947        f3['pk2'] = -9223372036854775800
948        f3['pk3'] = 7.29154
949        f3['value'] = 'other test'
950        vl.startEditing()
951        res, f3 = vl.dataProvider().addFeatures([f3])
952        self.assertTrue(res)
953        self.assertTrue(vl.commitChanges())
954
955        # can we catch it on another layer?
956        f4 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk2 = '-9223372036854775800'")))
957
958        self.assertTrue(f4.isValid())
959        expected_attrs = [4, -9223372036854775800, 7.29154, 'other test']
960        gotten_attrs = [f4['pk1'], f4['pk2'], f4['pk3'], f4['value']]
961        self.assertEqual(gotten_attrs[0], expected_attrs[0])
962        self.assertEqual(gotten_attrs[1], expected_attrs[1])
963        self.assertAlmostEqual(gotten_attrs[2], expected_attrs[2], places=4)
964        self.assertEqual(gotten_attrs[3], expected_attrs[3])
965
966        # Finally, let's delete one of the features.
967        f5 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '7.29154'")))
968        vl2.startEditing()
969        vl2.deleteFeatures([f5.id()])
970        self.assertTrue(vl2.commitChanges())
971
972        # did we really delete?
973        f_iterator = vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk3 = '7.29154'"))
974        got_feature = True
975
976        try:
977            f6 = next(f_iterator)
978            got_feature = f6.isValid()
979        except StopIteration:
980            got_feature = False
981
982        self.assertFalse(got_feature)
983
984    def testPktFloatingPoint(self):
985        """
986        Check if we can handle floating point/numeric primary keys correctly
987        """
988
989        # 0. Backup test table (will be edited)
990        scopedBackup1 = self.scopedTableBackup('qgis_test', 'tb_test_float_pk')
991        scopedBackup2 = self.scopedTableBackup('qgis_test', 'tb_test_double_pk')
992
993        # 1. 32 bit float (PostgreSQL "REAL" type)
994        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_float_pk" (geom)', "test_float_pk", "postgres")
995        self.assertTrue(vl.isValid())
996
997        # 1.1. Retrieving
998        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'")))
999        self.assertTrue(f.isValid())
1000        self.assertEqual(f['value'], 'first teste')
1001        # 1.2. Editing
1002        self.assertTrue(vl.startEditing())
1003        vl.changeAttributeValue(f.id(), vl.fields().indexOf('value'), 'Changed first')
1004        self.assertTrue(vl.commitChanges())
1005        # 1.2.1. Checking edit from another vector layer
1006        vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk1" table="qgis_test"."tb_test_float_pk" (geom)', "test_float_pk2", "postgres")
1007        self.assertTrue(vl2.isValid())
1008        f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'")))
1009        self.assertTrue(f2.isValid())
1010        self.assertEqual(f2['value'], 'Changed first')
1011        # 1.3. Deleting
1012        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'")))
1013        vl.startEditing()
1014        vl.deleteFeatures([f.id()])
1015        self.assertTrue(vl.commitChanges())
1016        # 1.3.1. Checking deletion
1017        f_iterator = vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'"))
1018        got_feature = True
1019
1020        try:
1021            f2 = next(f_iterator)
1022            got_feature = f2.isValid()
1023        except StopIteration:
1024            got_feature = False
1025        self.assertFalse(got_feature)
1026        # 1.4. Inserting new feature
1027        newpointwkt = 'Point(-47.751 -15.644)'
1028        f = QgsFeature(vl.fields())
1029        f['pk'] = 0.22222222222222222222222
1030        f['value'] = 'newly inserted'
1031        f.setGeometry(QgsGeometry.fromWkt(newpointwkt))
1032        vl.startEditing()
1033        res, f = vl.dataProvider().addFeatures([f])
1034        self.assertTrue(res)
1035        self.assertTrue(vl.commitChanges())
1036        # 1.4.1. Checking insertion
1037        f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '0.22222222222222222222222'")))
1038        self.assertTrue(f2.isValid())
1039        self.assertAlmostEqual(f2['pk'], 0.2222222222222222)
1040        self.assertEqual(f2['value'], 'newly inserted')
1041        assert compareWkt(f2.geometry().asWkt(), newpointwkt), "Geometry mismatch. Expected: {} Got: {} \n".format(f2.geometry().asWkt(), newpointwkt)
1042        # One more check: can we retrieve the same row with the value that we got from this layer?
1043        floatpk = f2['pk']
1044        f3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '{}'".format(floatpk))))
1045        self.assertTrue(f3.isValid())
1046        self.assertEqual(f3['value'], 'newly inserted')
1047        self.assertEqual(f3['pk'], floatpk)
1048
1049        # 2. 64 bit float (PostgreSQL "DOUBLE PRECISION" type)
1050        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_double_pk" (geom)', "test_double_pk", "postgres")
1051        self.assertTrue(vl.isValid())
1052
1053        # 2.1. Retrieving
1054        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'")))
1055        self.assertTrue(f.isValid())
1056        self.assertEqual(f['value'], 'first teste')
1057        # 2.2. Editing
1058        self.assertTrue(vl.startEditing())
1059        vl.changeAttributeValue(f.id(), vl.fields().indexOf('value'), 'Changed first')
1060        self.assertTrue(vl.commitChanges())
1061        # 2.2.1. Checking edit from another vector layer
1062        vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_double_pk" (geom)', "test_double_pk2", "postgres")
1063        self.assertTrue(vl2.isValid())
1064        f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '3.141592653589793238462643383279502884197169'")))
1065        self.assertTrue(f2.isValid())
1066        self.assertEqual(f2['value'], 'Changed first')
1067        # 2.3. Deleting
1068        f = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'")))
1069        vl.startEditing()
1070        vl.deleteFeatures([f.id()])
1071        self.assertTrue(vl.commitChanges())
1072        # 2.3.1. Checking deletion
1073        f_iterator = vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '2.718281828459045235360287471352662497757247'"))
1074        got_feature = True
1075
1076        try:
1077            f2 = next(f_iterator)
1078            got_feature = f2.isValid()
1079        except StopIteration:
1080            got_feature = False
1081        self.assertFalse(got_feature)
1082        # 2.4. Inserting new feature
1083        newpointwkt = 'Point(-47.751 -15.644)'
1084        f = QgsFeature(vl.fields())
1085        f['pk'] = 0.22222222222222222222222
1086        f['value'] = 'newly inserted'
1087        f.setGeometry(QgsGeometry.fromWkt(newpointwkt))
1088        vl.startEditing()
1089        res, f = vl.dataProvider().addFeatures([f])
1090        self.assertTrue(res)
1091        self.assertTrue(vl.commitChanges())
1092        # 2.4.1. Checking insertion
1093        f2 = next(vl2.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '0.22222222222222222222222'")))
1094        self.assertTrue(f2.isValid())
1095        self.assertAlmostEqual(f2['pk'], 0.2222222222222222, places=15)
1096        self.assertEqual(f2['value'], 'newly inserted')
1097        assert compareWkt(f2.geometry().asWkt(), newpointwkt), "Geometry mismatch. Expected: {} Got: {} \n".format(f2.geometry().asWkt(), newpointwkt)
1098        # One more check: can we retrieve the same row with the value that we got from this layer?
1099        doublepk = f2['pk']
1100        f3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("pk = '{}'".format(doublepk))))
1101        self.assertTrue(f3.isValid())
1102        self.assertEqual(f3['value'], 'newly inserted')
1103        self.assertEqual(f3['pk'], doublepk)
1104
1105        # no NUMERIC/DECIMAL checks here. NUMERIC primary keys are unsupported.
1106        # TODO: implement NUMERIC primary keys/arbitrary precision arithmethics/fixed point math in QGIS.
1107
1108    def testPktMapInsert(self):
1109        vl = QgsVectorLayer('{} table="qgis_test"."{}" key="obj_id" sql='.format(self.dbconn, 'oid_serial_table'),
1110                            "oid_serial", "postgres")
1111        self.assertTrue(vl.isValid())
1112        f = QgsFeature(vl.fields())
1113        f['obj_id'] = vl.dataProvider().defaultValueClause(0)
1114        f['name'] = 'Test'
1115        r, f = vl.dataProvider().addFeatures([f])
1116        self.assertTrue(r)
1117        self.assertNotEqual(f[0]['obj_id'], NULL, f[0].attributes())
1118        vl.deleteFeatures([f[0].id()])
1119
1120    def testNull(self):
1121        """
1122        Asserts that 0, '' and NULL are treated as different values on insert
1123        """
1124        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' table="qgis_test"."constraints" sql=', 'test1',
1125                            'postgres')
1126        self.assertTrue(vl.isValid())
1127        QgsProject.instance().addMapLayer(vl)
1128        tg = QgsTransactionGroup()
1129        tg.addLayer(vl)
1130        vl.startEditing()
1131
1132        def onError(message):
1133            """We should not get here. If we do, fail and say why"""
1134            self.assertFalse(True, message)
1135
1136        vl.raiseError.connect(onError)
1137
1138        f = QgsFeature(vl.fields())
1139        f['gid'] = 100
1140        f['val'] = 0
1141        f['name'] = ''
1142        self.assertTrue(vl.addFeature(f))
1143        feature = next(vl.getFeatures('"gid" = 100'))
1144        self.assertEqual(f['val'], feature['val'])
1145        self.assertEqual(f['name'], feature['name'])
1146
1147    def testNestedInsert(self):
1148        tg = QgsTransactionGroup()
1149        l = self.getEditableLayer()
1150        tg.addLayer(l)
1151        l.startEditing()
1152        it = l.getFeatures()
1153        f = next(it)
1154        f['pk'] = NULL
1155        self.assertTrue(l.addFeature(f))  # Should not deadlock during an active iteration
1156        f = next(it)
1157        l.commitChanges()
1158
1159    def testTimeout(self):
1160        """
1161        Asserts that we will not deadlock if more iterators are opened in parallel than
1162        available in the connection pool
1163        """
1164        request = QgsFeatureRequest()
1165        request.setTimeout(1)
1166
1167        iterators = list()
1168        for i in range(100):
1169            iterators.append(self.vl.getFeatures(request))
1170
1171    def testTransactionDirtyName(self):
1172        # create a vector ayer based on postgres
1173        vl = QgsVectorLayer(
1174            self.dbconn +
1175            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
1176            'test', 'postgres')
1177        self.assertTrue(vl.isValid())
1178
1179        # prepare a project with transactions enabled
1180        p = QgsProject()
1181        p.setAutoTransaction(True)
1182        p.addMapLayers([vl])
1183        vl.startEditing()
1184
1185        # update the data within the transaction
1186        tr = vl.dataProvider().transaction()
1187        sql = "update qgis_test.some_poly_data set pk=1 where pk=1"
1188        name = "My Awesome Transaction!"
1189        self.assertTrue(tr.executeSql(sql, True, name)[0])
1190
1191        # test name
1192        self.assertEqual(vl.undoStack().command(0).text(), name)
1193
1194        # rollback
1195        vl.rollBack()
1196
1197    def testTransactionDirty(self):
1198        # create a vector layer based on postgres
1199        vl = QgsVectorLayer(
1200            self.dbconn +
1201            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
1202            'test', 'postgres')
1203        self.assertTrue(vl.isValid())
1204
1205        # prepare a project with transactions enabled
1206        p = QgsProject()
1207        p.setAutoTransaction(True)
1208        p.addMapLayers([vl])
1209        vl.startEditing()
1210
1211        # check that the feature used for testing is ok
1212        ft0 = vl.getFeatures('pk=1')
1213        f = QgsFeature()
1214        self.assertTrue(ft0.nextFeature(f))
1215
1216        # update the data within the transaction
1217        tr = vl.dataProvider().transaction()
1218        sql = "update qgis_test.some_poly_data set pk=33 where pk=1"
1219        self.assertTrue(tr.executeSql(sql, True)[0])
1220
1221        # check that the pk of the feature has been changed
1222        ft = vl.getFeatures('pk=1')
1223        self.assertFalse(ft.nextFeature(f))
1224
1225        ft = vl.getFeatures('pk=33')
1226        self.assertTrue(ft.nextFeature(f))
1227
1228        # underlying data has been modified but the layer is not tagged as
1229        # modified
1230        self.assertTrue(vl.isModified())
1231
1232        # undo sql query
1233        vl.undoStack().undo()
1234
1235        # check that the original feature with pk is back
1236        ft0 = vl.getFeatures('pk=1')
1237        self.assertTrue(ft0.nextFeature(f))
1238
1239        # redo
1240        vl.undoStack().redo()
1241
1242        # check that the pk of the feature has been changed
1243        ft1 = vl.getFeatures('pk=1')
1244        self.assertFalse(ft1.nextFeature(f))
1245
1246    def testTransactionConstraints(self):
1247        # create a vector layer based on postgres
1248        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' table="qgis_test"."check_constraints" sql=',
1249                            'test', 'postgres')
1250        self.assertTrue(vl.isValid())
1251
1252        # prepare a project with transactions enabled
1253        p = QgsProject()
1254        p.setAutoTransaction(True)
1255        p.addMapLayers([vl])
1256
1257        # get feature
1258        f = QgsFeature()
1259        self.assertTrue(vl.getFeatures('id=1').nextFeature(f))
1260        self.assertEqual(f.attributes(), [1, 4, 3])
1261
1262        # start edition
1263        vl.startEditing()
1264
1265        # update attribute form with a failing constraints
1266        # coming from the database if attributes are updated
1267        # one at a time.
1268        # Current feature: a = 4 / b = 3
1269        # Update feature: a = 1 / b = 0
1270        # If updated one at a time, '(a = 1) < (b = 3)' => FAIL!
1271        form = QgsAttributeForm(vl, f)
1272        for w in form.findChildren(QLabel):
1273            if w.buddy():
1274                spinBox = w.buddy()
1275                if w.text() == 'a':
1276                    spinBox.setValue(1)
1277                elif w.text() == 'b':
1278                    spinBox.setValue(0)
1279
1280        # save
1281        form.save()
1282
1283        # check new values
1284        self.assertTrue(vl.getFeatures('id=1').nextFeature(f))
1285        self.assertEqual(f.attributes(), [1, 1, 0])
1286
1287    def testTransactionTuple(self):
1288        # create a vector layer based on postgres
1289        vl = QgsVectorLayer(
1290            self.dbconn +
1291            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
1292            'test', 'postgres')
1293        self.assertTrue(vl.isValid())
1294
1295        # prepare a project with transactions enabled
1296        p = QgsProject()
1297        p.setAutoTransaction(True)
1298        p.addMapLayers([vl])
1299        vl.startEditing()
1300
1301        # execute a query which returns a tuple
1302        tr = vl.dataProvider().transaction()
1303        sql = "select * from qgis_test.some_poly_data"
1304        self.assertTrue(tr.executeSql(sql, False)[0])
1305
1306        # underlying data has not been modified
1307        self.assertFalse(vl.isModified())
1308
1309    def testDomainTypes(self):
1310        """Test that domain types are correctly mapped"""
1311
1312        vl = QgsVectorLayer('%s table="qgis_test"."domains" sql=' %
1313                            (self.dbconn), "domains", "postgres")
1314        self.assertTrue(vl.isValid())
1315
1316        fields = vl.dataProvider().fields()
1317
1318        expected = {}
1319        expected['fld_var_char_domain'] = {'type': QVariant.String, 'typeName': 'qgis_test.var_char_domain',
1320                                           'length': -1}
1321        expected['fld_var_char_domain_6'] = {'type': QVariant.String, 'typeName': 'qgis_test.var_char_domain_6',
1322                                             'length': 6}
1323        expected['fld_character_domain'] = {'type': QVariant.String, 'typeName': 'qgis_test.character_domain',
1324                                            'length': 1}
1325        expected['fld_character_domain_6'] = {'type': QVariant.String, 'typeName': 'qgis_test.character_domain_6',
1326                                              'length': 6}
1327        expected['fld_char_domain'] = {
1328            'type': QVariant.String, 'typeName': 'qgis_test.char_domain', 'length': 1}
1329        expected['fld_char_domain_6'] = {
1330            'type': QVariant.String, 'typeName': 'qgis_test.char_domain_6', 'length': 6}
1331        expected['fld_text_domain'] = {
1332            'type': QVariant.String, 'typeName': 'qgis_test.text_domain', 'length': -1}
1333        expected['fld_numeric_domain'] = {'type': QVariant.Double, 'typeName': 'qgis_test.numeric_domain', 'length': 10,
1334                                          'precision': 4}
1335
1336        for f, e in list(expected.items()):
1337            self.assertEqual(
1338                fields.at(fields.indexFromName(f)).type(), e['type'])
1339            self.assertEqual(fields.at(fields.indexFromName(f)
1340                                       ).typeName(), e['typeName'])
1341            self.assertEqual(
1342                fields.at(fields.indexFromName(f)).length(), e['length'])
1343            if 'precision' in e:
1344                self.assertEqual(
1345                    fields.at(fields.indexFromName(f)).precision(), e['precision'])
1346
1347    def testRenameAttributes(self):
1348        ''' Test renameAttributes() '''
1349        vl = QgsVectorLayer('%s table="qgis_test"."rename_table" sql=' % (
1350            self.dbconn), "renames", "postgres")
1351        provider = vl.dataProvider()
1352        provider.renameAttributes({1: 'field1', 2: 'field2'})
1353
1354        # bad rename
1355        self.assertFalse(provider.renameAttributes({-1: 'not_a_field'}))
1356        self.assertFalse(provider.renameAttributes({100: 'not_a_field'}))
1357        # already exists
1358        self.assertFalse(provider.renameAttributes({1: 'field2'}))
1359
1360        # rename one field
1361        self.assertTrue(provider.renameAttributes({1: 'newname'}))
1362        self.assertEqual(provider.fields().at(1).name(), 'newname')
1363        vl.updateFields()
1364        fet = next(vl.getFeatures())
1365        self.assertEqual(fet.fields()[1].name(), 'newname')
1366
1367        # rename two fields
1368        self.assertTrue(provider.renameAttributes(
1369            {1: 'newname2', 2: 'another'}))
1370        self.assertEqual(provider.fields().at(1).name(), 'newname2')
1371        self.assertEqual(provider.fields().at(2).name(), 'another')
1372        vl.updateFields()
1373        fet = next(vl.getFeatures())
1374        self.assertEqual(fet.fields()[1].name(), 'newname2')
1375        self.assertEqual(fet.fields()[2].name(), 'another')
1376
1377        # close layer and reopen, then recheck to confirm that changes were saved to db
1378        del vl
1379        vl = None
1380        vl = QgsVectorLayer('%s table="qgis_test"."rename_table" sql=' % (
1381            self.dbconn), "renames", "postgres")
1382        provider = vl.dataProvider()
1383        self.assertEqual(provider.fields().at(1).name(), 'newname2')
1384        self.assertEqual(provider.fields().at(2).name(), 'another')
1385        fet = next(vl.getFeatures())
1386        self.assertEqual(fet.fields()[1].name(), 'newname2')
1387        self.assertEqual(fet.fields()[2].name(), 'another')
1388
1389    def testEditorWidgetTypes(self):
1390        """Test that editor widget types can be fetched from the qgis_editor_widget_styles table"""
1391
1392        vl = QgsVectorLayer('%s table="qgis_test"."widget_styles" sql=' % (
1393            self.dbconn), "widget_styles", "postgres")
1394        self.assertTrue(vl.isValid())
1395        fields = vl.dataProvider().fields()
1396
1397        setup1 = fields.field("fld1").editorWidgetSetup()
1398        self.assertFalse(setup1.isNull())
1399        self.assertEqual(setup1.type(), "FooEdit")
1400        self.assertEqual(setup1.config(), {"param1": "value1", "param2": "2"})
1401
1402        best1 = QgsGui.editorWidgetRegistry().findBest(vl, "fld1")
1403        self.assertEqual(best1.type(), "FooEdit")
1404        self.assertEqual(best1.config(), setup1.config())
1405
1406        self.assertTrue(fields.field("fld2").editorWidgetSetup().isNull())
1407
1408        best2 = QgsGui.editorWidgetRegistry().findBest(vl, "fld2")
1409        self.assertEqual(best2.type(), "TextEdit")
1410
1411    def testHstore(self):
1412        vl = QgsVectorLayer('%s table="qgis_test"."dict" sql=' %
1413                            (self.dbconn), "testhstore", "postgres")
1414        self.assertTrue(vl.isValid())
1415
1416        fields = vl.dataProvider().fields()
1417        self.assertEqual(
1418            fields.at(fields.indexFromName('value')).type(), QVariant.Map)
1419
1420        f = next(vl.getFeatures(QgsFeatureRequest()))
1421
1422        value_idx = vl.fields().lookupField('value')
1423        self.assertIsInstance(f.attributes()[value_idx], dict)
1424        self.assertEqual(f.attributes()[value_idx], {'a': 'b', '1': '2'})
1425
1426        new_f = QgsFeature(vl.fields())
1427        new_f['pk'] = NULL
1428        new_f['value'] = {'simple': '1', 'doubleQuote': '"y"',
1429                          'quote': "'q'", 'backslash': '\\'}
1430        r, fs = vl.dataProvider().addFeatures([new_f])
1431        self.assertTrue(r)
1432        new_pk = fs[0]['pk']
1433        self.assertNotEqual(new_pk, NULL, fs[0].attributes())
1434
1435        try:
1436            read_back = vl.getFeature(new_pk)
1437            self.assertEqual(read_back['pk'], new_pk)
1438            self.assertEqual(read_back['value'], new_f['value'])
1439        finally:
1440            self.assertTrue(vl.startEditing())
1441            self.assertTrue(vl.deleteFeatures([new_pk]))
1442            self.assertTrue(vl.commitChanges())
1443
1444    def testJson(self):
1445        vl = QgsVectorLayer('%s table="qgis_test"."json" sql=' %
1446                            (self.dbconn), "testjson", "postgres")
1447        self.assertTrue(vl.isValid())
1448
1449        # Backup test table (will be edited)
1450        tableBackup = self.scopedTableBackup('qgis_test', 'json')
1451
1452        attrs = (
1453            123,
1454            1233.45,
1455            None,
1456            True,
1457            False,
1458            r"String literal with \"quotes\" 'and' other funny chars []{};#/èé*",
1459            [1, 2, 3.4, None],
1460            [True, False],
1461            {'a': 123, 'b': 123.34, 'c': 'a string', 'd': [
1462                1, 2, 3], 'e': {'a': 123, 'b': 123.45}}
1463        )
1464        attrs2 = (
1465            246,
1466            2466.91,
1467            None,
1468            True,
1469            False,
1470            r"Yet another string literal with \"quotes\" 'and' other funny chars: π []{};#/èé*",
1471            [2, 4, 3.14159, None],
1472            [True, False],
1473            {'a': 246, 'b': 246.68, 'c': 'a rounded area: π × r²', 'd': [
1474                1, 2, 3], 'e': {'a': 246, 'b': 246.91}}
1475        )
1476        json_idx = vl.fields().lookupField('jvalue')
1477        jsonb_idx = vl.fields().lookupField('jbvalue')
1478
1479        for attr in attrs:
1480            # Add a new feature
1481            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1482                self.dbconn), "testjson", "postgres")
1483            self.assertTrue(vl2.startEditing())
1484            f = QgsFeature(vl2.fields())
1485            f.setAttributes([None, attr, attr])
1486            self.assertTrue(vl2.addFeatures([f]))
1487            self.assertTrue(vl2.commitChanges(), attr)
1488            # Read back
1489            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1490                self.dbconn), "testjson", "postgres")
1491            fid = [f.id() for f in vl2.getFeatures()][-1]
1492            f = vl2.getFeature(fid)
1493            self.assertEqual(f.attributes(), [fid, attr, attr])
1494            # Change attribute values
1495            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1496                self.dbconn), "testjson", "postgres")
1497            fid = [f.id() for f in vl2.getFeatures()][-1]
1498            self.assertTrue(vl2.startEditing())
1499            self.assertTrue(vl2.changeAttributeValues(
1500                fid, {json_idx: attr, jsonb_idx: attr}))
1501            self.assertTrue(vl2.commitChanges())
1502            # Read back
1503            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1504                self.dbconn), "testjson", "postgres")
1505            f = vl2.getFeature(fid)
1506            self.assertEqual(f.attributes(), [fid, attr, attr])
1507
1508        # Let's check changeFeatures:
1509        for attr in attrs2:
1510            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1511                self.dbconn), "testjson", "postgres")
1512            fid = [f.id() for f in vl2.getFeatures()][-1]
1513            self.assertTrue(vl2.startEditing())
1514            self.assertTrue(vl2.dataProvider().changeFeatures({fid: {json_idx: attr, jsonb_idx: attr}}, {}))
1515            self.assertTrue(vl2.commitChanges())
1516
1517            # Read back again
1518            vl2 = QgsVectorLayer('%s table="qgis_test"."json" sql=' % (
1519                self.dbconn), "testjson", "postgres")
1520            f = vl2.getFeature(fid)
1521            self.assertEqual(f.attributes(), [fid, attr, attr])
1522
1523    def testStringArray(self):
1524        vl = QgsVectorLayer('%s table="qgis_test"."string_array" sql=' % (
1525            self.dbconn), "teststringarray", "postgres")
1526        self.assertTrue(vl.isValid())
1527
1528        fields = vl.dataProvider().fields()
1529        self.assertEqual(fields.at(fields.indexFromName(
1530            'value')).type(), QVariant.StringList)
1531        self.assertEqual(fields.at(fields.indexFromName(
1532            'value')).subType(), QVariant.String)
1533
1534        f = next(vl.getFeatures(QgsFeatureRequest()))
1535
1536        value_idx = vl.fields().lookupField('value')
1537        self.assertIsInstance(f.attributes()[value_idx], list)
1538        self.assertEqual(f.attributes()[value_idx], ['a', 'b', 'c'])
1539
1540        new_f = QgsFeature(vl.fields())
1541        new_f['pk'] = NULL
1542        new_f['value'] = ['simple', '"doubleQuote"', "'quote'", 'back\\slash']
1543        r, fs = vl.dataProvider().addFeatures([new_f])
1544        self.assertTrue(r)
1545        new_pk = fs[0]['pk']
1546        self.assertNotEqual(new_pk, NULL, fs[0].attributes())
1547
1548        try:
1549            read_back = vl.getFeature(new_pk)
1550            self.assertEqual(read_back['pk'], new_pk)
1551            self.assertEqual(read_back['value'], new_f['value'])
1552        finally:
1553            self.assertTrue(vl.startEditing())
1554            self.assertTrue(vl.deleteFeatures([new_pk]))
1555            self.assertTrue(vl.commitChanges())
1556
1557    def testIntArray(self):
1558        vl = QgsVectorLayer('%s table="qgis_test"."int_array" sql=' % (
1559            self.dbconn), "testintarray", "postgres")
1560        self.assertTrue(vl.isValid())
1561
1562        fields = vl.dataProvider().fields()
1563        self.assertEqual(
1564            fields.at(fields.indexFromName('value')).type(), QVariant.List)
1565        self.assertEqual(fields.at(fields.indexFromName(
1566            'value')).subType(), QVariant.Int)
1567
1568        f = next(vl.getFeatures(QgsFeatureRequest()))
1569
1570        value_idx = vl.fields().lookupField('value')
1571        self.assertIsInstance(f.attributes()[value_idx], list)
1572        self.assertEqual(f.attributes()[value_idx], [1, 2, -5])
1573
1574    def testDoubleArray(self):
1575        vl = QgsVectorLayer('%s table="qgis_test"."double_array" sql=' % (
1576            self.dbconn), "testdoublearray", "postgres")
1577        self.assertTrue(vl.isValid())
1578
1579        fields = vl.dataProvider().fields()
1580        self.assertEqual(
1581            fields.at(fields.indexFromName('value')).type(), QVariant.List)
1582        self.assertEqual(fields.at(fields.indexFromName(
1583            'value')).subType(), QVariant.Double)
1584
1585        f = next(vl.getFeatures(QgsFeatureRequest()))
1586
1587        value_idx = vl.fields().lookupField('value')
1588        self.assertIsInstance(f.attributes()[value_idx], list)
1589        self.assertEqual(f.attributes()[value_idx], [1.1, 2, -5.12345])
1590
1591    def testNotNullConstraint(self):
1592        vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % (
1593            self.dbconn), "constraints", "postgres")
1594        self.assertTrue(vl.isValid())
1595        self.assertEqual(len(vl.fields()), 4)
1596
1597        # test some bad field indexes
1598        self.assertEqual(vl.dataProvider().fieldConstraints(-1),
1599                         QgsFieldConstraints.Constraints())
1600        self.assertEqual(vl.dataProvider().fieldConstraints(
1601            1001), QgsFieldConstraints.Constraints())
1602
1603        self.assertTrue(vl.dataProvider().fieldConstraints(0) &
1604                        QgsFieldConstraints.ConstraintNotNull)
1605        self.assertFalse(vl.dataProvider().fieldConstraints(1)
1606                         & QgsFieldConstraints.ConstraintNotNull)
1607        self.assertTrue(vl.dataProvider().fieldConstraints(2) &
1608                        QgsFieldConstraints.ConstraintNotNull)
1609        self.assertFalse(vl.dataProvider().fieldConstraints(3)
1610                         & QgsFieldConstraints.ConstraintNotNull)
1611
1612        # test that constraints have been saved to fields correctly
1613        fields = vl.fields()
1614        self.assertTrue(fields.at(0).constraints().constraints()
1615                        & QgsFieldConstraints.ConstraintNotNull)
1616        self.assertEqual(fields.at(0).constraints().constraintOrigin(QgsFieldConstraints.ConstraintNotNull),
1617                         QgsFieldConstraints.ConstraintOriginProvider)
1618        self.assertFalse(fields.at(1).constraints().constraints()
1619                         & QgsFieldConstraints.ConstraintNotNull)
1620        self.assertTrue(fields.at(2).constraints().constraints()
1621                        & QgsFieldConstraints.ConstraintNotNull)
1622        self.assertEqual(fields.at(2).constraints().constraintOrigin(QgsFieldConstraints.ConstraintNotNull),
1623                         QgsFieldConstraints.ConstraintOriginProvider)
1624        self.assertFalse(fields.at(3).constraints().constraints()
1625                         & QgsFieldConstraints.ConstraintNotNull)
1626
1627    def testUniqueConstraint(self):
1628        vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % (
1629            self.dbconn), "constraints", "postgres")
1630        self.assertTrue(vl.isValid())
1631        self.assertEqual(len(vl.fields()), 4)
1632
1633        # test some bad field indexes
1634        self.assertEqual(vl.dataProvider().fieldConstraints(-1),
1635                         QgsFieldConstraints.Constraints())
1636        self.assertEqual(vl.dataProvider().fieldConstraints(
1637            1001), QgsFieldConstraints.Constraints())
1638
1639        self.assertTrue(vl.dataProvider().fieldConstraints(0)
1640                        & QgsFieldConstraints.ConstraintUnique)
1641        self.assertTrue(vl.dataProvider().fieldConstraints(1)
1642                        & QgsFieldConstraints.ConstraintUnique)
1643        self.assertTrue(vl.dataProvider().fieldConstraints(2)
1644                        & QgsFieldConstraints.ConstraintUnique)
1645        self.assertFalse(vl.dataProvider().fieldConstraints(3)
1646                         & QgsFieldConstraints.ConstraintUnique)
1647
1648        # test that constraints have been saved to fields correctly
1649        fields = vl.fields()
1650        self.assertTrue(fields.at(0).constraints().constraints()
1651                        & QgsFieldConstraints.ConstraintUnique)
1652        self.assertEqual(fields.at(0).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique),
1653                         QgsFieldConstraints.ConstraintOriginProvider)
1654        self.assertTrue(fields.at(1).constraints().constraints()
1655                        & QgsFieldConstraints.ConstraintUnique)
1656        self.assertEqual(fields.at(1).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique),
1657                         QgsFieldConstraints.ConstraintOriginProvider)
1658        self.assertTrue(fields.at(2).constraints().constraints()
1659                        & QgsFieldConstraints.ConstraintUnique)
1660        self.assertEqual(fields.at(2).constraints().constraintOrigin(QgsFieldConstraints.ConstraintUnique),
1661                         QgsFieldConstraints.ConstraintOriginProvider)
1662        self.assertFalse(fields.at(3).constraints().constraints()
1663                         & QgsFieldConstraints.ConstraintUnique)
1664
1665    def testConstraintOverwrite(self):
1666        """ test that Postgres provider constraints can't be overwritten by vector layer method """
1667        vl = QgsVectorLayer('%s table="qgis_test"."constraints" sql=' % (
1668            self.dbconn), "constraints", "postgres")
1669        self.assertTrue(vl.isValid())
1670
1671        self.assertTrue(vl.dataProvider().fieldConstraints(0) &
1672                        QgsFieldConstraints.ConstraintNotNull)
1673        self.assertTrue(vl.fields().at(0).constraints().constraints()
1674                        & QgsFieldConstraints.ConstraintNotNull)
1675
1676        # add a constraint at the layer level
1677        vl.setFieldConstraint(0, QgsFieldConstraints.ConstraintUnique)
1678
1679        # should be no change at provider level
1680        self.assertTrue(vl.dataProvider().fieldConstraints(0) &
1681                        QgsFieldConstraints.ConstraintNotNull)
1682
1683        # but layer should still keep provider constraints...
1684        self.assertTrue(vl.fields().at(0).constraints().constraints()
1685                        & QgsFieldConstraints.ConstraintNotNull)
1686        self.assertTrue(vl.fieldConstraints(
1687            0) & QgsFieldConstraints.ConstraintNotNull)
1688        # ...in addition to layer level constraint
1689        self.assertTrue(vl.fields().at(0).constraints(
1690        ).constraints() & QgsFieldConstraints.ConstraintUnique)
1691        self.assertTrue(vl.fieldConstraints(
1692            0) & QgsFieldConstraints.ConstraintUnique)
1693
1694    def testVectorLayerUtilsUniqueWithProviderDefault(self):
1695        vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' %
1696                            (self.dbconn), "someData", "postgres")
1697        default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)'
1698        vl.dataProvider().setProviderProperty(
1699            QgsDataProvider.EvaluateDefaultValues, False)
1700        self.assertEqual(
1701            vl.dataProvider().defaultValueClause(0), default_clause)
1702        self.assertTrue(QgsVectorLayerUtils.valueExists(vl, 0, 4))
1703
1704        vl.startEditing()
1705        f = QgsFeature(vl.fields())
1706        f.setAttribute(0, default_clause)
1707        self.assertFalse(
1708            QgsVectorLayerUtils.valueExists(vl, 0, default_clause))
1709        self.assertTrue(vl.addFeatures([f]))
1710
1711        # the default value clause should exist...
1712        self.assertTrue(QgsVectorLayerUtils.valueExists(vl, 0, default_clause))
1713        # but it should not prevent the attribute being validated
1714        self.assertTrue(QgsVectorLayerUtils.validateAttribute(vl, f, 0))
1715        vl.rollBack()
1716
1717    def testSkipConstraintCheck(self):
1718        vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' %
1719                            (self.dbconn), "someData", "postgres")
1720        default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)'
1721        vl.dataProvider().setProviderProperty(
1722            QgsDataProvider.EvaluateDefaultValues, False)
1723        self.assertTrue(vl.dataProvider().skipConstraintCheck(
1724            0, QgsFieldConstraints.ConstraintUnique, default_clause))
1725        self.assertFalse(vl.dataProvider().skipConstraintCheck(
1726            0, QgsFieldConstraints.ConstraintUnique, 59))
1727
1728    def testVectorLayerUtilsCreateFeatureWithProviderDefault(self):
1729        vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' %
1730                            (self.dbconn), "someData", "postgres")
1731        default_clause = 'nextval(\'qgis_test."someData_pk_seq"\'::regclass)'
1732        self.assertEqual(
1733            vl.dataProvider().defaultValueClause(0), default_clause)
1734
1735        # If an attribute map is provided, QgsVectorLayerUtils.createFeature must
1736        # respect it, otherwise default values from provider are checked.
1737        # User's choice will not be respected if the value violates unique constraints.
1738        # See https://github.com/qgis/QGIS/issues/27758
1739        f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5, 3: 'map'})
1740        # changed so that createFeature respects user choice
1741        self.assertEqual(f.attributes(), [
1742                         default_clause, 5, "'qgis'::text", 'map', None, None, None, None, None])
1743
1744        vl.setDefaultValueDefinition(3, QgsDefaultValue("'mappy'"))
1745        # test ignore vector layer default value expression overrides postgres provider default clause,
1746        # due to user's choice
1747        f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5, 3: 'map'})
1748        self.assertEqual(f.attributes(), [
1749                         default_clause, 5, "'qgis'::text", 'map', None, None, None, None, None])
1750        # Since user did not enter a default for field 3, test must return the default value chosen
1751        f = QgsVectorLayerUtils.createFeature(vl, attributes={1: 5})
1752        self.assertEqual(f.attributes(), [
1753                         default_clause, 5, "'qgis'::text", 'mappy', None, None, None, None, None])
1754
1755    # See https://github.com/qgis/QGIS/issues/23127
1756    def testNumericPrecision(self):
1757        uri = 'point?field=f1:int'
1758        uri += '&field=f2:double(6,4)'
1759        uri += '&field=f3:string(20)'
1760        lyr = QgsVectorLayer(uri, "x", "memory")
1761        self.assertTrue(lyr.isValid())
1762        f = QgsFeature(lyr.fields())
1763        f['f1'] = 1
1764        f['f2'] = 123.456
1765        f['f3'] = '12345678.90123456789'
1766        lyr.dataProvider().addFeatures([f])
1767        uri = '%s table="qgis_test"."b18155" (g) key=\'f1\'' % (self.dbconn)
1768        self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.b18155')
1769        err = QgsVectorLayerExporter.exportLayer(
1770            lyr, uri, "postgres", lyr.crs())
1771        self.assertEqual(err[0], QgsVectorLayerExporter.NoError,
1772                         'unexpected import error {0}'.format(err))
1773        lyr = QgsVectorLayer(uri, "y", "postgres")
1774        self.assertTrue(lyr.isValid())
1775        f = next(lyr.getFeatures())
1776        self.assertEqual(f['f1'], 1)
1777        self.assertEqual(f['f2'], 123.456)
1778        self.assertEqual(f['f3'], '12345678.90123456789')
1779
1780    # See https://github.com/qgis/QGIS/issues/23163
1781    def testImportKey(self):
1782        uri = 'point?field=f1:int'
1783        uri += '&field=F2:double(6,4)'
1784        uri += '&field=f3:string(20)'
1785        lyr = QgsVectorLayer(uri, "x", "memory")
1786        self.assertTrue(lyr.isValid())
1787
1788        def testKey(lyr, key, kfnames):
1789            self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.import_test')
1790            uri = '%s table="qgis_test"."import_test" (g)' % self.dbconn
1791            if key is not None:
1792                uri += ' key=\'%s\'' % key
1793            err = QgsVectorLayerExporter.exportLayer(
1794                lyr, uri, "postgres", lyr.crs())
1795            self.assertEqual(err[0], QgsVectorLayerExporter.NoError,
1796                             'unexpected import error {0}'.format(err))
1797            olyr = QgsVectorLayer(uri, "y", "postgres")
1798            self.assertTrue(olyr.isValid())
1799            flds = lyr.fields()
1800            oflds = olyr.fields()
1801            if key is None:
1802                # if the pkey was not given, it will create a pkey
1803                self.assertEqual(oflds.size(), flds.size() + 1)
1804                self.assertEqual(oflds[0].name(), kfnames[0])
1805                for i in range(flds.size()):
1806                    self.assertEqual(oflds[i + 1].name(), flds[i].name())
1807            else:
1808                # pkey was given, no extra field generated
1809                self.assertEqual(oflds.size(), flds.size())
1810                for i in range(oflds.size()):
1811                    self.assertEqual(oflds[i].name(), flds[i].name())
1812            pks = olyr.primaryKeyAttributes()
1813            self.assertEqual(len(pks), len(kfnames))
1814            for i in range(0, len(kfnames)):
1815                self.assertEqual(oflds[pks[i]].name(), kfnames[i])
1816
1817        testKey(lyr, 'f1', ['f1'])
1818        testKey(lyr, '"f1"', ['f1'])
1819        testKey(lyr, '"f1","F2"', ['f1', 'F2'])
1820        testKey(lyr, '"f1","F2","f3"', ['f1', 'F2', 'f3'])
1821        testKey(lyr, None, ['id'])
1822
1823    # See https://github.com/qgis/QGIS/issues/25415
1824    def testImportWithoutSchema(self):
1825
1826        def _test(table, schema=None):
1827            self.execSQLCommand('DROP TABLE IF EXISTS %s CASCADE' % table)
1828            uri = 'point?field=f1:int'
1829            uri += '&field=F2:double(6,4)'
1830            uri += '&field=f3:string(20)'
1831            lyr = QgsVectorLayer(uri, "x", "memory")
1832            self.assertTrue(lyr.isValid())
1833
1834            table = ("%s" % table) if schema is None else (
1835                "\"%s\".\"%s\"" % (schema, table))
1836            dest_uri = "%s sslmode=disable table=%s  (geom) sql" % (
1837                self.dbconn, table)
1838            QgsVectorLayerExporter.exportLayer(
1839                lyr, dest_uri, "postgres", lyr.crs())
1840            olyr = QgsVectorLayer(dest_uri, "y", "postgres")
1841            self.assertTrue(olyr.isValid(), "Failed URI: %s" % dest_uri)
1842
1843        # Test bug 17518
1844        _test('b17518')
1845
1846        # Test fully qualified table (with schema)
1847        _test("b17518", "qgis_test")
1848
1849        # Test empty schema
1850        _test("b17518", "")
1851
1852        # Test public schema
1853        _test("b17518", "public")
1854
1855        # Test fully qualified table (with wrong schema)
1856        with self.assertRaises(AssertionError):
1857            _test("b17518", "qgis_test_wrong")
1858
1859    def testStyle(self):
1860        self.execSQLCommand('DROP TABLE IF EXISTS layer_styles CASCADE')
1861
1862        vl = self.getEditableLayer()
1863        self.assertTrue(vl.isValid())
1864        self.assertTrue(
1865            vl.dataProvider().isSaveAndLoadStyleToDatabaseSupported())
1866        self.assertTrue(vl.dataProvider().isDeleteStyleFromDatabaseSupported())
1867
1868        # table layer_styles does not exit
1869        related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase()
1870        self.assertEqual(related_count, -1)
1871        self.assertEqual(idlist, [])
1872        self.assertEqual(namelist, [])
1873        self.assertEqual(desclist, [])
1874        self.assertNotEqual(errmsg, "")
1875
1876        qml, errmsg = vl.getStyleFromDatabase("1")
1877        self.assertEqual(qml, "")
1878        self.assertNotEqual(errmsg, "")
1879
1880        mFilePath = QDir.toNativeSeparators(
1881            '%s/symbol_layer/%s.qml' % (unitTestDataPath(), "singleSymbol"))
1882        status = vl.loadNamedStyle(mFilePath)
1883        self.assertTrue(status)
1884
1885        # The style is saved as non-default
1886        errorMsg = vl.saveStyleToDatabase(
1887            "by day", "faded greens and elegant patterns", False, "")
1888        self.assertEqual(errorMsg, "")
1889
1890        # the style id should be "1", not "by day"
1891        qml, errmsg = vl.getStyleFromDatabase("by day")
1892        self.assertEqual(qml, "")
1893        self.assertNotEqual(errmsg, "")
1894
1895        related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase()
1896        self.assertEqual(related_count, 1)
1897        self.assertEqual(errmsg, "")
1898        self.assertEqual(idlist, ["1"])
1899        self.assertEqual(namelist, ["by day"])
1900        self.assertEqual(desclist, ["faded greens and elegant patterns"])
1901
1902        qml, errmsg = vl.getStyleFromDatabase("100")
1903        self.assertEqual(qml, "")
1904        self.assertNotEqual(errmsg, "")
1905
1906        qml, errmsg = vl.getStyleFromDatabase("1")
1907        self.assertTrue(qml.startswith('<!DOCTYPE qgis'), qml)
1908        self.assertEqual(errmsg, "")
1909
1910        res, errmsg = vl.deleteStyleFromDatabase("100")
1911        self.assertTrue(res)
1912        self.assertEqual(errmsg, "")
1913
1914        res, errmsg = vl.deleteStyleFromDatabase("1")
1915        self.assertTrue(res)
1916        self.assertEqual(errmsg, "")
1917
1918        # We save now the style again twice but with one as default
1919        errorMsg = vl.saveStyleToDatabase(
1920            "related style", "faded greens and elegant patterns", False, "")
1921        self.assertEqual(errorMsg, "")
1922        errorMsg = vl.saveStyleToDatabase(
1923            "default style", "faded greens and elegant patterns", True, "")
1924        self.assertEqual(errorMsg, "")
1925
1926        related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase()
1927        self.assertEqual(related_count, 2)
1928        self.assertEqual(errmsg, "")
1929        self.assertEqual(idlist, ["3", "2"])  # Ids must be reversed.
1930        self.assertEqual(namelist, ["default style", "related style"])
1931        self.assertEqual(desclist, ["faded greens and elegant patterns"] * 2)
1932
1933        # We remove these 2 styles
1934        res, errmsg = vl.deleteStyleFromDatabase("2")
1935        self.assertTrue(res)
1936        self.assertEqual(errmsg, "")
1937        res, errmsg = vl.deleteStyleFromDatabase("3")
1938        self.assertTrue(res)
1939        self.assertEqual(errmsg, "")
1940
1941        # table layer_styles does exit, but is now empty
1942        related_count, idlist, namelist, desclist, errmsg = vl.listStylesInDatabase()
1943        self.assertEqual(related_count, 0)
1944        self.assertEqual(idlist, [])
1945        self.assertEqual(namelist, [])
1946        self.assertEqual(desclist, [])
1947        self.assertEqual(errmsg, "")
1948
1949    def testStyleWithGeometryType(self):
1950        """Test saving styles with the additional geometry type
1951        Layers are created from geometries_table
1952        """
1953
1954        myconn = 'service=\'qgis_test\''
1955        if 'QGIS_PGTEST_DB' in os.environ:
1956            myconn = os.environ['QGIS_PGTEST_DB']
1957
1958        # point layer
1959        myPoint = QgsVectorLayer(
1960            myconn +
1961            ' sslmode=disable srid=4326 type=POINT table="qgis_test"."geometries_table" (geom) sql=', 'Point',
1962            'postgres')
1963        self.assertTrue(myPoint.isValid())
1964        myPoint.saveStyleToDatabase('myPointStyle', '', False, '')
1965
1966        # polygon layer
1967        myPolygon = QgsVectorLayer(
1968            myconn +
1969            ' sslmode=disable srid=4326 type=POLYGON table="qgis_test"."geometries_table" (geom) sql=', 'Poly',
1970            'postgres')
1971        self.assertTrue(myPoint.isValid())
1972        myPolygon.saveStyleToDatabase('myPolygonStyle', '', False, '')
1973
1974        # how many
1975        related_count, idlist, namelist, desclist, errmsg = myPolygon.listStylesInDatabase()
1976        self.assertEqual(len(idlist), 2)
1977        self.assertEqual(namelist, ['myPolygonStyle', 'myPointStyle'])
1978
1979        # raw psycopg2 query
1980        self.assertTrue(self.con)
1981        cur = self.con.cursor()
1982        self.assertTrue(cur)
1983        cur.execute("select stylename, type from layer_styles order by type")
1984        self.assertEqual(cur.fetchall(), [
1985                         ('myPointStyle', 'Point'), ('myPolygonStyle', 'Polygon')])
1986        cur.close()
1987
1988        # delete them
1989        myPolygon.deleteStyleFromDatabase(idlist[1])
1990        myPolygon.deleteStyleFromDatabase(idlist[0])
1991        styles = myPolygon.listStylesInDatabase()
1992        ids = styles[1]
1993        self.assertEqual(len(ids), 0)
1994
1995    def testSaveStyleInvalidXML(self):
1996
1997        self.execSQLCommand('DROP TABLE IF EXISTS layer_styles CASCADE')
1998
1999        vl = self.getEditableLayer()
2000        self.assertTrue(vl.isValid())
2001        self.assertTrue(
2002            vl.dataProvider().isSaveAndLoadStyleToDatabaseSupported())
2003        self.assertTrue(vl.dataProvider().isDeleteStyleFromDatabaseSupported())
2004
2005        mFilePath = QDir.toNativeSeparators(
2006            '%s/symbol_layer/%s.qml' % (unitTestDataPath(), "fontSymbol"))
2007        status = vl.loadNamedStyle(mFilePath)
2008        self.assertTrue(status)
2009
2010        errorMsg = vl.saveStyleToDatabase(
2011            "fontSymbol", "font with invalid utf8 char", False, "")
2012        self.assertEqual(errorMsg, "")
2013
2014        qml, errmsg = vl.getStyleFromDatabase("1")
2015        self.assertTrue('v="\u001E"' in qml)
2016        self.assertEqual(errmsg, "")
2017
2018        # Test loadStyle from metadata
2019        md = QgsProviderRegistry.instance().providerMetadata('postgres')
2020        qml = md.loadStyle(self.dbconn + " type=POINT table=\"qgis_test\".\"editData\" (geom)", 'fontSymbol')
2021        self.assertTrue(qml.startswith('<!DOCTYPE qgi'), qml)
2022        self.assertTrue('v="\u001E"' in qml)
2023
2024    def testHasMetadata(self):
2025        # views don't have metadata
2026        vl = QgsVectorLayer('{} table="qgis_test"."{}" key="pk" sql='.format(self.dbconn, 'bikes_view'), "bikes_view",
2027                            "postgres")
2028        self.assertTrue(vl.isValid())
2029        self.assertFalse(vl.dataProvider().hasMetadata())
2030
2031        # ordinary tables have metadata
2032        vl = QgsVectorLayer('%s table="qgis_test"."someData" sql=' %
2033                            (self.dbconn), "someData", "postgres")
2034        self.assertTrue(vl.isValid())
2035        self.assertTrue(vl.dataProvider().hasMetadata())
2036
2037    def testReadExtentOnView(self):
2038        # vector layer based on view
2039        vl0 = QgsVectorLayer(
2040            self.dbconn +
2041            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data_view" (geom) sql=',
2042            'test', 'postgres')
2043        self.assertTrue(vl0.isValid())
2044        self.assertFalse(vl0.dataProvider().hasMetadata())
2045
2046        # set a custom extent
2047        originalExtent = vl0.extent()
2048
2049        customExtent = QgsRectangle(-80, 80, -70, 90)
2050        vl0.setExtent(customExtent)
2051
2052        # write xml
2053        doc = QDomDocument("testdoc")
2054        elem = doc.createElement("maplayer")
2055        self.assertTrue(vl0.writeLayerXml(elem, doc, QgsReadWriteContext()))
2056
2057        # read xml with the custom extent. It should not be used by default
2058        vl1 = QgsVectorLayer()
2059        vl1.readLayerXml(elem, QgsReadWriteContext())
2060        self.assertTrue(vl1.isValid())
2061
2062        self.assertEqual(vl1.extent(), originalExtent)
2063
2064        # read xml with custom extent with readExtent option. Extent read from
2065        # xml document should be used because we have a view
2066        vl2 = QgsVectorLayer()
2067        vl2.setReadExtentFromXml(True)
2068        vl2.readLayerXml(elem, QgsReadWriteContext())
2069        self.assertTrue(vl2.isValid())
2070
2071        self.assertEqual(vl2.extent(), customExtent)
2072
2073        # but a force update on extent should allow retrieveing the data
2074        # provider extent
2075        vl2.updateExtents()
2076        vl2.readLayerXml(elem, QgsReadWriteContext())
2077        self.assertEqual(vl2.extent(), customExtent)
2078
2079        vl2.updateExtents(force=True)
2080        vl2.readLayerXml(elem, QgsReadWriteContext())
2081        self.assertEqual(vl2.extent(), originalExtent)
2082
2083    def testReadExtentOnTable(self):
2084        # vector layer based on a standard table
2085        vl0 = QgsVectorLayer(
2086            self.dbconn +
2087            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
2088            'test', 'postgres')
2089        self.assertTrue(vl0.isValid())
2090        self.assertTrue(vl0.dataProvider().hasMetadata())
2091
2092        # set a custom extent
2093        originalExtent = vl0.extent()
2094
2095        customExtent = QgsRectangle(-80, 80, -70, 90)
2096        vl0.setExtent(customExtent)
2097
2098        # write xml
2099        doc = QDomDocument("testdoc")
2100        elem = doc.createElement("maplayer")
2101        self.assertTrue(vl0.writeLayerXml(elem, doc, QgsReadWriteContext()))
2102
2103        # read xml with the custom extent. It should not be used by default
2104        vl1 = QgsVectorLayer()
2105        vl1.readLayerXml(elem, QgsReadWriteContext())
2106        self.assertTrue(vl1.isValid())
2107
2108        self.assertEqual(vl1.extent(), originalExtent)
2109
2110        # read xml with custom extent with readExtent option. Extent read from
2111        # xml document should be used even if we don't have a view or a
2112        # materialized view
2113        vl2 = QgsVectorLayer()
2114        vl2.setReadExtentFromXml(True)
2115        vl2.readLayerXml(elem, QgsReadWriteContext())
2116        self.assertTrue(vl2.isValid())
2117
2118        self.assertEqual(vl2.extent(), customExtent)
2119
2120        # but a force update on extent should allow retrieveing the data
2121        # provider extent
2122        vl2.updateExtents()
2123        vl2.readLayerXml(elem, QgsReadWriteContext())
2124        self.assertEqual(vl2.extent(), customExtent)
2125
2126        vl2.updateExtents(force=True)
2127        vl2.readLayerXml(elem, QgsReadWriteContext())
2128        self.assertEqual(vl2.extent(), originalExtent)
2129
2130    def testPreparedFailure(self):
2131        """Test error from issue GH #45100"""
2132
2133        layer = self.getEditableLayerWithCheckConstraint()
2134        self.assertTrue(layer.startEditing())
2135        old_value = layer.getFeature(1).attribute('i_will_fail_on_no_name')
2136        layer.changeAttributeValue(1, 1, 'no name')
2137        layer.changeGeometry(1, QgsGeometry.fromWkt('point(7 45)'))
2138        self.assertFalse(layer.commitChanges())
2139        layer.changeAttributeValue(1, 1, old_value)
2140        self.assertTrue(layer.commitChanges())
2141
2142    def testDeterminePkey(self):
2143        """Test primary key auto-determination"""
2144
2145        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 type=POLYGON table="qgis_test"."authors" sql=',
2146                            'test', 'postgres')
2147        self.assertTrue(vl.isValid())
2148        self.assertTrue(vl.dataProvider().hasMetadata())
2149        self.assertTrue("key='pk'" in vl.source())
2150
2151    def testCheckPkUnicityOnView(self):
2152        # vector layer based on view
2153
2154        # This is valid
2155        vl0 = QgsVectorLayer(
2156            self.dbconn +
2157            ' checkPrimaryKeyUnicity=\'0\'  sslmode=disable key=\'pk\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2158            'test', 'postgres')
2159        self.assertTrue(vl0.isValid())
2160
2161        geom = vl0.getFeature(1).geometry().asWkt()
2162
2163        # This is NOT valid
2164        vl0 = QgsVectorLayer(
2165            self.dbconn +
2166            ' checkPrimaryKeyUnicity=\'1\' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2167            'test', 'postgres')
2168        self.assertFalse(vl0.isValid())
2169
2170        # This is NOT valid because the default is to check unicity
2171        vl0 = QgsVectorLayer(
2172            self.dbconn +
2173            ' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2174            'test', 'postgres')
2175        self.assertFalse(vl0.isValid())
2176
2177        # This is valid because the readExtentFromXml option is set
2178        # loadDefaultStyle, readExtentFromXml
2179        options = QgsVectorLayer.LayerOptions(True, True)
2180        vl0 = QgsVectorLayer(
2181            self.dbconn +
2182            ' sslmode=disable key=\'an_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2183            'test', 'postgres', options)
2184        self.assertTrue(vl0.isValid())
2185
2186        # Valid because a_unique_int is unique and default is to check unicity
2187        vl0 = QgsVectorLayer(
2188            self.dbconn +
2189            ' sslmode=disable key=\'a_unique_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2190            'test', 'postgres')
2191        self.assertEqual(vl0.getFeature(1).geometry().asWkt(), geom)
2192
2193        # Valid because a_unique_int is unique
2194        vl0 = QgsVectorLayer(
2195            self.dbconn +
2196            ' checkPrimaryKeyUnicity=\'1\' sslmode=disable key=\'a_unique_int\' srid=0 type=POINT table="qgis_test"."b21839_pk_unicity_view" (geom) sql=',
2197            'test', 'postgres')
2198        self.assertTrue(vl0.isValid())
2199        self.assertEqual(vl0.getFeature(1).geometry().asWkt(), geom)
2200
2201    def testNotify(self):
2202        vl0 = QgsVectorLayer(
2203            self.dbconn +
2204            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="qgis_test"."some_poly_data" (geom) sql=',
2205            'test', 'postgres')
2206        vl0.dataProvider().setListening(True)
2207
2208        class Notified(QObject):
2209
2210            def __init__(self):
2211                super(Notified, self).__init__()
2212                self.received = ""
2213
2214            def receive(self, msg):
2215                self.received = msg
2216
2217        notified = Notified()
2218        vl0.dataProvider().notify.connect(notified.receive)
2219
2220        vl0.dataProvider().setListening(True)
2221
2222        cur = self.con.cursor()
2223        ok = False
2224        start = time.time()
2225        while True:
2226            cur.execute("NOTIFY qgis, 'my message'")
2227            self.con.commit()
2228            QGISAPP.processEvents()
2229            if notified.received == "my message":
2230                ok = True
2231                break
2232            if (time.time() - start) > 5:  # timeout
2233                break
2234
2235        vl0.dataProvider().notify.disconnect(notified.receive)
2236        vl0.dataProvider().setListening(False)
2237
2238        self.assertTrue(ok)
2239
2240    def testStyleDatabaseWithService(self):
2241        """Test saving style in DB using a service file.
2242
2243        To run this test, you first need to setup the test
2244        database with tests/testdata/provider/testdata_pg.sh
2245        """
2246        myconn = 'service=\'qgis_test\''
2247        if 'QGIS_PGTEST_DB' in os.environ:
2248            myconn = os.environ['QGIS_PGTEST_DB']
2249        myvl = QgsVectorLayer(
2250            myconn +
2251            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=',
2252            'test', 'postgres')
2253
2254        styles = myvl.listStylesInDatabase()
2255        ids = styles[1]
2256        self.assertEqual(len(ids), 0)
2257
2258        myvl.saveStyleToDatabase('mystyle', '', False, '')
2259        styles = myvl.listStylesInDatabase()
2260        ids = styles[1]
2261        self.assertEqual(len(ids), 1)
2262
2263        myvl.deleteStyleFromDatabase(ids[0])
2264        styles = myvl.listStylesInDatabase()
2265        ids = styles[1]
2266        self.assertEqual(len(ids), 0)
2267
2268        # try with a layer which doesn't have geom
2269        myvl = QgsVectorLayer(
2270            myconn +
2271            ' sslmode=disable key=\'pk\' table="qgis_test"."bikes" sql=', 'test', 'postgres')
2272        self.assertTrue(myvl.isValid())
2273
2274        myvl.saveStyleToDatabase('mystyle_wo_geom', '', False, '')
2275        styles = myvl.listStylesInDatabase()
2276        ids = styles[1]
2277        self.assertEqual(len(ids), 1)
2278
2279        myvl.deleteStyleFromDatabase(ids[0])
2280        styles = myvl.listStylesInDatabase()
2281        ids = styles[1]
2282        self.assertEqual(len(ids), 0)
2283
2284    def testCurveToMultipolygon(self):
2285        self.execSQLCommand(
2286            'CREATE TABLE IF NOT EXISTS multicurve(pk SERIAL NOT NULL PRIMARY KEY, geom public.geometry(MultiPolygon, 4326))')
2287        self.execSQLCommand('TRUNCATE multicurve')
2288
2289        vl = QgsVectorLayer(
2290            self.dbconn +
2291            ' sslmode=disable key=\'pk\' srid=4326 type=MULTIPOLYGON table="multicurve" (geom) sql=',
2292            'test', 'postgres')
2293
2294        f = QgsFeature(vl.fields())
2295        f.setGeometry(QgsGeometry.fromWkt(
2296            'CurvePolygon(CircularString (20 30, 50 30, 50 90, 10 50, 20 30))'))
2297        self.assertTrue(vl.startEditing())
2298        self.assertTrue(vl.addFeatures([f]))
2299        self.assertTrue(vl.commitChanges())
2300
2301        f = next(vl.getFeatures(QgsFeatureRequest()))
2302
2303        g = f.geometry().constGet()
2304        self.assertTrue(g)
2305        self.assertEqual(g.wkbType(), QgsWkbTypes.MultiPolygon)
2306        self.assertEqual(g.childCount(), 1)
2307        self.assertTrue(g.childGeometry(0).vertexCount() > 3)
2308
2309    def testMassivePaste(self):
2310        """Speed test to compare createFeature and createFeatures, for regression #21303"""
2311
2312        import time
2313
2314        self.execSQLCommand(
2315            'CREATE TABLE IF NOT EXISTS massive_paste(pk SERIAL NOT NULL PRIMARY KEY, geom public.geometry(Polygon, 4326))')
2316        self.execSQLCommand('TRUNCATE massive_paste')
2317
2318        start_time = time.time()
2319        vl = QgsVectorLayer(
2320            self.dbconn +
2321            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="massive_paste" (geom) sql=',
2322            'test_massive_paste', 'postgres')
2323        self.assertTrue(vl.startEditing())
2324        features = []
2325        context = vl.createExpressionContext()
2326        for i in range(4000):
2327            features.append(
2328                QgsVectorLayerUtils.createFeature(vl, QgsGeometry.fromWkt('Polygon ((7 44, 8 45, 8 46, 7 46, 7 44))'),
2329                                                  {0: i}, context))
2330        self.assertTrue(vl.addFeatures(features))
2331        self.assertTrue(vl.commitChanges())
2332        self.assertEqual(vl.featureCount(), 4000)
2333        print("--- %s seconds ---" % (time.time() - start_time))
2334
2335        self.execSQLCommand('TRUNCATE massive_paste')
2336        start_time = time.time()
2337        vl = QgsVectorLayer(
2338            self.dbconn +
2339            ' sslmode=disable key=\'pk\' srid=4326 type=POLYGON table="massive_paste" (geom) sql=',
2340            'test_massive_paste', 'postgres')
2341        self.assertTrue(vl.startEditing())
2342        features_data = []
2343        context = vl.createExpressionContext()
2344        for i in range(4000):
2345            features_data.append(
2346                QgsVectorLayerUtils.QgsFeatureData(QgsGeometry.fromWkt('Polygon ((7 44, 8 45, 8 46, 7 46, 7 44))'),
2347                                                   {0: i}))
2348        features = QgsVectorLayerUtils.createFeatures(
2349            vl, features_data, context)
2350        self.assertTrue(vl.addFeatures(features))
2351        self.assertTrue(vl.commitChanges())
2352        self.assertEqual(vl.featureCount(), 4000)
2353        print("--- %s seconds ---" % (time.time() - start_time))
2354
2355    def testFilterOnCustomBbox(self):
2356        extent = QgsRectangle(-68, 70, -67, 80)
2357        request = QgsFeatureRequest().setFilterRect(extent)
2358        dbconn = 'service=qgis_test'
2359        uri = '%s srid=4326 key="pk" sslmode=disable table="qgis_test"."some_poly_data_shift_bbox" (geom)' % (
2360            dbconn)
2361
2362        def _test(vl, ids):
2363            values = {feat['pk']: 'x' for feat in vl.getFeatures(request)}
2364            expected = {x: 'x' for x in ids}
2365            self.assertEqual(values, expected)
2366
2367        vl = QgsVectorLayer(uri, "testgeom", "postgres")
2368        self.assertTrue(vl.isValid())
2369        _test(vl, [2, 3])
2370
2371        vl = QgsVectorLayer(uri + ' bbox=shiftbox', "testgeom", "postgres")
2372        self.assertTrue(vl.isValid())
2373        _test(vl, [1, 3])
2374
2375    def testValidLayerDiscoverRelationsNone(self):
2376        """
2377        Test checks that discover relation feature can be used on a layer that has no relation.
2378        """
2379        vl = QgsVectorLayer(
2380            self.dbconn +
2381            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=',
2382            'test', 'postgres')
2383        self.assertTrue(vl.isValid())
2384        self.assertEqual(vl.dataProvider().discoverRelations(vl, []), [])
2385
2386    def testInvalidLayerDiscoverRelations(self):
2387        """
2388        Test that discover relations feature can be used on invalid layer.
2389        """
2390        vl = QgsVectorLayer('{} table="qgis_test"."invalid_layer" sql='.format(self.dbconn), "invalid_layer",
2391                            "postgres")
2392        self.assertFalse(vl.isValid())
2393        self.assertEqual(vl.dataProvider().discoverRelations(vl, []), [])
2394
2395    def testValidLayerDiscoverRelations(self):
2396        """
2397        Test implicit relations that can be discovers between tables, based on declared foreign keys.
2398        The test also checks that two distinct relations can be discovered when two foreign keys are declared (see #41138).
2399        """
2400        vl = QgsVectorLayer(
2401            self.dbconn +
2402            ' sslmode=disable key=\'pk\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referencing_layer"',
2403            'referencing_layer', 'postgres')
2404        vls = [
2405            QgsVectorLayer(
2406                self.dbconn +
2407                ' sslmode=disable key=\'pk_ref_1\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referenced_layer_1"',
2408                'referenced_layer_1', 'postgres'),
2409            QgsVectorLayer(
2410                self.dbconn +
2411                ' sslmode=disable key=\'pk_ref_2\' checkPrimaryKeyUnicity=\'1\' table="qgis_test"."referenced_layer_2"',
2412                'referenced_layer_2', 'postgres'),
2413            vl
2414        ]
2415
2416        for lyr in vls:
2417            self.assertTrue(lyr.isValid())
2418            QgsProject.instance().addMapLayer(lyr)
2419        relations = vl.dataProvider().discoverRelations(vl, vls)
2420        self.assertEqual(len(relations), 2)
2421        for i, r in enumerate(relations):
2422            self.assertEqual(r.referencedLayer(), vls[i])
2423
2424    def testCheckTidPkOnViews(self):
2425        """Test vector layer based on a view with `ctid` as a key"""
2426
2427        # This is valid
2428        vl0 = QgsVectorLayer(
2429            self.dbconn +
2430            ' checkPrimaryKeyUnicity=\'0\'  sslmode=disable key=\'ctid\' srid=4326 type=POINT table="qgis_test"."b31799_test_view_ctid" (geom) sql=',
2431            'test', 'postgres')
2432        self.assertTrue(vl0.isValid())
2433        self.assertEqual(vl0.featureCount(), 10)
2434        for f in vl0.getFeatures():
2435            self.assertNotEqual(f.attribute(0), NULL)
2436
2437    def testFeatureCountEstimatedOnTable(self):
2438        """
2439        Test feature count on table when estimated data is enabled
2440        """
2441        vl = QgsVectorLayer(
2442            self.dbconn +
2443            ' sslmode=disable key=\'pk\' estimatedmetadata=true srid=4326 type=POINT table="qgis_test"."someData" (geom) sql=',
2444            'test', 'postgres')
2445        self.assertTrue(vl.isValid())
2446        self.assertTrue(vl.featureCount() > 0)
2447
2448    def testFeatureCountEstimatedOnView(self):
2449        """
2450        Test feature count on view when estimated data is enabled
2451        """
2452        self.execSQLCommand('DROP VIEW IF EXISTS qgis_test.somedataview')
2453        self.execSQLCommand(
2454            'CREATE VIEW qgis_test.somedataview AS SELECT * FROM qgis_test."someData"')
2455        vl = QgsVectorLayer(
2456            self.dbconn +
2457            ' sslmode=disable key=\'pk\' estimatedmetadata=true srid=4326 type=POINT table="qgis_test"."somedataview" (geom) sql=',
2458            'test', 'postgres')
2459        self.assertTrue(vl.isValid())
2460        self.assertTrue(vl.featureCount() > 0)
2461
2462    def testIdentityPk(self):
2463        """Test a table with identity pk, see GH #29560"""
2464
2465        vl = QgsVectorLayer(
2466            self.dbconn +
2467            ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."b29560"(geom) sql=',
2468            'testb29560', 'postgres')
2469        self.assertTrue(vl.isValid())
2470
2471        feature = QgsFeature(vl.fields())
2472        geom = QgsGeometry.fromWkt('POLYGON EMPTY')
2473        feature.setGeometry(geom)
2474        self.assertTrue(vl.dataProvider().addFeature(feature))
2475
2476        self.assertEqual(vl.dataProvider().defaultValueClause(0), "nextval('b29560_gid_seq'::regclass)")
2477
2478        del (vl)
2479
2480        # Verify
2481        vl = QgsVectorLayer(
2482            self.dbconn +
2483            ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."b29560"(geom) sql=',
2484            'testb29560', 'postgres')
2485        self.assertTrue(vl.isValid())
2486        feature = next(vl.getFeatures())
2487        self.assertIsNotNone(feature.id())
2488
    # NOTE(review): os.environ.get() returns a string (or the 'true' default
    # given here), and any non-empty string is truthy, so this test is skipped
    # unless QGIS_CONTINUOUS_INTEGRATION_RUN is set to an empty string -
    # confirm whether skipping outside CI is intended.
    @unittest.skipIf(os.environ.get('QGIS_CONTINUOUS_INTEGRATION_RUN', 'true'), 'Test flaky')
    def testDefaultValuesAndClauses(self):
        """Test whether default values like CURRENT_TIMESTAMP or
        now() are respected. See GH #33383

        The provider's evaluated default values and its default value clauses
        are checked first; then a feature filled with those defaults is added
        and read back through a second layer.
        """

        # Open a layer on the test table

        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable  table="public"."test_table_default_values" sql=', 'test',
                            'postgres')
        self.assertTrue(vl.isValid())

        dp = vl.dataProvider()

        # Clean the table
        dp.deleteFeatures(dp.allFeatureIds())

        # Save it for the test: only the date part (%Y-%m-%d) is compared
        # with the timestamp defaults below
        now = datetime.now()

        # Test default values
        dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1)
        # FIXME: spatialite provider (and OGR) return a NULL here and the following passes
        # self.assertTrue(dp.defaultValue(0).isNull())
        self.assertIsNotNone(dp.defaultValue(0))
        self.assertIsNone(dp.defaultValue(1))
        self.assertTrue(dp.defaultValue(
            2).startswith(now.strftime('%Y-%m-%d')))
        self.assertTrue(dp.defaultValue(
            3).startswith(now.strftime('%Y-%m-%d')))
        self.assertEqual(dp.defaultValue(4), 123)
        self.assertEqual(dp.defaultValue(5), 'My default')

        # FIXME: the provider should return the clause definition
        #       regardless of the EvaluateDefaultValues setting
        dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 0)
        self.assertEqual(dp.defaultValueClause(
            0), "nextval('test_table_default_values_id_seq'::regclass)")
        self.assertEqual(dp.defaultValueClause(1), '')
        self.assertEqual(dp.defaultValueClause(2), "now()")
        self.assertEqual(dp.defaultValueClause(3), "CURRENT_TIMESTAMP")
        self.assertEqual(dp.defaultValueClause(4), '123')
        self.assertEqual(dp.defaultValueClause(5), "'My default'::text")
        # FIXME: the test fails if the value is not reset to 1
        dp.setProviderProperty(QgsDataProvider.EvaluateDefaultValues, 1)

        # Build a feature from the provider defaults; fields without a
        # default get the text 'A comment'
        feature = QgsFeature(vl.fields())
        for idx in range(vl.fields().count()):
            default = vl.dataProvider().defaultValue(idx)
            if default is not None:
                feature.setAttribute(idx, default)
            else:
                feature.setAttribute(idx, 'A comment')

        self.assertTrue(vl.dataProvider().addFeature(feature))
        del (vl)

        # Verify: read the feature back through a new layer
        vl2 = QgsVectorLayer(self.dbconn + ' sslmode=disable  table="public"."test_table_default_values" sql=', 'test',
                             'postgres')
        self.assertTrue(vl2.isValid())
        feature = next(vl2.getFeatures())
        self.assertEqual(feature.attribute(1), 'A comment')
        self.assertTrue(feature.attribute(
            2).startswith(now.strftime('%Y-%m-%d')))
        self.assertTrue(feature.attribute(
            3).startswith(now.strftime('%Y-%m-%d')))
        self.assertEqual(feature.attribute(4), 123)
        self.assertEqual(feature.attribute(5), 'My default')
2557
2558    def testEncodeDecodeUri(self):
2559        """Test PG encode/decode URI"""
2560
2561        md = QgsProviderRegistry.instance().providerMetadata('postgres')
2562        self.assertEqual(md.decodeUri(
2563            'dbname=\'qgis_tests\' host=localhost port=5432 user=\'myuser\' sslmode=disable estimatedmetadata=true srid=3067 table="public"."basic_map_tiled" (rast)'),
2564            {'dbname': 'qgis_tests',
2565             'estimatedmetadata': True,
2566             'geometrycolumn': 'rast',
2567             'host': 'localhost',
2568             'port': '5432',
2569             'schema': 'public',
2570             'srid': '3067',
2571             'sslmode': 1,
2572             'table': 'basic_map_tiled',
2573             'username': 'myuser'})
2574
2575        self.assertEqual(md.decodeUri(
2576            'dbname=\'qgis_tests\' host=localhost port=5432 user=\'myuser\' sslmode=disable key=\'id\' estimatedmetadata=true srid=3763 type=MultiPolygon checkPrimaryKeyUnicity=\'1\' table="public"."copas1" (geom)'),
2577            {'dbname': 'qgis_tests',
2578             'estimatedmetadata': True,
2579             'geometrycolumn': 'geom',
2580             'host': 'localhost',
2581             'key': 'id',
2582             'port': '5432',
2583             'schema': 'public',
2584             'srid': '3763',
2585             'sslmode': 1,
2586             'table': 'copas1',
2587             'type': 6,
2588             'username': 'myuser'})
2589
2590        self.assertEqual(md.encodeUri({'dbname': 'qgis_tests',
2591                                       'estimatedmetadata': True,
2592                                       'geometrycolumn': 'geom',
2593                                       'host': 'localhost',
2594                                       'key': 'id',
2595                                       'port': '5432',
2596                                       'schema': 'public',
2597                                       'srid': '3763',
2598                                       'sslmode': 1,
2599                                       'table': 'copas1',
2600                                       'type': 6,
2601                                       'username': 'myuser'}),
2602                         "dbname='qgis_tests' user='myuser' srid=3763 estimatedmetadata='true' host='localhost' key='id' port='5432' sslmode='disable' type='MultiPolygon' table=\"public\".\"copas1\" (geom)")
2603
2604        self.assertEqual(md.encodeUri({'dbname': 'qgis_tests',
2605                                       'estimatedmetadata': True,
2606                                       'geometrycolumn': 'rast',
2607                                       'host': 'localhost',
2608                                       'port': '5432',
2609                                       'schema': 'public',
2610                                       'srid': '3067',
2611                                       'sslmode': 1,
2612                                       'table': 'basic_map_tiled',
2613                                       'username': 'myuser'}),
2614                         "dbname='qgis_tests' user='myuser' srid=3067 estimatedmetadata='true' host='localhost' port='5432' sslmode='disable' table=\"public\".\"basic_map_tiled\" (rast)")
2615
2616        def _round_trip(uri):
2617            decoded = md.decodeUri(uri)
2618            self.assertEqual(decoded, md.decodeUri(md.encodeUri(decoded)))
2619
2620        uri = self.dbconn + \
2621            ' sslmode=disable key=\'gid\' srid=3035  table="public"."my_pg_vector" sql='
2622        decoded = md.decodeUri(uri)
2623        self.assertEqual(decoded, {
2624            'key': 'gid',
2625            'schema': 'public',
2626            'service': 'qgis_test',
2627            'srid': '3035',
2628            'sslmode': QgsDataSourceUri.SslDisable,
2629            'table': 'my_pg_vector',
2630        })
2631
2632        _round_trip(uri)
2633
2634        uri = self.dbconn + \
2635            ' sslmode=prefer key=\'gid\' srid=3035 temporalFieldIndex=2 ' + \
2636            'authcfg=afebeff username=\'my username\' password=\'my secret password=\' ' + \
2637            'table="public"."my_pg_vector" (the_geom) sql="a_field" != 1223223'
2638
2639        _round_trip(uri)
2640
2641        decoded = md.decodeUri(uri)
2642        self.assertEqual(decoded, {
2643            'authcfg': 'afebeff',
2644            'geometrycolumn': 'the_geom',
2645            'key': 'gid',
2646            'password': 'my secret password=',
2647            'schema': 'public',
2648            'service': 'qgis_test',
2649            'sql': '"a_field" != 1223223',
2650            'srid': '3035',
2651            'sslmode': QgsDataSourceUri.SslPrefer,
2652            'table': 'my_pg_vector',
2653            'username': 'my username',
2654        })
2655
2656    def testHasSpatialIndex(self):
2657        for layer_name in ('hspi_table', 'hspi_materialized_view'):
2658            columns = {'geom_without_index': QgsFeatureSource.SpatialIndexNotPresent, 'geom_with_index': QgsFeatureSource.SpatialIndexPresent}
2659            for (geometry_column, spatial_index) in columns.items():
2660                conn = 'service=\'qgis_test\''
2661                if 'QGIS_PGTEST_DB' in os.environ:
2662                    conn = os.environ['QGIS_PGTEST_DB']
2663                vl = QgsVectorLayer(
2664                    conn +
2665                    ' sslmode=disable key=\'id\' srid=4326 type=\'Polygon\' table="qgis_test"."{n}" ({c}) sql='.format(n=layer_name, c=geometry_column),
2666                    'test', 'postgres')
2667                self.assertTrue(vl.isValid())
2668                self.assertEqual(vl.hasSpatialIndex(), spatial_index)
2669
2670    def testBBoxFilterOnGeographyType(self):
2671        """Test bounding box filter on geography type"""
2672
2673        vl = QgsVectorLayer(
2674            self.dbconn +
2675            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."testgeog" (geog) sql=',
2676            'test', 'postgres')
2677
2678        self.assertTrue(vl.isValid())
2679
2680        def _test(vl, extent, ids):
2681            request = QgsFeatureRequest().setFilterRect(extent)
2682            values = {feat['pk']: 'x' for feat in vl.getFeatures(request)}
2683            expected = {x: 'x' for x in ids}
2684            self.assertEqual(values, expected)
2685
2686        _test(vl, QgsRectangle(40 - 0.01, -0.01, 40 + 0.01, 0.01), [1])
2687        _test(vl, QgsRectangle(40 - 5, -5, 40 + 5, 5), [1])
2688        _test(vl, QgsRectangle(40 - 5, 0, 40 + 5, 5), [1])
2689        _test(vl, QgsRectangle(40 - 10, -10, 40 + 10, 10), [1])  # no use of spatial index currently
2690        _test(vl, QgsRectangle(40 - 5, 0.01, 40 + 5, 5), [])  # no match
2691
2692        _test(vl, QgsRectangle(40 - 0.01, 60 - 0.01, 40 + 0.01, 60 + 0.01), [2])
2693        _test(vl, QgsRectangle(40 - 5, 60 - 5, 40 + 5, 60 + 5), [2])
2694        _test(vl, QgsRectangle(40 - 5, 60 - 0.01, 40 + 5, 60 + 9.99), [2])
2695
2696        _test(vl, QgsRectangle(40 - 0.01, -60 - 0.01, 40 + 0.01, -60 + 0.01), [3])
2697        _test(vl, QgsRectangle(40 - 5, -60 - 5, 40 + 5, -60 + 5), [3])
2698        _test(vl, QgsRectangle(40 - 5, -60 - 9.99, 40 + 5, -60 + 0.01), [3])
2699
2700        _test(vl, QgsRectangle(-181, -90, 181, 90), [1, 2, 3])  # no use of spatial index currently
2701
2702    def testReadCustomSRID(self):
2703        """Test that we can correctly read the SRS from a custom SRID"""
2704
2705        md = QgsProviderRegistry.instance().providerMetadata("postgres")
2706        conn = md.createConnection(self.dbconn, {})
2707
2708        # Cleanup if needed
2709        try:
2710            conn.dropVectorTable('qgis_test', 'test_custom_srid')
2711        except QgsProviderConnectionException:
2712            pass
2713
2714        conn.executeSql("DELETE FROM spatial_ref_sys WHERE srid = 543210 AND auth_name='FOO' AND auth_srid=32600;")
2715        conn.executeSql("""INSERT INTO spatial_ref_sys (srid, auth_name, auth_srid, srtext, proj4text) VALUES (543210, 'FOO', 32600, 'PROJCS["my_projection",GEOGCS["WGS 84",DATUM["WGS_1984",SPHEROID["WGS 84",6378137,298.257223563,AUTHORITY["EPSG","7030"]],AUTHORITY["EPSG","6326"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]]],PROJECTION["Transverse_Mercator"],PARAMETER["latitude_of_origin",0],PARAMETER["central_meridian",0],PARAMETER["scale_factor",1],PARAMETER["false_easting",0],PARAMETER["false_northing",0],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]','+proj=tmerc +lat_0=0 +lon_0=0 +k=1 +x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs');""")
2716
2717        conn.executeSql('''
2718        CREATE TABLE "qgis_test"."test_custom_srid" (
2719            gid serial primary key,
2720            geom geometry(Point, 543210)
2721        );''')
2722
2723        layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\'table="qgis_test"."test_custom_srid" (geom) sql=', 'test', 'postgres')
2724
2725        conn.executeSql("DELETE FROM spatial_ref_sys WHERE srid = 543210 AND auth_name='FOO' AND auth_srid=32600;")
2726
2727        self.assertTrue(layer.isValid())
2728        self.assertEqual(layer.crs().description(), 'my_projection')
2729
2730    def testSingleMultiColumnPkSmallData(self):
2731        """Test Single and Multi Column PK, `Small` Data"""
2732        from itertools import combinations
2733
2734        def test_for_pk_combinations(test_type_list, pk_column_name_list, fids_get_count):
2735            pk_column_name = ','.join(pk_column_name_list)
2736            set_new_pk = '''
2737                ALTER TABLE qgis_test.multi_column_pk_small_data_table DROP CONSTRAINT multi_column_pk_small_data_pk;
2738                ALTER TABLE qgis_test.multi_column_pk_small_data_table
2739                    ADD CONSTRAINT multi_column_pk_small_data_pk PRIMARY KEY ({});'''
2740            set_new_layer = ' sslmode=disable key=\'{}\' srid=3857 type=POLYGON table="qgis_test"."multi_column_pk_small_data_{}" (geom) sql='
2741            error_string = 'from {} with PK - {} : expected {}, got {}'
2742
2743            if 'table' in test_type_list:
2744                self.execSQLCommand(set_new_pk.format(pk_column_name))
2745            for test_type in test_type_list:
2746                vl = QgsVectorLayer(self.dbconn + set_new_layer.format(pk_column_name, test_type), 'test_multi_column_pk_small_data', 'postgres')
2747                fids = [f.id() for f in vl.getFeatures(QgsFeatureRequest().setLimit(fids_get_count))]
2748                fids2 = [f.id() for f in vl.getFeatures(fids)]
2749                self.assertEqual(fids_get_count, len(fids), "Get with limit " +
2750                                 error_string.format(test_type, pk_column_name, fids_get_count, len(fids)))
2751                self.assertEqual(fids_get_count, len(fids2), "Get by fids " +
2752                                 error_string.format(test_type, pk_column_name, fids_get_count, len(fids2)))
2753
2754        self.execSQLCommand('DROP TABLE IF EXISTS qgis_test.multi_column_pk_small_data_table CASCADE;')
2755        self.execSQLCommand('''
2756        CREATE TABLE qgis_test.multi_column_pk_small_data_table (
2757          id_serial serial NOT NULL,
2758          id_uuid uuid NOT NULL,
2759          id_int int NOT NULL,
2760          id_bigint bigint NOT NULL,
2761          id_str character varying(20) NOT NULL,
2762          id_inet4 inet NOT NULL,
2763          id_inet6 inet NOT NULL,
2764          id_cidr4 cidr NOT NULL,
2765          id_cidr6 cidr NOT NULL,
2766          id_macaddr macaddr NOT NULL,
2767          id_macaddr8 macaddr8 NOT NULL,
2768          id_timestamp timestamp with time zone NOT NULL,
2769          id_half_null_uuid uuid,
2770          id_all_null_uuid uuid,
2771          geom geometry(Polygon,3857),
2772          CONSTRAINT multi_column_pk_small_data_pk
2773            PRIMARY KEY (id_serial, id_uuid, id_int, id_bigint, id_str) );''')
2774        self.execSQLCommand('''
2775        CREATE OR REPLACE VIEW qgis_test.multi_column_pk_small_data_view AS
2776          SELECT * FROM qgis_test.multi_column_pk_small_data_table;
2777        DROP MATERIALIZED VIEW IF EXISTS qgis_test.multi_column_pk_small_data_mat_view;
2778        CREATE MATERIALIZED VIEW qgis_test.multi_column_pk_small_data_mat_view AS
2779          SELECT * FROM qgis_test.multi_column_pk_small_data_table;''')
2780        self.execSQLCommand('''
2781        TRUNCATE qgis_test.multi_column_pk_small_data_table;
2782        INSERT INTO qgis_test.multi_column_pk_small_data_table(
2783          id_uuid, id_int, id_bigint, id_str, id_inet4, id_inet6, id_cidr4, id_cidr6,
2784            id_macaddr, id_macaddr8, id_timestamp, id_half_null_uuid, id_all_null_uuid, geom)
2785          SELECT
2786            ( (10000000)::text || (100000000000 + dy)::text || (100000000000 + dx)::text )::uuid,
2787            dx + 1000000 * dy,                                                    --id_int
2788            dx + 1000000 * dy,                                                    --id_bigint
2789            dx || E\' ot\\'her \' || dy,                                          --id_str
2790            (\'192.168.0.1\'::inet + dx + 100 * dy )::inet,                       --id_inet4
2791            (\'2001:4f8:3:ba:2e0:81ff:fe22:d1f1\'::inet + dx + 100 * dy )::inet,  --id_inet6
2792            (\'192.168.0.1\'::cidr + dx + 100 * dy )::cidr,                       --id_cidr4
2793            (\'2001:4f8:3:ba:2e0:81ff:fe22:d1f1\'::cidr + dx + 100 * dy )::cidr,  --id_cidr6
2794            ((112233445566 + dx + 100 * dy)::text)::macaddr,                      --id_macaddr
2795            ((1122334455667788 + dx + 100 * dy)::text)::macaddr8,                 --id_macaddr8
2796            now() - ((dx||\' hour\')::text)::interval - ((dy||\' day\')::text)::interval,
2797            NULLIF( ( (10000000)::text || (100000000000 + dy)::text || (100000000000 + dx)::text )::uuid,
2798              ( (10000000)::text || (100000000000 + dy + dy%2)::text || (100000000000 + dx)::text )::uuid ),
2799            NULL,
2800            ST_Translate(
2801              ST_GeomFromText(\'POLYGON((3396900.0 6521800.0,3396900.0 6521870.0,
2802                  3396830.0 6521870.0,3396830.0 6521800.0,3396900.0 6521800.0))\', 3857 ),
2803              100.0 * dx,
2804              100.0 * dy )
2805          FROM generate_series(1,3) dx, generate_series(1,3) dy;
2806        REFRESH MATERIALIZED VIEW qgis_test.multi_column_pk_small_data_mat_view;''')
2807
2808        pk_col_list = ("id_serial", "id_uuid", "id_int", "id_bigint", "id_str", "id_inet4", "id_inet6", "id_cidr4", "id_cidr6", "id_macaddr", "id_macaddr8")
2809        test_type_list = ["table", "view", "mat_view"]
2810        for n in [1, 2, len(pk_col_list)]:
2811            pk_col_set_list = list(combinations(pk_col_list, n))
2812            for pk_col_set in pk_col_set_list:
2813                test_for_pk_combinations(test_type_list, pk_col_set, 7)
2814
2815        for col_name in ["id_serial", "id_uuid", "id_int", "id_bigint", "id_str", "id_inet4"]:
2816            test_for_pk_combinations(["view", "mat_view"], ["id_half_null_uuid", col_name], 7)
2817            test_for_pk_combinations(["view", "mat_view"], ["id_all_null_uuid", col_name], 7)
2818
2819    def testChangeAttributeWithDefaultValue(self):
2820        """Test that we can change an attribute value with its default value"""
2821
2822        md = QgsProviderRegistry.instance().providerMetadata("postgres")
2823        conn = md.createConnection(self.dbconn, {})
2824
2825        # Cleanup
2826        try:
2827            conn.dropVectorTable('qgis_test', 'test_change_att_w_default_value')
2828        except QgsProviderConnectionException:
2829            pass
2830
2831        conn.executeSql('''
2832        CREATE TABLE "qgis_test"."test_change_att_w_default_value" (
2833            id serial primary key,
2834            thetext1 character varying(8) DEFAULT NULL::character varying,
2835            thetext2 character varying(8) DEFAULT NULL,
2836            thetext3 character varying(8) DEFAULT 'blabla',
2837            thenumber integer DEFAULT 2+2
2838        );''')
2839
2840        conn.executeSql('''
2841        INSERT INTO "qgis_test"."test_change_att_w_default_value" (thetext1,thetext2,thetext3,thenumber) VALUES ('test1','test2','test3',6);''')
2842
2843        layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\'table="qgis_test"."test_change_att_w_default_value" sql=', 'test', 'postgres')
2844        self.assertTrue(layer.isValid())
2845        self.assertEqual(layer.featureCount(), 1)
2846        feat = next(layer.getFeatures())
2847        self.assertTrue(feat["thetext1"], "test1")
2848        self.assertTrue(feat["thetext2"], "test2")
2849        self.assertTrue(feat["thetext3"], "test3")
2850        self.assertTrue(feat["thenumber"], 6)
2851
2852        self.assertEqual(layer.dataProvider().defaultValueClause(1), "NULL::character varying")
2853        self.assertEqual(layer.dataProvider().defaultValueClause(2), "NULL::character varying")
2854        self.assertEqual(layer.dataProvider().defaultValueClause(3), "'blabla'::character varying")
2855        self.assertEqual(layer.dataProvider().defaultValueClause(4), "(2 + 2)")
2856
2857        layer.startEditing()
2858        self.assertTrue(layer.changeAttributeValues(1, {1: "NULL::character varying", 2: "NULL::character varying",
2859                                                        3: "'blabla'::character varying", 4: "(2 + 2)"}))
2860        self.assertTrue(layer.commitChanges())
2861
2862        feat = next(layer.getFeatures())
2863
2864        # print( "hein |{}| expected=|{}| bool={}".format( feat["thetext"], expected, (feat["thetext"] expected) ) )
2865        self.assertEqual(feat["thetext1"], NULL)
2866        self.assertEqual(feat["thetext2"], NULL)
2867        self.assertEqual(feat["thetext3"], "blabla")
2868        self.assertEqual(feat["thenumber"], 4)
2869
2870
class TestPyQgsPostgresProviderCompoundKey(unittest.TestCase, ProviderTestCase):
    """Provider tests on "qgis_test"."someDataCompound", loaded with the compound key ("key1", "key2")."""

    @classmethod
    def setUpClass(cls):
        """Run before all tests"""
        cls.dbconn = 'service=qgis_test'
        # QGIS_PGTEST_DB, when set, overrides the default connection string
        if 'QGIS_PGTEST_DB' in os.environ:
            cls.dbconn = os.environ['QGIS_PGTEST_DB']
        # Create test layers
        cls.vl = QgsVectorLayer(
            cls.dbconn +
            ' sslmode=disable key=\'"key1","key2"\' srid=4326 type=POINT table="qgis_test"."someDataCompound" (geom) sql=',
            'test', 'postgres')
        assert cls.vl.isValid()
        cls.source = cls.vl.dataProvider()

    @classmethod
    def tearDownClass(cls):
        """Run after all tests"""

    def enableCompiler(self):
        """Switch expression compilation on in the settings; always returns True"""
        QgsSettings().setValue('/qgis/compileExpressions', True)
        return True

    def disableCompiler(self):
        """Switch expression compilation off in the settings"""
        QgsSettings().setValue('/qgis/compileExpressions', False)

    def uncompiledFilters(self):
        """Return the filter expressions listed as uncompiled for this test class"""
        return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')',
                    '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')',
                    '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')'])

    def partiallyCompiledFilters(self):
        """Return the filter expressions listed as partially compiled (none)"""
        return set([])

    def testConstraints(self):
        """Neither component of the compound key may carry the unique constraint flag on its own"""
        for key in ["key1", "key2"]:
            idx = self.vl.dataProvider().fieldNameIndex(key)
            self.assertTrue(idx >= 0)
            self.assertFalse(self.vl.dataProvider().fieldConstraints(
                idx) & QgsFieldConstraints.ConstraintUnique)

    def testCompoundPkChanges(self):
        """ Check if fields with compound primary keys can be changed """
        vl = self.vl

        self.assertTrue(vl.isValid())
        idx_key1 = vl.fields().lookupField('key1')
        idx_key2 = vl.fields().lookupField('key2')
        # the name "pk" for this datasource is misleading;
        # the primary key is actually composed by the fields key1 and key2
        idx_pk = vl.fields().lookupField('pk')
        idx_name = vl.fields().lookupField('name')
        idx_name2 = vl.fields().lookupField('name2')

        geomwkt = 'Point(-47.945 -15.812)'

        # start editing ordinary attribute.
        ft1 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("key1 = 2 AND key2 = 2")))
        self.assertTrue(ft1.isValid())
        # kept so the record can be restored at the end of the test
        original_geometry = ft1.geometry().asWkt()

        vl.startEditing()
        self.assertTrue(vl.changeAttributeValues(ft1.id(), {idx_name: 'Rose'}))
        self.assertTrue(vl.commitChanges())

        # check change
        ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression("key1 = 2 AND key2 = 2")))
        self.assertEqual(ft2['name'], 'Rose')
        self.assertEqual(ft2['name2'], 'Apple')
        self.assertEqual(ft2['pk'], 2)

        # now, start editing one of the PK field components
        vl.startEditing()

        self.assertTrue(vl.dataProvider().changeFeatures({ft2.id(): {idx_key2: 42, idx_name: 'Orchid', idx_name2: 'Daisy'}}, {ft2.id(): QgsGeometry.fromWkt(geomwkt)}))
        self.assertTrue(vl.commitChanges())

        # let's check if we still have the same fid...
        ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterFid(ft2.id())))
        self.assertEqual(ft2['key2'], 42)
        self.assertEqual(ft2['name'], 'Orchid')
        self.assertEqual(ft2['name2'], 'Daisy')
        self.assertTrue(vl.startEditing())
        # the other key component is changed through the layer this time
        self.assertTrue(vl.changeAttributeValues(ft2.id(), {idx_key1: 21, idx_name2: 'Hibiscus'}))
        self.assertTrue(vl.commitChanges())
        ft2 = next(vl.getFeatures(QgsFeatureRequest().setFilterFid(ft2.id())))
        self.assertEqual(ft2['key1'], 21)
        self.assertEqual(ft2['name2'], 'Hibiscus')

        # lets get a brand new feature and check how it went...
        ft3 = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 2')))
        self.assertEqual(ft3['name'], 'Orchid')
        self.assertEqual(ft3['key1'], 21)
        self.assertEqual(ft3['key2'], 42)

        # geomwkt is what was written, the feature geometry is what came back
        self.assertTrue(compareWkt(ft3.geometry().asWkt(), geomwkt),
                        "Geometry mismatch. Expected: {} Got: {}\n".format(geomwkt, ft3.geometry().asWkt()))

        # Now, we leave the record as we found it, so further tests can proceed
        vl.startEditing()
        self.assertTrue(vl.dataProvider().changeFeatures({ft3.id(): {idx_key1: 2, idx_key2: 2, idx_pk: 2, idx_name: 'Apple', idx_name2: 'Apple'}}, {ft3.id(): QgsGeometry.fromWkt(original_geometry)}))
        self.assertTrue(vl.commitChanges())
2973
2974
2975class TestPyQgsPostgresProviderBigintSinglePk(unittest.TestCase, ProviderTestCase):
2976
2977    @classmethod
2978    def setUpClass(cls):
2979        """Run before all tests"""
2980        cls.dbconn = 'service=qgis_test'
2981        if 'QGIS_PGTEST_DB' in os.environ:
2982            cls.dbconn = os.environ['QGIS_PGTEST_DB']
2983        # Create test layers
2984        cls.vl = QgsVectorLayer(
2985            cls.dbconn +
2986            ' sslmode=disable key=\'"pk"\' srid=4326 type=POINT table="qgis_test"."provider_bigint_single_pk" (geom) sql=',
2987            'bigint_pk', 'postgres')
2988        assert cls.vl.isValid()
2989        cls.source = cls.vl.dataProvider()
2990        cls.con = psycopg2.connect(cls.dbconn)
2991
2992    @classmethod
2993    def tearDownClass(cls):
2994        """Run after all tests"""
2995
2996    def getSource(self):
2997        """ drops/recreates the test data anew, like TestPyQgsPostgresProvider::getSource above. """
2998        self.execSqlCommand(
2999            "DROP TABLE IF EXISTS qgis_test.provider_edit_bigint_single_pk")
3000        self.execSqlCommand(
3001            "CREATE TABLE qgis_test.provider_edit_bigint_single_pk ( pk bigserial PRIMARY KEY, cnt integer, name text DEFAULT 'qgis', name2 text DEFAULT 'qgis', num_char text, dt timestamp without time zone, \"date\" date, \"time\" time without time zone, geom public.geometry(Point,4326), key1 integer, key2 integer)")
3002        self.execSqlCommand(
3003            "INSERT INTO qgis_test.provider_edit_bigint_single_pk  ( key1, key2, pk, cnt, name, name2, num_char, dt, \"date\", \"time\", geom) VALUES"
3004            "(1, 1, 5, -200, NULL, 'NuLl', '5', TIMESTAMP '2020-05-04 12:13:14', '2020-05-02', '12:13:01', '0101000020E61000001D5A643BDFC751C01F85EB51B88E5340'),"
3005            "(1, 2, 3,  300, 'Pear', 'PEaR', '3', NULL, NULL, NULL, NULL),"
3006            "(2, 1, 1,  100, 'Orange', 'oranGe', '1', TIMESTAMP '2020-05-03 12:13:14', '2020-05-03', '12:13:14', '0101000020E61000006891ED7C3F9551C085EB51B81E955040'),"
3007            "(2, 2, 2,  200, 'Apple', 'Apple', '2', TIMESTAMP '2020-05-04 12:14:14', '2020-05-04', '12:14:14', '0101000020E6100000CDCCCCCCCC0C51C03333333333B35140'),"
3008            "(2, 3, 4,  400, 'Honey', 'Honey', '4', TIMESTAMP '2021-05-04 13:13:14', '2021-05-04', '13:13:14', '0101000020E610000014AE47E17A5450C03333333333935340')")
3009        vl = QgsVectorLayer(
3010            self.dbconn +
3011            ' sslmode=disable key=\'"pk"\' srid=4326 type=POINT table="qgis_test"."provider_edit_bigint_single_pk" (geom) sql=',
3012            'edit_bigint_pk', 'postgres')
3013        return vl
3014
3015    def getEditableLayer(self):
3016        return self.getSource()
3017
3018    def execSqlCommand(self, sql):
3019        self.assertTrue(self.con)
3020        cur = self.con.cursor()
3021        self.assertTrue(cur)
3022        cur.execute(sql)
3023        cur.close()
3024        self.con.commit()
3025
3026    def enableCompiler(self):
3027        QgsSettings().setValue('/qgis/compileExpressions', True)
3028        return True
3029
3030    def disableCompiler(self):
3031        QgsSettings().setValue('/qgis/compileExpressions', False)
3032
3033    def uncompiledFilters(self):
3034        return set(['"dt" = to_datetime(\'000www14ww13ww12www4ww5ww2020\',\'zzzwwwsswwmmwwhhwwwdwwMwwyyyy\')',
3035                    '"date" = to_date(\'www4ww5ww2020\',\'wwwdwwMwwyyyy\')',
3036                    '"time" = to_time(\'000www14ww13ww12www\',\'zzzwwwsswwmmwwhhwww\')'])
3037
3038    def partiallyCompiledFilters(self):
3039        return set([])
3040
3041    def testConstraints(self):
3042        idx = self.vl.dataProvider().fieldNameIndex("pk")
3043        self.assertTrue(idx >= 0)
3044
3045    def testGetFeaturesFidTests(self):
3046        fids = [f.id() for f in self.source.getFeatures()]
3047        assert len(fids) == 5, 'Expected 5 features, got {} instead'.format(
3048            len(fids))
3049        for id in fids:
3050            features = [f for f in self.source.getFeatures(
3051                QgsFeatureRequest().setFilterFid(id))]
3052            self.assertEqual(len(features), 1)
3053            feature = features[0]
3054            self.assertTrue(feature.isValid())
3055
3056            result = [feature.id()]
3057            expected = [id]
3058            assert result == expected, 'Expected {} and got {} when testing for feature ID filter'.format(expected,
3059                                                                                                          result)
3060
3061            # test that results match QgsFeatureRequest.acceptFeature
3062            request = QgsFeatureRequest().setFilterFid(id)
3063            for f in self.source.getFeatures():
3064                self.assertEqual(request.acceptFeature(f), f.id() == id)
3065
3066        # TODO: bad features are not tested because the PostgreSQL provider
3067        # doesn't mark explicitly set invalid features as such.
3068
3069    def testGetFeatures(self, source=None, extra_features=[], skip_features=[], changed_attributes={},
3070                        changed_geometries={}):
3071        """ Test that expected results are returned when fetching all features """
3072
3073        # IMPORTANT - we do not use `for f in source.getFeatures()` as we are also
3074        # testing that existing attributes & geometry in f are overwritten correctly
3075        # (for f in ... uses a new QgsFeature for every iteration)
3076
3077        if not source:
3078            source = self.source
3079
3080        it = source.getFeatures()
3081        f = QgsFeature()
3082        attributes = {}
3083        geometries = {}
3084        while it.nextFeature(f):
3085            # expect feature to be valid
3086            self.assertTrue(f.isValid())
3087            # some source test datasets will include additional attributes which we ignore,
3088            # so cherry pick desired attributes
3089            attrs = [f['pk'], f['cnt'], f['name'], f['name2'], f['num_char']]
3090            # DON'T force the num_char attribute to be text - some sources (e.g., delimited text) will
3091            # automatically detect that this attribute contains numbers and set it as a numeric
3092            # field
3093            # TODO: PostgreSQL 12 won't accept conversion from integer to text.
3094            # attrs[4] = str(attrs[4])
3095            attributes[f['pk']] = attrs
3096            geometries[f['pk']] = f.hasGeometry() and f.geometry().asWkt()
3097
3098        expected_attributes = {5: [5, -200, NULL, 'NuLl', '5'],
3099                               3: [3, 300, 'Pear', 'PEaR', '3'],
3100                               1: [1, 100, 'Orange', 'oranGe', '1'],
3101                               2: [2, 200, 'Apple', 'Apple', '2'],
3102                               4: [4, 400, 'Honey', 'Honey', '4']}
3103
3104        expected_geometries = {1: 'Point (-70.332 66.33)',
3105                               2: 'Point (-68.2 70.8)',
3106                               3: None,
3107                               4: 'Point(-65.32 78.3)',
3108                               5: 'Point(-71.123 78.23)'}
3109        for f in extra_features:
3110            expected_attributes[f[0]] = f.attributes()
3111            if f.hasGeometry():
3112                expected_geometries[f[0]] = f.geometry().asWkt()
3113            else:
3114                expected_geometries[f[0]] = None
3115
3116        for i in skip_features:
3117            del expected_attributes[i]
3118            del expected_geometries[i]
3119        for i, a in changed_attributes.items():
3120            for attr_idx, v in a.items():
3121                expected_attributes[i][attr_idx] = v
3122        for i, g, in changed_geometries.items():
3123            if g:
3124                expected_geometries[i] = g.asWkt()
3125            else:
3126                expected_geometries[i] = None
3127
3128        self.assertEqual(attributes, expected_attributes, 'Expected {}, got {}'.format(
3129            expected_attributes, attributes))
3130
3131        self.assertEqual(len(expected_geometries), len(geometries))
3132
3133        for pk, geom in list(expected_geometries.items()):
3134            if geom:
3135                assert compareWkt(geom, geometries[pk]), "Geometry {} mismatch Expected:\n{}\nGot:\n{}\n".format(pk,
3136                                                                                                                 geom,
3137                                                                                                                 geometries[
3138                                                                                                                     pk])
3139            else:
3140                self.assertFalse(
3141                    geometries[pk], 'Expected null geometry for {}'.format(pk))
3142
3143    def testAddFeatureExtraAttributes(self):
3144        if not getattr(self, 'getEditableLayer', None):
3145            return
3146
3147        l = self.getEditableLayer()
3148        self.assertTrue(l.isValid())
3149
3150        if not l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures:
3151            return
3152
3153        # test that adding features with too many attributes drops these attributes
3154        # we be more tricky and also add a valid feature to stress test the provider
3155        f1 = QgsFeature()
3156        f1.setAttributes([6, -220, 'qgis', 'String', '15'])
3157        f2 = QgsFeature()
3158        f2.setAttributes([7, -230, 'qgis', 'String', '15', 15, 16, 17])
3159
3160        result, added = l.dataProvider().addFeatures([f1, f2])
3161        self.assertTrue(result,
3162                        'Provider returned False to addFeatures with extra attributes. Providers should accept these features but truncate the extra attributes.')
3163
3164        # make sure feature was added correctly
3165        added = [f for f in l.dataProvider().getFeatures() if f['pk'] == 7][0]
3166        # TODO: The PostgreSQL provider doesn't truncate extra attributes!
3167        self.assertNotEqual(added.attributes(), [7, -230, 'qgis', 'String', '15'],
3168                            'The PostgreSQL provider doesn\'t truncate extra attributes.')
3169
3170    def testAddFeatureMissingAttributes(self):
3171        if not getattr(self, 'getEditableLayer', None):
3172            return
3173
3174        l = self.getEditableLayer()
3175        self.assertTrue(l.isValid())
3176
3177        if not l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures:
3178            return
3179
3180        # test that adding features with missing attributes pads out these
3181        # attributes with NULL values to the correct length.
3182        # changed from ProviderTestBase.testAddFeatureMissingAttributes: we use
3183        # 'qgis' instead of NULL below.
3184        # TODO: Only unmentioned attributes get filled with the DEFAULT table
3185        # value; if the attribute is present, the saved value will be NULL if
3186        # that is indicated, or the value mentioned by the user; there is no
3187        # implicit conversion of PyQGIS::NULL to PostgreSQL DEFAULT.
3188        f1 = QgsFeature()
3189        f1.setAttributes([6, -220, 'qgis', 'String'])
3190        f2 = QgsFeature()
3191        f2.setAttributes([7, 330])
3192
3193        result, added = l.dataProvider().addFeatures([f1, f2])
3194        self.assertTrue(result,
3195                        'Provider returned False to addFeatures with missing attributes. Providers should accept these features but add NULL attributes to the end of the existing attributes to the required field length.')
3196        f1.setId(added[0].id())
3197        f2.setId(added[1].id())
3198
3199        # check result - feature attributes MUST be padded out to required number of fields
3200        f1.setAttributes([6, -220, 'qgis', 'String', NULL])
3201        f2.setAttributes([7, 330, 'qgis', 'qgis', NULL])
3202        self.testGetFeatures(l.dataProvider(), [f1, f2])
3203
3204    def testAddFeature(self):
3205        if not getattr(self, 'getEditableLayer', None):
3206            return
3207
3208        l = self.getEditableLayer()
3209        self.assertTrue(l.isValid())
3210
3211        f1 = QgsFeature()
3212        # changed from ProviderTestBase.testAddFeature: we use 'qgis' instead
3213        # of NULL below.
3214        # TODO: Only unmentioned attributes get filled with the DEFAULT table
3215        # value; if the attribute is present, the saved value will be NULL if
3216        # that is indicated, or the value mentioned by the user; there is no
3217        # implicit conversion of PyQGIS::NULL to PostgreSQL DEFAULT.
3218        f1.setAttributes([6, -220, 'qgis', 'String', '15'])
3219        f1.setGeometry(QgsGeometry.fromWkt('Point (-72.345 71.987)'))
3220
3221        f2 = QgsFeature()
3222        f2.setAttributes([7, 330, 'Coconut', 'CoCoNut', '13'])
3223
3224        if l.dataProvider().capabilities() & QgsVectorDataProvider.AddFeatures:
3225            # expect success
3226            result, added = l.dataProvider().addFeatures([f1, f2])
3227            self.assertTrue(
3228                result, 'Provider reported AddFeatures capability, but returned False to addFeatures')
3229            f1.setId(added[0].id())
3230            f2.setId(added[1].id())
3231
3232            # check result
3233            self.testGetFeatures(l.dataProvider(), [f1, f2])
3234
3235            # add empty list, should return true for consistency
3236            self.assertTrue(l.dataProvider().addFeatures([]))
3237
3238            # ensure that returned features have been given the correct id
3239            f = next(l.getFeatures(
3240                QgsFeatureRequest().setFilterFid(added[0].id())))
3241            self.assertTrue(f.isValid())
3242            self.assertEqual(f['cnt'], -220)
3243
3244            f = next(l.getFeatures(
3245                QgsFeatureRequest().setFilterFid(added[1].id())))
3246            self.assertTrue(f.isValid())
3247            self.assertEqual(f['cnt'], 330)
3248        else:
3249            # expect fail
3250            self.assertFalse(l.dataProvider().addFeatures([f1, f2]),
3251                             'Provider reported no AddFeatures capability, but returned true to addFeatures')
3252
3253    def testModifyPk(self):
3254        """ Check if we can modify a primary key value. Since this PK is bigint, we also exercise the mapping between fid and values """
3255
3256        vl = self.getEditableLayer()
3257        self.assertTrue(vl.isValid())
3258        geomwkt = 'Point(-47.945 -15.812)'
3259
3260        feature = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 4')))
3261        self.assertTrue(feature.isValid())
3262
3263        self.assertTrue(vl.startEditing())
3264        idxpk = vl.fields().lookupField('pk')
3265
3266        self.assertTrue(vl.dataProvider().changeFeatures({feature.id(): {idxpk: 42}}, {feature.id(): QgsGeometry.fromWkt(geomwkt)}))
3267        self.assertTrue(vl.commitChanges())
3268
3269        # read back
3270        ft = next(vl.getFeatures(QgsFeatureRequest().setFilterExpression('pk = 42')))
3271        self.assertTrue(ft.isValid())
3272        self.assertEqual(ft['name'], 'Honey')
3273        assert compareWkt(ft.geometry().asWkt(), geomwkt), "Geometry mismatch. Expected: {} Got: {}\n".format(ft.geometry().asWkt(), geomwkt)
3274
3275    def testDuplicatedFieldNamesInQueryLayers(self):
3276        """Test regresssion GH #36205"""
3277
3278        vl = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'__rid__\' table="(SELECT row_number() OVER () AS __rid__, * FROM (SELECT *  from qgis_test.some_poly_data a, qgis_test.some_poly_data b  where  ST_Intersects(a.geom,b.geom)) as foo)" sql=', 'test_36205', 'postgres')
3279        self.assertTrue(vl.isValid())
3280        self.assertEqual(vl.featureCount(), 3)
3281
3282        # This fails because the "geom" field and "pk" fields are ambiguous
3283        # There is no easy fix: all duplicated fields should be explicitly aliased
3284        # and the query internally rewritten
3285        # feature = next(vl.getFeatures())
3286        # self.assertTrue(vl.isValid())
3287
3288    def testUnrestrictedGeometryType(self):
3289        """Test geometry column with no explicit geometry type, regression GH #38565"""
3290
3291        md = QgsProviderRegistry.instance().providerMetadata("postgres")
3292        conn = md.createConnection(self.dbconn, {})
3293
3294        # Cleanup if needed
3295        try:
3296            conn.dropVectorTable('qgis_test', 'test_unrestricted_geometry')
3297        except QgsProviderConnectionException:
3298            pass
3299
3300        conn.executeSql('''
3301        CREATE TABLE "qgis_test"."test_unrestricted_geometry" (
3302            gid serial primary key,
3303            geom geometry(Geometry, 4326)
3304        );''')
3305
3306        points = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=POINT table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_points', 'postgres')
3307        lines = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=LINESTRING table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_lines', 'postgres')
3308        polygons = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' srid=4326 type=POLYGON table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_polygons', 'postgres')
3309
3310        self.assertTrue(points.isValid())
3311        self.assertTrue(lines.isValid())
3312        self.assertTrue(polygons.isValid())
3313
3314        f = QgsFeature(points.fields())
3315        f.setGeometry(QgsGeometry.fromWkt('point(9 45)'))
3316        self.assertTrue(points.dataProvider().addFeatures([f]))
3317        self.assertEqual(points.featureCount(), 1)
3318        self.assertEqual(lines.featureCount(), 0)
3319        self.assertEqual(polygons.featureCount(), 0)
3320
3321        # Fetch from iterator
3322        self.assertTrue(compareWkt(next(points.getFeatures()).geometry().asWkt(), 'point(9 45)'))
3323        with self.assertRaises(StopIteration):
3324            next(lines.getFeatures())
3325        with self.assertRaises(StopIteration):
3326            next(polygons.getFeatures())
3327
3328        f.setGeometry(QgsGeometry.fromWkt('linestring(9 45, 10 46)'))
3329        self.assertTrue(lines.dataProvider().addFeatures([f]))
3330        self.assertEqual(points.featureCount(), 1)
3331        self.assertEqual(lines.featureCount(), 1)
3332        self.assertEqual(polygons.featureCount(), 0)
3333
3334        # Fetch from iterator
3335        self.assertTrue(compareWkt(next(points.getFeatures()).geometry().asWkt(), 'point(9 45)'))
3336        self.assertTrue(compareWkt(next(lines.getFeatures()).geometry().asWkt(), 'linestring(9 45, 10 46)'))
3337        with self.assertRaises(StopIteration):
3338            next(polygons.getFeatures())
3339
3340        # Test regression GH #38567 (no SRID requested in the data source URI)
3341        # Cleanup if needed
3342        conn.executeSql('DELETE FROM "qgis_test"."test_unrestricted_geometry" WHERE \'t\'')
3343
3344        points = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=POINT table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_points', 'postgres')
3345        lines = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=LINESTRING table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_lines', 'postgres')
3346        polygons = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'gid\' type=POLYGON table="qgis_test"."test_unrestricted_geometry" (geom) sql=', 'test_polygons', 'postgres')
3347
3348        self.assertTrue(points.isValid())
3349        self.assertTrue(lines.isValid())
3350        self.assertTrue(polygons.isValid())
3351
3352    def test_read_wkb(self):
3353        """ Test to read WKB from Postgis. """
3354        md = QgsProviderRegistry.instance().providerMetadata("postgres")
3355        conn = md.createConnection(self.dbconn, {})
3356        results = conn.executeSql("SELECT ST_AsBinary(ST_MakePoint(5, 10));")
3357        wkb = results[0][0]
3358        geom = QgsGeometry()
3359        import binascii
3360        geom.fromWkb(binascii.unhexlify(wkb[2:]))
3361        self.assertEqual(geom.asWkt(), "Point (5 10)")
3362
3363    def testTrustFlag(self):
3364        """Test regression https://github.com/qgis/QGIS/issues/38809"""
3365
3366        vl = QgsVectorLayer(
3367            self.dbconn +
3368            ' sslmode=disable key=\'pk\' srid=4326 type=POINT table="qgis_test"."editData" (geom) sql=',
3369            'testTrustFlag', 'postgres')
3370
3371        self.assertTrue(vl.isValid())
3372
3373        p = QgsProject.instance()
3374        d = QTemporaryDir()
3375        dir_path = d.path()
3376
3377        self.assertTrue(p.addMapLayers([vl]))
3378        project_path = os.path.join(dir_path, 'testTrustFlag.qgs')
3379        self.assertTrue(p.write(project_path))
3380
3381        del vl
3382
3383        p.clear()
3384        self.assertTrue(p.read(project_path))
3385        vl = p.mapLayersByName('testTrustFlag')[0]
3386        self.assertTrue(vl.isValid())
3387        self.assertFalse(p.trustLayerMetadata())
3388
3389        # Set the trust flag
3390        p.setTrustLayerMetadata(True)
3391        self.assertTrue(p.write(project_path))
3392
3393        # Re-read
3394        p.clear()
3395        self.assertTrue(p.read(project_path))
3396        self.assertTrue(p.trustLayerMetadata())
3397        vl = p.mapLayersByName('testTrustFlag')[0]
3398        self.assertTrue(vl.isValid())
3399
3400    def testQueryLayerDuplicatedFields(self):
3401        """Test that duplicated fields from a query layer are returned"""
3402
3403        def _get_layer(sql):
3404            return QgsVectorLayer(
3405                self.dbconn +
3406                ' sslmode=disable key=\'__rid__\' table=\'(SELECT row_number() OVER () AS __rid__, * FROM (' + sql + ') as foo)\'  sql=',
3407                'test', 'postgres')
3408
3409        l = _get_layer('SELECT 1, 2')
3410        self.assertEqual(l.fields().count(), 3)
3411        self.assertEqual([f.name() for f in l.fields()], ['__rid__', '?column?', '?column? (2)'])
3412
3413        l = _get_layer('SELECT 1 as id, 2 as id')
3414        self.assertEqual(l.fields().count(), 3)
3415        self.assertEqual([f.name() for f in l.fields()], ['__rid__', 'id', 'id (2)'])
3416
3417    def testInsertOnlyFieldIsEditable(self):
3418        """Test issue #40922 when an INSERT only use cannot insert a new feature"""
3419
3420        md = QgsProviderRegistry.instance().providerMetadata("postgres")
3421        conn = md.createConnection(self.dbconn, {})
3422        conn.executeSql('DROP TABLE IF EXISTS public.insert_only_points')
3423        conn.executeSql('DROP USER IF EXISTS insert_only_user')
3424        conn.executeSql('CREATE USER insert_only_user WITH PASSWORD \'insert_only_user\'')
3425        conn.executeSql('CREATE TABLE insert_only_points (id SERIAL PRIMARY KEY, name VARCHAR(64))')
3426        conn.executeSql("SELECT AddGeometryColumn('public', 'insert_only_points', 'geom', 4326, 'POINT', 2 )")
3427        conn.executeSql('GRANT SELECT ON "public"."insert_only_points" TO insert_only_user')
3428
3429        uri = QgsDataSourceUri(self.dbconn +
3430                               ' sslmode=disable  key=\'id\'srid=4326 type=POINT table="public"."insert_only_points" (geom) sql=')
3431        uri.setUsername('insert_only_user')
3432        uri.setPassword('insert_only_user')
3433        vl = QgsVectorLayer(uri.uri(), 'test', 'postgres')
3434        self.assertTrue(vl.isValid())
3435        self.assertFalse(vl.startEditing())
3436
3437        feature = QgsFeature(vl.fields())
3438        self.assertFalse(QgsVectorLayerUtils.fieldIsEditable(vl, 0, feature))
3439        self.assertFalse(QgsVectorLayerUtils.fieldIsEditable(vl, 1, feature))
3440
3441        conn.executeSql('GRANT INSERT ON "public"."insert_only_points" TO insert_only_user')
3442        vl = QgsVectorLayer(uri.uri(), 'test', 'postgres')
3443
3444        feature = QgsFeature(vl.fields())
3445        self.assertTrue(vl.startEditing())
3446        self.assertTrue(QgsVectorLayerUtils.fieldIsEditable(vl, 0, feature))
3447        self.assertTrue(QgsVectorLayerUtils.fieldIsEditable(vl, 1, feature))
3448
3449    def testPkeyIntArray(self):
3450        """
3451        Test issue #42778 when pkey is an int array
3452        """
3453        md = QgsProviderRegistry.instance().providerMetadata("postgres")
3454        conn = md.createConnection(self.dbconn, {})
3455        conn.executeSql('DROP TABLE IF EXISTS public.test_pkey_intarray')
3456        conn.executeSql('CREATE TABLE public.test_pkey_intarray (id _int8 PRIMARY KEY, name VARCHAR(64))')
3457        conn.executeSql("""INSERT INTO public.test_pkey_intarray (id, name) VALUES('{0,0,19111815}', 'test')""")
3458
3459        uri = QgsDataSourceUri(self.dbconn +
3460                               ' sslmode=disable  key=\'id\' table="public"."test_pkey_intarray" sql=')
3461        vl = QgsVectorLayer(uri.uri(), 'test', 'postgres')
3462        self.assertTrue(vl.isValid())
3463
3464        feat = next(vl.getFeatures())
3465        self.assertTrue(feat.isValid())
3466        self.assertEqual(feat["name"], "test")
3467
3468        fid = feat.id()
3469        self.assertTrue(fid > 0)
3470
3471        feat = vl.getFeature(fid)
3472        self.assertTrue(feat.isValid())
3473        self.assertEqual(feat["name"], "test")
3474
3475    def testExportPkGuessLogic(self):
3476        """Test that when creating an empty layer a NOT NULL UNIQUE numeric field is identified as a PK"""
3477
3478        md = QgsProviderRegistry.instance().providerMetadata("postgres")
3479        conn = md.createConnection(self.dbconn, {})
3480        conn.executeSql(
3481            'DROP TABLE IF EXISTS qgis_test."testExportPkGuessLogic_source" CASCADE')
3482        conn.executeSql(
3483            'DROP TABLE IF EXISTS qgis_test."testExportPkGuessLogic_exported" CASCADE')
3484        conn.executeSql(
3485            """CREATE TABLE qgis_test."testExportPkGuessLogic_source" ( id bigint generated always as identity primary key,
3486                    geom geometry(Point, 4326) check (st_isvalid(geom)),
3487                    name text unique, author text not null)""")
3488
3489        source_layer = QgsVectorLayer(self.dbconn + ' sslmode=disable key=\'id\' srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_source" (geom) sql=', 'testExportPkGuessLogic_source', 'postgres')
3490
3491        md = QgsProviderRegistry.instance().providerMetadata('postgres')
3492        conn = md.createConnection(self.dbconn, {})
3493        table = conn.table("qgis_test", "testExportPkGuessLogic_source")
3494        self.assertEqual(table.primaryKeyColumns(), ['id'])
3495
3496        self.assertTrue(source_layer.isValid())
3497
3498        # Create the URI as the browser does (no PK information)
3499        uri = self.dbconn + ' sslmode=disable srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_exported" (geom) sql='
3500
3501        exporter = QgsVectorLayerExporter(uri, 'postgres', source_layer.fields(), source_layer.wkbType(), source_layer.crs(), True, {})
3502        self.assertTrue(exporter.lastError() == '')
3503
3504        exported_layer = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 type=POINT table="qgis_test"."testExportPkGuessLogic_exported" (geom) sql=', 'testExportPkGuessLogic_exported', 'postgres')
3505        self.assertTrue(exported_layer.isValid())
3506
3507        table = conn.table("qgis_test", "testExportPkGuessLogic_exported")
3508        self.assertEqual(table.primaryKeyColumns(), ['id'])
3509
3510        self.assertEqual(exported_layer.fields().names(), ['id', 'name', 'author'])
3511
3512
# Run the test cases through unittest when this file is executed as a script
if __name__ == '__main__':
    unittest.main()
3515