1from contextlib import contextmanager
2import operator
3
4from sqlalchemy import CHAR
5from sqlalchemy import column
6from sqlalchemy import exc
7from sqlalchemy import exc as sa_exc
8from sqlalchemy import ForeignKey
9from sqlalchemy import func
10from sqlalchemy import INT
11from sqlalchemy import Integer
12from sqlalchemy import literal
13from sqlalchemy import literal_column
14from sqlalchemy import MetaData
15from sqlalchemy import select
16from sqlalchemy import sql
17from sqlalchemy import String
18from sqlalchemy import table
19from sqlalchemy import testing
20from sqlalchemy import text
21from sqlalchemy import type_coerce
22from sqlalchemy import TypeDecorator
23from sqlalchemy import util
24from sqlalchemy import VARCHAR
25from sqlalchemy.engine import default
26from sqlalchemy.engine import result as _result
27from sqlalchemy.testing import assert_raises
28from sqlalchemy.testing import assert_raises_message
29from sqlalchemy.testing import assertions
30from sqlalchemy.testing import engines
31from sqlalchemy.testing import eq_
32from sqlalchemy.testing import fixtures
33from sqlalchemy.testing import in_
34from sqlalchemy.testing import is_
35from sqlalchemy.testing import le_
36from sqlalchemy.testing import ne_
37from sqlalchemy.testing import not_in_
38from sqlalchemy.testing.mock import Mock
39from sqlalchemy.testing.mock import patch
40from sqlalchemy.testing.schema import Column
41from sqlalchemy.testing.schema import Table
42
43
44class ResultProxyTest(fixtures.TablesTest):
45    __backend__ = True
46
47    @classmethod
48    def define_tables(cls, metadata):
49        Table(
50            "users",
51            metadata,
52            Column(
53                "user_id", INT, primary_key=True, test_needs_autoincrement=True
54            ),
55            Column("user_name", VARCHAR(20)),
56            test_needs_acid=True,
57        )
58        Table(
59            "addresses",
60            metadata,
61            Column(
62                "address_id",
63                Integer,
64                primary_key=True,
65                test_needs_autoincrement=True,
66            ),
67            Column("user_id", Integer, ForeignKey("users.user_id")),
68            Column("address", String(30)),
69            test_needs_acid=True,
70        )
71
72        Table(
73            "users2",
74            metadata,
75            Column("user_id", INT, primary_key=True),
76            Column("user_name", VARCHAR(20)),
77            test_needs_acid=True,
78        )
79
80    def test_row_iteration(self):
81        users = self.tables.users
82
83        users.insert().execute(
84            {"user_id": 7, "user_name": "jack"},
85            {"user_id": 8, "user_name": "ed"},
86            {"user_id": 9, "user_name": "fred"},
87        )
88        r = users.select().execute()
89        rows = []
90        for row in r:
91            rows.append(row)
92        eq_(len(rows), 3)
93
94    def test_row_next(self):
95        users = self.tables.users
96
97        users.insert().execute(
98            {"user_id": 7, "user_name": "jack"},
99            {"user_id": 8, "user_name": "ed"},
100            {"user_id": 9, "user_name": "fred"},
101        )
102        r = users.select().execute()
103        rows = []
104        while True:
105            row = next(r, "foo")
106            if row == "foo":
107                break
108            rows.append(row)
109        eq_(len(rows), 3)
110
111    @testing.requires.subqueries
112    def test_anonymous_rows(self):
113        users = self.tables.users
114
115        users.insert().execute(
116            {"user_id": 7, "user_name": "jack"},
117            {"user_id": 8, "user_name": "ed"},
118            {"user_id": 9, "user_name": "fred"},
119        )
120
121        sel = (
122            select([users.c.user_id])
123            .where(users.c.user_name == "jack")
124            .as_scalar()
125        )
126        for row in select([sel + 1, sel + 3], bind=users.bind).execute():
127            eq_(row["anon_1"], 8)
128            eq_(row["anon_2"], 10)
129
130    def test_row_comparison(self):
131        users = self.tables.users
132
133        users.insert().execute(user_id=7, user_name="jack")
134        rp = users.select().execute().first()
135
136        eq_(rp, rp)
137        is_(not (rp != rp), True)
138
139        equal = (7, "jack")
140
141        eq_(rp, equal)
142        eq_(equal, rp)
143        is_((not (rp != equal)), True)
144        is_(not (equal != equal), True)
145
146        def endless():
147            while True:
148                yield 1
149
150        ne_(rp, endless())
151        ne_(endless(), rp)
152
153        # test that everything compares the same
154        # as it would against a tuple
155        for compare in [False, 8, endless(), "xyz", (7, "jack")]:
156            for op in [
157                operator.eq,
158                operator.ne,
159                operator.gt,
160                operator.lt,
161                operator.ge,
162                operator.le,
163            ]:
164
165                try:
166                    control = op(equal, compare)
167                except TypeError:
168                    # Py3K raises TypeError for some invalid comparisons
169                    assert_raises(TypeError, op, rp, compare)
170                else:
171                    eq_(control, op(rp, compare))
172
173                try:
174                    control = op(compare, equal)
175                except TypeError:
176                    # Py3K raises TypeError for some invalid comparisons
177                    assert_raises(TypeError, op, compare, rp)
178                else:
179                    eq_(control, op(compare, rp))
180
181    @testing.provide_metadata
182    def test_column_label_overlap_fallback(self):
183        content = Table("content", self.metadata, Column("type", String(30)))
184        bar = Table("bar", self.metadata, Column("content_type", String(30)))
185        self.metadata.create_all(testing.db)
186        testing.db.execute(content.insert().values(type="t1"))
187
188        row = testing.db.execute(content.select(use_labels=True)).first()
189        in_(content.c.type, row)
190        not_in_(bar.c.content_type, row)
191        in_(sql.column("content_type"), row)
192
193        row = testing.db.execute(
194            select([content.c.type.label("content_type")])
195        ).first()
196        in_(content.c.type, row)
197
198        not_in_(bar.c.content_type, row)
199
200        in_(sql.column("content_type"), row)
201
202        row = testing.db.execute(
203            select([func.now().label("content_type")])
204        ).first()
205        not_in_(content.c.type, row)
206
207        not_in_(bar.c.content_type, row)
208
209        in_(sql.column("content_type"), row)
210
211    def test_pickled_rows(self):
212        users = self.tables.users
213        addresses = self.tables.addresses
214
215        users.insert().execute(
216            {"user_id": 7, "user_name": "jack"},
217            {"user_id": 8, "user_name": "ed"},
218            {"user_id": 9, "user_name": "fred"},
219        )
220
221        for pickle in False, True:
222            for use_labels in False, True:
223                result = (
224                    users.select(use_labels=use_labels)
225                    .order_by(users.c.user_id)
226                    .execute()
227                    .fetchall()
228                )
229
230                if pickle:
231                    result = util.pickle.loads(util.pickle.dumps(result))
232
233                eq_(result, [(7, "jack"), (8, "ed"), (9, "fred")])
234                if use_labels:
235                    eq_(result[0]["users_user_id"], 7)
236                    eq_(
237                        list(result[0].keys()),
238                        ["users_user_id", "users_user_name"],
239                    )
240                else:
241                    eq_(result[0]["user_id"], 7)
242                    eq_(list(result[0].keys()), ["user_id", "user_name"])
243
244                eq_(result[0][0], 7)
245                eq_(result[0][users.c.user_id], 7)
246                eq_(result[0][users.c.user_name], "jack")
247
248                if not pickle or use_labels:
249                    assert_raises(
250                        exc.NoSuchColumnError,
251                        lambda: result[0][addresses.c.user_id],
252                    )
253                else:
254                    # test with a different table.  name resolution is
255                    # causing 'user_id' to match when use_labels wasn't used.
256                    eq_(result[0][addresses.c.user_id], 7)
257
258                assert_raises(
259                    exc.NoSuchColumnError, lambda: result[0]["fake key"]
260                )
261                assert_raises(
262                    exc.NoSuchColumnError,
263                    lambda: result[0][addresses.c.address_id],
264                )
265
266    def test_column_error_printing(self):
267        result = testing.db.execute(select([1]))
268        row = result.first()
269
270        class unprintable(object):
271            def __str__(self):
272                raise ValueError("nope")
273
274        msg = r"Could not locate column in row for column '%s'"
275
276        for accessor, repl in [
277            ("x", "x"),
278            (Column("q", Integer), "q"),
279            (Column("q", Integer) + 12, r"q \+ :q_1"),
280            (unprintable(), "unprintable element.*"),
281        ]:
282            assert_raises_message(
283                exc.NoSuchColumnError, msg % repl, result._getter, accessor
284            )
285
286            is_(result._getter(accessor, False), None)
287
288            assert_raises_message(
289                exc.NoSuchColumnError, msg % repl, lambda: row[accessor]
290            )
291
292    def test_fetchmany(self):
293        users = self.tables.users
294
295        users.insert().execute(user_id=7, user_name="jack")
296        users.insert().execute(user_id=8, user_name="ed")
297        users.insert().execute(user_id=9, user_name="fred")
298        r = users.select().execute()
299        rows = []
300        for row in r.fetchmany(size=2):
301            rows.append(row)
302        eq_(len(rows), 2)
303
304    def test_column_slices(self):
305        users = self.tables.users
306        addresses = self.tables.addresses
307
308        users.insert().execute(user_id=1, user_name="john")
309        users.insert().execute(user_id=2, user_name="jack")
310        addresses.insert().execute(
311            address_id=1, user_id=2, address="foo@bar.com"
312        )
313
314        r = text("select * from addresses", bind=testing.db).execute().first()
315        eq_(r[0:1], (1,))
316        eq_(r[1:], (2, "foo@bar.com"))
317        eq_(r[:-1], (1, 2))
318
319    def test_column_accessor_basic_compiled(self):
320        users = self.tables.users
321
322        users.insert().execute(
323            dict(user_id=1, user_name="john"),
324            dict(user_id=2, user_name="jack"),
325        )
326
327        r = users.select(users.c.user_id == 2).execute().first()
328        eq_(r.user_id, 2)
329        eq_(r["user_id"], 2)
330        eq_(r[users.c.user_id], 2)
331
332        eq_(r.user_name, "jack")
333        eq_(r["user_name"], "jack")
334        eq_(r[users.c.user_name], "jack")
335
336    def test_column_accessor_basic_text(self):
337        users = self.tables.users
338
339        users.insert().execute(
340            dict(user_id=1, user_name="john"),
341            dict(user_id=2, user_name="jack"),
342        )
343        r = testing.db.execute(
344            text("select * from users where user_id=2")
345        ).first()
346
347        eq_(r.user_id, 2)
348        eq_(r["user_id"], 2)
349        eq_(r[users.c.user_id], 2)
350
351        eq_(r.user_name, "jack")
352        eq_(r["user_name"], "jack")
353        eq_(r[users.c.user_name], "jack")
354
355    def test_column_accessor_textual_select(self):
356        users = self.tables.users
357
358        users.insert().execute(
359            dict(user_id=1, user_name="john"),
360            dict(user_id=2, user_name="jack"),
361        )
362        # this will create column() objects inside
363        # the select(), these need to match on name anyway
364        r = testing.db.execute(
365            select([column("user_id"), column("user_name")])
366            .select_from(table("users"))
367            .where(text("user_id=2"))
368        ).first()
369
370        eq_(r.user_id, 2)
371        eq_(r["user_id"], 2)
372        eq_(r[users.c.user_id], 2)
373
374        eq_(r.user_name, "jack")
375        eq_(r["user_name"], "jack")
376        eq_(r[users.c.user_name], "jack")
377
378    def test_column_accessor_dotted_union(self):
379        users = self.tables.users
380
381        users.insert().execute(dict(user_id=1, user_name="john"))
382
383        # test a little sqlite < 3.10.0 weirdness - with the UNION,
384        # cols come back as "users.user_id" in cursor.description
385        r = testing.db.execute(
386            text(
387                "select users.user_id, users.user_name "
388                "from users "
389                "UNION select users.user_id, "
390                "users.user_name from users"
391            )
392        ).first()
393        eq_(r["user_id"], 1)
394        eq_(r["user_name"], "john")
395        eq_(list(r.keys()), ["user_id", "user_name"])
396
397    def test_column_accessor_sqlite_raw(self):
398        users = self.tables.users
399
400        users.insert().execute(dict(user_id=1, user_name="john"))
401
402        r = (
403            text(
404                "select users.user_id, users.user_name "
405                "from users "
406                "UNION select users.user_id, "
407                "users.user_name from users",
408                bind=testing.db,
409            )
410            .execution_options(sqlite_raw_colnames=True)
411            .execute()
412            .first()
413        )
414
415        if testing.against("sqlite < 3.10.0"):
416            not_in_("user_id", r)
417            not_in_("user_name", r)
418            eq_(r["users.user_id"], 1)
419            eq_(r["users.user_name"], "john")
420
421            eq_(list(r.keys()), ["users.user_id", "users.user_name"])
422        else:
423            not_in_("users.user_id", r)
424            not_in_("users.user_name", r)
425            eq_(r["user_id"], 1)
426            eq_(r["user_name"], "john")
427
428            eq_(list(r.keys()), ["user_id", "user_name"])
429
430    def test_column_accessor_sqlite_translated(self):
431        users = self.tables.users
432
433        users.insert().execute(dict(user_id=1, user_name="john"))
434
435        r = (
436            text(
437                "select users.user_id, users.user_name "
438                "from users "
439                "UNION select users.user_id, "
440                "users.user_name from users",
441                bind=testing.db,
442            )
443            .execute()
444            .first()
445        )
446        eq_(r["user_id"], 1)
447        eq_(r["user_name"], "john")
448
449        if testing.against("sqlite < 3.10.0"):
450            eq_(r["users.user_id"], 1)
451            eq_(r["users.user_name"], "john")
452        else:
453            not_in_("users.user_id", r)
454            not_in_("users.user_name", r)
455
456        eq_(list(r.keys()), ["user_id", "user_name"])
457
458    def test_column_accessor_labels_w_dots(self):
459        users = self.tables.users
460
461        users.insert().execute(dict(user_id=1, user_name="john"))
462        # test using literal tablename.colname
463        r = (
464            text(
465                'select users.user_id AS "users.user_id", '
466                'users.user_name AS "users.user_name" '
467                "from users",
468                bind=testing.db,
469            )
470            .execution_options(sqlite_raw_colnames=True)
471            .execute()
472            .first()
473        )
474        eq_(r["users.user_id"], 1)
475        eq_(r["users.user_name"], "john")
476        not_in_("user_name", r)
477        eq_(list(r.keys()), ["users.user_id", "users.user_name"])
478
479    def test_column_accessor_unary(self):
480        users = self.tables.users
481
482        users.insert().execute(dict(user_id=1, user_name="john"))
483
484        # unary expressions
485        r = (
486            select([users.c.user_name.distinct()])
487            .order_by(users.c.user_name)
488            .execute()
489            .first()
490        )
491        eq_(r[users.c.user_name], "john")
492        eq_(r.user_name, "john")
493
494    def test_column_accessor_err(self):
495        r = testing.db.execute(select([1])).first()
496        assert_raises_message(
497            AttributeError,
498            "Could not locate column in row for column 'foo'",
499            getattr,
500            r,
501            "foo",
502        )
503        assert_raises_message(
504            KeyError,
505            "Could not locate column in row for column 'foo'",
506            lambda: r["foo"],
507        )
508
509    def test_graceful_fetch_on_non_rows(self):
510        """test that calling fetchone() etc. on a result that doesn't
511        return rows fails gracefully.
512
513        """
514
515        # these proxies don't work with no cursor.description present.
516        # so they don't apply to this test at the moment.
517        # result.FullyBufferedResultProxy,
518        # result.BufferedRowResultProxy,
519        # result.BufferedColumnResultProxy
520
521        users = self.tables.users
522
523        conn = testing.db.connect()
524        for meth in [
525            lambda r: r.fetchone(),
526            lambda r: r.fetchall(),
527            lambda r: r.first(),
528            lambda r: r.scalar(),
529            lambda r: r.fetchmany(),
530            lambda r: r._getter("user"),
531            lambda r: r._has_key("user"),
532        ]:
533            trans = conn.begin()
534            result = conn.execute(users.insert(), user_id=1)
535            assert_raises_message(
536                exc.ResourceClosedError,
537                "This result object does not return rows. "
538                "It has been closed automatically.",
539                meth,
540                result,
541            )
542            trans.rollback()
543
544    def test_fetchone_til_end(self):
545        result = testing.db.execute("select * from users")
546        eq_(result.fetchone(), None)
547        eq_(result.fetchone(), None)
548        eq_(result.fetchone(), None)
549        result.close()
550        assert_raises_message(
551            exc.ResourceClosedError,
552            "This result object is closed.",
553            result.fetchone,
554        )
555
556    def test_connectionless_autoclose_rows_exhausted(self):
557        users = self.tables.users
558        users.insert().execute(dict(user_id=1, user_name="john"))
559
560        result = testing.db.execute("select * from users")
561        connection = result.connection
562        assert not connection.closed
563        eq_(result.fetchone(), (1, "john"))
564        assert not connection.closed
565        eq_(result.fetchone(), None)
566        assert connection.closed
567
568    @testing.requires.returning
569    def test_connectionless_autoclose_crud_rows_exhausted(self):
570        users = self.tables.users
571        stmt = (
572            users.insert()
573            .values(user_id=1, user_name="john")
574            .returning(users.c.user_id)
575        )
576        result = testing.db.execute(stmt)
577        connection = result.connection
578        assert not connection.closed
579        eq_(result.fetchone(), (1,))
580        assert not connection.closed
581        eq_(result.fetchone(), None)
582        assert connection.closed
583
584    def test_connectionless_autoclose_no_rows(self):
585        result = testing.db.execute("select * from users")
586        connection = result.connection
587        assert not connection.closed
588        eq_(result.fetchone(), None)
589        assert connection.closed
590
591    @testing.requires.updateable_autoincrement_pks
592    def test_connectionless_autoclose_no_metadata(self):
593        result = testing.db.execute("update users set user_id=5")
594        connection = result.connection
595        assert connection.closed
596        assert_raises_message(
597            exc.ResourceClosedError,
598            "This result object does not return rows.",
599            result.fetchone,
600        )
601
602    def test_row_case_sensitive(self):
603        row = testing.db.execute(
604            select(
605                [
606                    literal_column("1").label("case_insensitive"),
607                    literal_column("2").label("CaseSensitive"),
608                ]
609            )
610        ).first()
611
612        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
613
614        in_("case_insensitive", row._keymap)
615        in_("CaseSensitive", row._keymap)
616        not_in_("casesensitive", row._keymap)
617
618        eq_(row["case_insensitive"], 1)
619        eq_(row["CaseSensitive"], 2)
620
621        assert_raises(KeyError, lambda: row["Case_insensitive"])
622        assert_raises(KeyError, lambda: row["casesensitive"])
623
624    def test_row_case_sensitive_unoptimized(self):
625        ins_db = engines.testing_engine(options={"case_sensitive": True})
626        row = ins_db.execute(
627            select(
628                [
629                    literal_column("1").label("case_insensitive"),
630                    literal_column("2").label("CaseSensitive"),
631                    text("3 AS screw_up_the_cols"),
632                ]
633            )
634        ).first()
635
636        eq_(
637            list(row.keys()),
638            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
639        )
640
641        in_("case_insensitive", row._keymap)
642        in_("CaseSensitive", row._keymap)
643        not_in_("casesensitive", row._keymap)
644
645        eq_(row["case_insensitive"], 1)
646        eq_(row["CaseSensitive"], 2)
647        eq_(row["screw_up_the_cols"], 3)
648
649        assert_raises(KeyError, lambda: row["Case_insensitive"])
650        assert_raises(KeyError, lambda: row["casesensitive"])
651        assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
652
653    def test_row_case_insensitive(self):
654        ins_db = engines.testing_engine(options={"case_sensitive": False})
655        row = ins_db.execute(
656            select(
657                [
658                    literal_column("1").label("case_insensitive"),
659                    literal_column("2").label("CaseSensitive"),
660                ]
661            )
662        ).first()
663
664        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
665
666        in_("case_insensitive", row._keymap)
667        in_("CaseSensitive", row._keymap)
668        in_("casesensitive", row._keymap)
669
670        eq_(row["case_insensitive"], 1)
671        eq_(row["CaseSensitive"], 2)
672        eq_(row["Case_insensitive"], 1)
673        eq_(row["casesensitive"], 2)
674
675    def test_row_case_insensitive_unoptimized(self):
676        ins_db = engines.testing_engine(options={"case_sensitive": False})
677        row = ins_db.execute(
678            select(
679                [
680                    literal_column("1").label("case_insensitive"),
681                    literal_column("2").label("CaseSensitive"),
682                    text("3 AS screw_up_the_cols"),
683                ]
684            )
685        ).first()
686
687        eq_(
688            list(row.keys()),
689            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
690        )
691
692        in_("case_insensitive", row._keymap)
693        in_("CaseSensitive", row._keymap)
694        in_("casesensitive", row._keymap)
695
696        eq_(row["case_insensitive"], 1)
697        eq_(row["CaseSensitive"], 2)
698        eq_(row["screw_up_the_cols"], 3)
699        eq_(row["Case_insensitive"], 1)
700        eq_(row["casesensitive"], 2)
701        eq_(row["screw_UP_the_cols"], 3)
702
703    def test_row_as_args(self):
704        users = self.tables.users
705
706        users.insert().execute(user_id=1, user_name="john")
707        r = users.select(users.c.user_id == 1).execute().first()
708        users.delete().execute()
709        users.insert().execute(r)
710        eq_(users.select().execute().fetchall(), [(1, "john")])
711
712    def test_result_as_args(self):
713        users = self.tables.users
714        users2 = self.tables.users2
715
716        users.insert().execute(
717            [
718                dict(user_id=1, user_name="john"),
719                dict(user_id=2, user_name="ed"),
720            ]
721        )
722        r = users.select().execute()
723        users2.insert().execute(list(r))
724        eq_(
725            users2.select().order_by(users2.c.user_id).execute().fetchall(),
726            [(1, "john"), (2, "ed")],
727        )
728
729        users2.delete().execute()
730        r = users.select().execute()
731        users2.insert().execute(*list(r))
732        eq_(
733            users2.select().order_by(users2.c.user_id).execute().fetchall(),
734            [(1, "john"), (2, "ed")],
735        )
736
737    @testing.requires.duplicate_names_in_cursor_description
738    def test_ambiguous_column(self):
739        users = self.tables.users
740        addresses = self.tables.addresses
741
742        users.insert().execute(user_id=1, user_name="john")
743        result = users.outerjoin(addresses).select().execute()
744        r = result.first()
745
746        assert_raises_message(
747            exc.InvalidRequestError,
748            "Ambiguous column name",
749            lambda: r["user_id"],
750        )
751
752        # pure positional targeting; users.c.user_id
753        # and addresses.c.user_id are known!
754        # works as of 1.1 issue #3501
755        eq_(r[users.c.user_id], 1)
756        eq_(r[addresses.c.user_id], None)
757
758        # try to trick it - fake_table isn't in the result!
759        # we get the correct error
760        fake_table = Table("fake", MetaData(), Column("user_id", Integer))
761        assert_raises_message(
762            exc.InvalidRequestError,
763            "Could not locate column in row for column 'fake.user_id'",
764            lambda: r[fake_table.c.user_id],
765        )
766
767        r = util.pickle.loads(util.pickle.dumps(r))
768        assert_raises_message(
769            exc.InvalidRequestError,
770            "Ambiguous column name",
771            lambda: r["user_id"],
772        )
773
774        result = users.outerjoin(addresses).select().execute()
775        result = _result.BufferedColumnResultProxy(result.context)
776        r = result.first()
777        assert isinstance(r, _result.BufferedColumnRow)
778        assert_raises_message(
779            exc.InvalidRequestError,
780            "Ambiguous column name",
781            lambda: r["user_id"],
782        )
783
784    @testing.requires.duplicate_names_in_cursor_description
785    def test_ambiguous_column_by_col(self):
786        users = self.tables.users
787
788        users.insert().execute(user_id=1, user_name="john")
789        ua = users.alias()
790        u2 = users.alias()
791        result = select([users.c.user_id, ua.c.user_id]).execute()
792        row = result.first()
793
794        # as of 1.1 issue #3501, we use pure positional
795        # targeting for the column objects here
796        eq_(row[users.c.user_id], 1)
797
798        eq_(row[ua.c.user_id], 1)
799
800        # this now works as of 1.1 issue #3501;
801        # previously this was stuck on "ambiguous column name"
802        assert_raises_message(
803            exc.InvalidRequestError,
804            "Could not locate column in row",
805            lambda: row[u2.c.user_id],
806        )
807
808    @testing.requires.duplicate_names_in_cursor_description
809    def test_ambiguous_column_case_sensitive(self):
810        eng = engines.testing_engine(options=dict(case_sensitive=False))
811
812        row = eng.execute(
813            select(
814                [
815                    literal_column("1").label("SOMECOL"),
816                    literal_column("1").label("SOMECOL"),
817                ]
818            )
819        ).first()
820
821        assert_raises_message(
822            exc.InvalidRequestError,
823            "Ambiguous column name",
824            lambda: row["somecol"],
825        )
826
827    @testing.requires.duplicate_names_in_cursor_description
828    def test_ambiguous_column_contains(self):
829        users = self.tables.users
830        addresses = self.tables.addresses
831
832        # ticket 2702.  in 0.7 we'd get True, False.
833        # in 0.8, both columns are present so it's True;
834        # but when they're fetched you'll get the ambiguous error.
835        users.insert().execute(user_id=1, user_name="john")
836        result = (
837            select([users.c.user_id, addresses.c.user_id])
838            .select_from(users.outerjoin(addresses))
839            .execute()
840        )
841        row = result.first()
842
843        eq_(
844            set([users.c.user_id in row, addresses.c.user_id in row]),
845            set([True]),
846        )
847
848    def test_ambiguous_column_by_col_plus_label(self):
849        users = self.tables.users
850
851        users.insert().execute(user_id=1, user_name="john")
852        result = select(
853            [
854                users.c.user_id,
855                type_coerce(users.c.user_id, Integer).label("foo"),
856            ]
857        ).execute()
858        row = result.first()
859        eq_(row[users.c.user_id], 1)
860        eq_(row[1], 1)
861
862    def test_fetch_partial_result_map(self):
863        users = self.tables.users
864
865        users.insert().execute(user_id=7, user_name="ed")
866
867        t = text("select * from users").columns(user_name=String())
868        eq_(testing.db.execute(t).fetchall(), [(7, "ed")])
869
870    def test_fetch_unordered_result_map(self):
871        users = self.tables.users
872
873        users.insert().execute(user_id=7, user_name="ed")
874
875        class Goofy1(TypeDecorator):
876            impl = String
877
878            def process_result_value(self, value, dialect):
879                return value + "a"
880
881        class Goofy2(TypeDecorator):
882            impl = String
883
884            def process_result_value(self, value, dialect):
885                return value + "b"
886
887        class Goofy3(TypeDecorator):
888            impl = String
889
890            def process_result_value(self, value, dialect):
891                return value + "c"
892
893        t = text(
894            "select user_name as a, user_name as b, "
895            "user_name as c from users"
896        ).columns(a=Goofy1(), b=Goofy2(), c=Goofy3())
897        eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")])
898
899    @testing.requires.subqueries
900    def test_column_label_targeting(self):
901        users = self.tables.users
902
903        users.insert().execute(user_id=7, user_name="ed")
904
905        for s in (
906            users.select().alias("foo"),
907            users.select().alias(users.name),
908        ):
909            row = s.select(use_labels=True).execute().first()
910            eq_(row[s.c.user_id], 7)
911            eq_(row[s.c.user_name], "ed")
912
913    def test_keys(self):
914        users = self.tables.users
915
916        users.insert().execute(user_id=1, user_name="foo")
917        result = users.select().execute()
918        eq_(result.keys(), ["user_id", "user_name"])
919        row = result.first()
920        eq_(row.keys(), ["user_id", "user_name"])
921
922    def test_keys_anon_labels(self):
923        """test [ticket:3483]"""
924
925        users = self.tables.users
926
927        users.insert().execute(user_id=1, user_name="foo")
928        result = testing.db.execute(
929            select(
930                [
931                    users.c.user_id,
932                    users.c.user_name.label(None),
933                    func.count(literal_column("1")),
934                ]
935            ).group_by(users.c.user_id, users.c.user_name)
936        )
937
938        eq_(result.keys(), ["user_id", "user_name_1", "count_1"])
939        row = result.first()
940        eq_(row.keys(), ["user_id", "user_name_1", "count_1"])
941
942    def test_items(self):
943        users = self.tables.users
944
945        users.insert().execute(user_id=1, user_name="foo")
946        r = users.select().execute().first()
947        eq_(
948            [(x[0].lower(), x[1]) for x in list(r.items())],
949            [("user_id", 1), ("user_name", "foo")],
950        )
951
952    def test_len(self):
953        users = self.tables.users
954
955        users.insert().execute(user_id=1, user_name="foo")
956        r = users.select().execute().first()
957        eq_(len(r), 2)
958
959        r = testing.db.execute("select user_name, user_id from users").first()
960        eq_(len(r), 2)
961        r = testing.db.execute("select user_name from users").first()
962        eq_(len(r), 1)
963
964    def test_sorting_in_python(self):
965        users = self.tables.users
966
967        users.insert().execute(
968            dict(user_id=1, user_name="foo"),
969            dict(user_id=2, user_name="bar"),
970            dict(user_id=3, user_name="def"),
971        )
972
973        rows = users.select().order_by(users.c.user_name).execute().fetchall()
974
975        eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")])
976
977        eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")])
978
979    def test_column_order_with_simple_query(self):
980        # should return values in column definition order
981        users = self.tables.users
982
983        users.insert().execute(user_id=1, user_name="foo")
984        r = users.select(users.c.user_id == 1).execute().first()
985        eq_(r[0], 1)
986        eq_(r[1], "foo")
987        eq_([x.lower() for x in list(r.keys())], ["user_id", "user_name"])
988        eq_(list(r.values()), [1, "foo"])
989
990    def test_column_order_with_text_query(self):
991        # should return values in query order
992        users = self.tables.users
993
994        users.insert().execute(user_id=1, user_name="foo")
995        r = testing.db.execute("select user_name, user_id from users").first()
996        eq_(r[0], "foo")
997        eq_(r[1], 1)
998        eq_([x.lower() for x in list(r.keys())], ["user_name", "user_id"])
999        eq_(list(r.values()), ["foo", 1])
1000
1001    @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()")
1002    @testing.crashes("firebird", "An identifier must begin with a letter")
1003    @testing.provide_metadata
1004    def test_column_accessor_shadow(self):
1005        shadowed = Table(
1006            "test_shadowed",
1007            self.metadata,
1008            Column("shadow_id", INT, primary_key=True),
1009            Column("shadow_name", VARCHAR(20)),
1010            Column("parent", VARCHAR(20)),
1011            Column("row", VARCHAR(40)),
1012            Column("_parent", VARCHAR(20)),
1013            Column("_row", VARCHAR(20)),
1014        )
1015        self.metadata.create_all()
1016        shadowed.insert().execute(
1017            shadow_id=1,
1018            shadow_name="The Shadow",
1019            parent="The Light",
1020            row="Without light there is no shadow",
1021            _parent="Hidden parent",
1022            _row="Hidden row",
1023        )
1024        r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
1025
1026        eq_(r.shadow_id, 1)
1027        eq_(r["shadow_id"], 1)
1028        eq_(r[shadowed.c.shadow_id], 1)
1029
1030        eq_(r.shadow_name, "The Shadow")
1031        eq_(r["shadow_name"], "The Shadow")
1032        eq_(r[shadowed.c.shadow_name], "The Shadow")
1033
1034        eq_(r.parent, "The Light")
1035        eq_(r["parent"], "The Light")
1036        eq_(r[shadowed.c.parent], "The Light")
1037
1038        eq_(r.row, "Without light there is no shadow")
1039        eq_(r["row"], "Without light there is no shadow")
1040        eq_(r[shadowed.c.row], "Without light there is no shadow")
1041
1042        eq_(r["_parent"], "Hidden parent")
1043        eq_(r["_row"], "Hidden row")
1044
1045    def test_nontuple_row(self):
1046        """ensure the C version of BaseRowProxy handles
1047        duck-type-dependent rows."""
1048
1049        from sqlalchemy.engine import RowProxy
1050
1051        class MyList(object):
1052            def __init__(self, data):
1053                self.internal_list = data
1054
1055            def __len__(self):
1056                return len(self.internal_list)
1057
1058            def __getitem__(self, i):
1059                return list.__getitem__(self.internal_list, i)
1060
1061        proxy = RowProxy(
1062            object(),
1063            MyList(["value"]),
1064            [None],
1065            {"key": (None, None, 0), 0: (None, None, 0)},
1066        )
1067        eq_(list(proxy), ["value"])
1068        eq_(proxy[0], "value")
1069        eq_(proxy["key"], "value")
1070
1071    @testing.provide_metadata
1072    def test_no_rowcount_on_selects_inserts(self):
1073        """assert that rowcount is only called on deletes and updates.
1074
1075        This because cursor.rowcount may can be expensive on some dialects
1076        such as Firebird, however many dialects require it be called
1077        before the cursor is closed.
1078
1079        """
1080
1081        metadata = self.metadata
1082
1083        engine = engines.testing_engine()
1084
1085        t = Table("t1", metadata, Column("data", String(10)))
1086        metadata.create_all(engine)
1087
1088        with patch.object(
1089            engine.dialect.execution_ctx_cls, "rowcount"
1090        ) as mock_rowcount:
1091            mock_rowcount.__get__ = Mock()
1092            engine.execute(
1093                t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
1094            )
1095
1096            eq_(len(mock_rowcount.__get__.mock_calls), 0)
1097
1098            eq_(
1099                engine.execute(t.select()).fetchall(),
1100                [("d1",), ("d2",), ("d3",)],
1101            )
1102            eq_(len(mock_rowcount.__get__.mock_calls), 0)
1103
1104            engine.execute(t.update(), {"data": "d4"})
1105
1106            eq_(len(mock_rowcount.__get__.mock_calls), 1)
1107
1108            engine.execute(t.delete())
1109            eq_(len(mock_rowcount.__get__.mock_calls), 2)
1110
1111    def test_rowproxy_is_sequence(self):
1112        from sqlalchemy.util import collections_abc
1113        from sqlalchemy.engine import RowProxy
1114
1115        row = RowProxy(
1116            object(),
1117            ["value"],
1118            [None],
1119            {"key": (None, None, 0), 0: (None, None, 0)},
1120        )
1121        assert isinstance(row, collections_abc.Sequence)
1122
1123    @testing.provide_metadata
1124    def test_rowproxy_getitem_indexes_compiled(self):
1125        values = Table(
1126            "rp",
1127            self.metadata,
1128            Column("key", String(10), primary_key=True),
1129            Column("value", String(10)),
1130        )
1131        values.create()
1132
1133        testing.db.execute(values.insert(), dict(key="One", value="Uno"))
1134        row = testing.db.execute(values.select()).first()
1135        eq_(row["key"], "One")
1136        eq_(row["value"], "Uno")
1137        eq_(row[0], "One")
1138        eq_(row[1], "Uno")
1139        eq_(row[-2], "One")
1140        eq_(row[-1], "Uno")
1141        eq_(row[1:0:-1], ("Uno",))
1142
1143    @testing.only_on("sqlite")
1144    def test_rowproxy_getitem_indexes_raw(self):
1145        row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
1146        eq_(row["key"], "One")
1147        eq_(row["value"], "Uno")
1148        eq_(row[0], "One")
1149        eq_(row[1], "Uno")
1150        eq_(row[-2], "One")
1151        eq_(row[-1], "Uno")
1152        eq_(row[1:0:-1], ("Uno",))
1153
1154    @testing.requires.cextensions
1155    def test_row_c_sequence_check(self):
1156        import csv
1157
1158        metadata = MetaData()
1159        metadata.bind = "sqlite://"
1160        users = Table(
1161            "users",
1162            metadata,
1163            Column("id", Integer, primary_key=True),
1164            Column("name", String(40)),
1165        )
1166        users.create()
1167
1168        users.insert().execute(name="Test")
1169        row = users.select().execute().fetchone()
1170
1171        s = util.StringIO()
1172        writer = csv.writer(s)
1173        # csv performs PySequenceCheck call
1174        writer.writerow(row)
1175        assert s.getvalue().strip() == "1,Test"
1176
1177    @testing.requires.selectone
1178    def test_empty_accessors(self):
1179        statements = [
1180            (
1181                "select 1",
1182                [
1183                    lambda r: r.last_inserted_params(),
1184                    lambda r: r.last_updated_params(),
1185                    lambda r: r.prefetch_cols(),
1186                    lambda r: r.postfetch_cols(),
1187                    lambda r: r.inserted_primary_key,
1188                ],
1189                "Statement is not a compiled expression construct.",
1190            ),
1191            (
1192                select([1]),
1193                [
1194                    lambda r: r.last_inserted_params(),
1195                    lambda r: r.inserted_primary_key,
1196                ],
1197                r"Statement is not an insert\(\) expression construct.",
1198            ),
1199            (
1200                select([1]),
1201                [lambda r: r.last_updated_params()],
1202                r"Statement is not an update\(\) expression construct.",
1203            ),
1204            (
1205                select([1]),
1206                [lambda r: r.prefetch_cols(), lambda r: r.postfetch_cols()],
1207                r"Statement is not an insert\(\) "
1208                r"or update\(\) expression construct.",
1209            ),
1210        ]
1211
1212        for stmt, meths, msg in statements:
1213            r = testing.db.execute(stmt)
1214            try:
1215                for meth in meths:
1216                    assert_raises_message(
1217                        sa_exc.InvalidRequestError, msg, meth, r
1218                    )
1219
1220            finally:
1221                r.close()
1222
1223
1224class KeyTargetingTest(fixtures.TablesTest):
1225    run_inserts = "once"
1226    run_deletes = None
1227    __backend__ = True
1228
1229    @classmethod
1230    def define_tables(cls, metadata):
1231        Table(
1232            "keyed1",
1233            metadata,
1234            Column("a", CHAR(2), key="b"),
1235            Column("c", CHAR(2), key="q"),
1236        )
1237        Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
1238        Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
1239        Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
1240        Table("content", metadata, Column("t", String(30), key="type"))
1241        Table("bar", metadata, Column("ctype", String(30), key="content_type"))
1242
1243        if testing.requires.schemas.enabled:
1244            Table(
1245                "wschema",
1246                metadata,
1247                Column("a", CHAR(2), key="b"),
1248                Column("c", CHAR(2), key="q"),
1249                schema=testing.config.test_schema,
1250            )
1251
1252    @classmethod
1253    def insert_data(cls):
1254        cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
1255        cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
1256        cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
1257        cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
1258        cls.tables.content.insert().execute(type="t1")
1259
1260        if testing.requires.schemas.enabled:
1261            cls.tables[
1262                "%s.wschema" % testing.config.test_schema
1263            ].insert().execute(dict(b="a1", q="c1"))
1264
1265    @testing.requires.schemas
1266    def test_keyed_accessor_wschema(self):
1267        keyed1 = self.tables["%s.wschema" % testing.config.test_schema]
1268        row = testing.db.execute(keyed1.select()).first()
1269
1270        eq_(row.b, "a1")
1271        eq_(row.q, "c1")
1272        eq_(row.a, "a1")
1273        eq_(row.c, "c1")
1274
1275    def test_keyed_accessor_single(self):
1276        keyed1 = self.tables.keyed1
1277        row = testing.db.execute(keyed1.select()).first()
1278
1279        eq_(row.b, "a1")
1280        eq_(row.q, "c1")
1281        eq_(row.a, "a1")
1282        eq_(row.c, "c1")
1283
1284    def test_keyed_accessor_single_labeled(self):
1285        keyed1 = self.tables.keyed1
1286        row = testing.db.execute(keyed1.select().apply_labels()).first()
1287
1288        eq_(row.keyed1_b, "a1")
1289        eq_(row.keyed1_q, "c1")
1290        eq_(row.keyed1_a, "a1")
1291        eq_(row.keyed1_c, "c1")
1292
1293    @testing.requires.duplicate_names_in_cursor_description
1294    def test_keyed_accessor_composite_conflict_2(self):
1295        keyed1 = self.tables.keyed1
1296        keyed2 = self.tables.keyed2
1297
1298        row = testing.db.execute(select([keyed1, keyed2])).first()
1299        # row.b is unambiguous
1300        eq_(row.b, "b2")
1301        # row.a is ambiguous
1302        assert_raises_message(
1303            exc.InvalidRequestError, "Ambig", getattr, row, "a"
1304        )
1305
1306    def test_keyed_accessor_composite_names_precedent(self):
1307        keyed1 = self.tables.keyed1
1308        keyed4 = self.tables.keyed4
1309
1310        row = testing.db.execute(select([keyed1, keyed4])).first()
1311        eq_(row.b, "b4")
1312        eq_(row.q, "q4")
1313        eq_(row.a, "a1")
1314        eq_(row.c, "c1")
1315
1316    @testing.requires.duplicate_names_in_cursor_description
1317    def test_keyed_accessor_composite_keys_precedent(self):
1318        keyed1 = self.tables.keyed1
1319        keyed3 = self.tables.keyed3
1320
1321        row = testing.db.execute(select([keyed1, keyed3])).first()
1322        eq_(row.q, "c1")
1323        assert_raises_message(
1324            exc.InvalidRequestError,
1325            "Ambiguous column name 'a'",
1326            getattr,
1327            row,
1328            "b",
1329        )
1330        assert_raises_message(
1331            exc.InvalidRequestError,
1332            "Ambiguous column name 'a'",
1333            getattr,
1334            row,
1335            "a",
1336        )
1337        eq_(row.d, "d3")
1338
1339    def test_keyed_accessor_composite_labeled(self):
1340        keyed1 = self.tables.keyed1
1341        keyed2 = self.tables.keyed2
1342
1343        row = testing.db.execute(
1344            select([keyed1, keyed2]).apply_labels()
1345        ).first()
1346        eq_(row.keyed1_b, "a1")
1347        eq_(row.keyed1_a, "a1")
1348        eq_(row.keyed1_q, "c1")
1349        eq_(row.keyed1_c, "c1")
1350        eq_(row.keyed2_a, "a2")
1351        eq_(row.keyed2_b, "b2")
1352        assert_raises(KeyError, lambda: row["keyed2_c"])
1353        assert_raises(KeyError, lambda: row["keyed2_q"])
1354
1355    def test_column_label_overlap_fallback(self):
1356        content, bar = self.tables.content, self.tables.bar
1357        row = testing.db.execute(
1358            select([content.c.type.label("content_type")])
1359        ).first()
1360
1361        not_in_(content.c.type, row)
1362        not_in_(bar.c.content_type, row)
1363
1364        in_(sql.column("content_type"), row)
1365
1366        row = testing.db.execute(
1367            select([func.now().label("content_type")])
1368        ).first()
1369        not_in_(content.c.type, row)
1370        not_in_(bar.c.content_type, row)
1371        in_(sql.column("content_type"), row)
1372
1373    def test_column_label_overlap_fallback_2(self):
1374        content, bar = self.tables.content, self.tables.bar
1375        row = testing.db.execute(content.select(use_labels=True)).first()
1376        in_(content.c.type, row)
1377        not_in_(bar.c.content_type, row)
1378        not_in_(sql.column("content_type"), row)
1379
1380    def test_columnclause_schema_column_one(self):
1381        keyed2 = self.tables.keyed2
1382
1383        # this is addressed by [ticket:2932]
1384        # ColumnClause._compare_name_for_result allows the
1385        # columns which the statement is against to be lightweight
1386        # cols, which results in a more liberal comparison scheme
1387        a, b = sql.column("a"), sql.column("b")
1388        stmt = select([a, b]).select_from(table("keyed2"))
1389        row = testing.db.execute(stmt).first()
1390
1391        in_(keyed2.c.a, row)
1392        in_(keyed2.c.b, row)
1393        in_(a, row)
1394        in_(b, row)
1395
1396    def test_columnclause_schema_column_two(self):
1397        keyed2 = self.tables.keyed2
1398
1399        a, b = sql.column("a"), sql.column("b")
1400        stmt = select([keyed2.c.a, keyed2.c.b])
1401        row = testing.db.execute(stmt).first()
1402
1403        in_(keyed2.c.a, row)
1404        in_(keyed2.c.b, row)
1405        in_(a, row)
1406        in_(b, row)
1407
1408    def test_columnclause_schema_column_three(self):
1409        keyed2 = self.tables.keyed2
1410
1411        # this is also addressed by [ticket:2932]
1412
1413        a, b = sql.column("a"), sql.column("b")
1414        stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
1415        row = testing.db.execute(stmt).first()
1416
1417        in_(keyed2.c.a, row)
1418        in_(keyed2.c.b, row)
1419        in_(a, row)
1420        in_(b, row)
1421        in_(stmt.c.a, row)
1422        in_(stmt.c.b, row)
1423
1424    def test_columnclause_schema_column_four(self):
1425        keyed2 = self.tables.keyed2
1426
1427        # this is also addressed by [ticket:2932]
1428
1429        a, b = sql.column("keyed2_a"), sql.column("keyed2_b")
1430        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
1431            a, b
1432        )
1433        row = testing.db.execute(stmt).first()
1434
1435        in_(keyed2.c.a, row)
1436        in_(keyed2.c.b, row)
1437        in_(a, row)
1438        in_(b, row)
1439        in_(stmt.c.keyed2_a, row)
1440        in_(stmt.c.keyed2_b, row)
1441
1442    def test_columnclause_schema_column_five(self):
1443        keyed2 = self.tables.keyed2
1444
1445        # this is also addressed by [ticket:2932]
1446
1447        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
1448            keyed2_a=CHAR, keyed2_b=CHAR
1449        )
1450        row = testing.db.execute(stmt).first()
1451
1452        in_(keyed2.c.a, row)
1453        in_(keyed2.c.b, row)
1454        in_(stmt.c.keyed2_a, row)
1455        in_(stmt.c.keyed2_b, row)
1456
1457
1458class PositionalTextTest(fixtures.TablesTest):
1459    run_inserts = "once"
1460    run_deletes = None
1461    __backend__ = True
1462
1463    @classmethod
1464    def define_tables(cls, metadata):
1465        Table(
1466            "text1",
1467            metadata,
1468            Column("a", CHAR(2)),
1469            Column("b", CHAR(2)),
1470            Column("c", CHAR(2)),
1471            Column("d", CHAR(2)),
1472        )
1473
1474    @classmethod
1475    def insert_data(cls):
1476        cls.tables.text1.insert().execute(
1477            [dict(a="a1", b="b1", c="c1", d="d1")]
1478        )
1479
1480    def test_via_column(self):
1481        c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
1482        stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
1483
1484        result = testing.db.execute(stmt)
1485        row = result.first()
1486
1487        eq_(row[c2], "b1")
1488        eq_(row[c4], "d1")
1489        eq_(row[1], "b1")
1490        eq_(row["b"], "b1")
1491        eq_(row.keys(), ["a", "b", "c", "d"])
1492        eq_(row["r"], "c1")
1493        eq_(row["d"], "d1")
1494
1495    def test_fewer_cols_than_sql_positional(self):
1496        c1, c2 = column("q"), column("p")
1497        stmt = text("select a, b, c, d from text1").columns(c1, c2)
1498
1499        # no warning as this can be similar for non-positional
1500        result = testing.db.execute(stmt)
1501        row = result.first()
1502
1503        eq_(row[c1], "a1")
1504        eq_(row["c"], "c1")
1505
1506    def test_fewer_cols_than_sql_non_positional(self):
1507        c1, c2 = column("a"), column("p")
1508        stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR)
1509
1510        # no warning as this can be similar for non-positional
1511        result = testing.db.execute(stmt)
1512        row = result.first()
1513
1514        # c1 name matches, locates
1515        eq_(row[c1], "a1")
1516        eq_(row["c"], "c1")
1517
1518        # c2 name does not match, doesn't locate
1519        assert_raises_message(
1520            exc.NoSuchColumnError, "in row for column 'p'", lambda: row[c2]
1521        )
1522
1523    def test_more_cols_than_sql(self):
1524        c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
1525        stmt = text("select a, b from text1").columns(c1, c2, c3, c4)
1526
1527        with assertions.expect_warnings(
1528            r"Number of columns in textual SQL \(4\) is "
1529            r"smaller than number of columns requested \(2\)"
1530        ):
1531            result = testing.db.execute(stmt)
1532
1533        row = result.first()
1534        eq_(row[c2], "b1")
1535
1536        assert_raises_message(
1537            exc.NoSuchColumnError, "in row for column 'r'", lambda: row[c3]
1538        )
1539
1540    def test_dupe_col_obj(self):
1541        c1, c2, c3 = column("q"), column("p"), column("r")
1542        stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2)
1543
1544        assert_raises_message(
1545            exc.InvalidRequestError,
1546            "Duplicate column expression requested in "
1547            "textual SQL: <.*.ColumnClause.*; p>",
1548            testing.db.execute,
1549            stmt,
1550        )
1551
1552    def test_anon_aliased_unique(self):
1553        text1 = self.tables.text1
1554
1555        c1 = text1.c.a.label(None)
1556        c2 = text1.alias().c.c
1557        c3 = text1.alias().c.b
1558        c4 = text1.alias().c.d.label(None)
1559
1560        stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
1561        result = testing.db.execute(stmt)
1562        row = result.first()
1563
1564        eq_(row[c1], "a1")
1565        eq_(row[c2], "b1")
1566        eq_(row[c3], "c1")
1567        eq_(row[c4], "d1")
1568
1569        # key fallback rules still match this to a column
1570        # unambiguously based on its name
1571        eq_(row[text1.c.a], "a1")
1572
1573        # key fallback rules still match this to a column
1574        # unambiguously based on its name
1575        eq_(row[text1.c.d], "d1")
1576
1577        # text1.c.b goes nowhere....because we hit key fallback
1578        # but the text1.c.b doesn't derive from text1.c.c
1579        assert_raises_message(
1580            exc.NoSuchColumnError,
1581            "Could not locate column in row for column 'text1.b'",
1582            lambda: row[text1.c.b],
1583        )
1584
1585    def test_anon_aliased_overlapping(self):
1586        text1 = self.tables.text1
1587
1588        c1 = text1.c.a.label(None)
1589        c2 = text1.alias().c.a
1590        c3 = text1.alias().c.a.label(None)
1591        c4 = text1.c.a.label(None)
1592
1593        stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
1594        result = testing.db.execute(stmt)
1595        row = result.first()
1596
1597        eq_(row[c1], "a1")
1598        eq_(row[c2], "b1")
1599        eq_(row[c3], "c1")
1600        eq_(row[c4], "d1")
1601
1602        # key fallback rules still match this to a column
1603        # unambiguously based on its name
1604        eq_(row[text1.c.a], "a1")
1605
1606    def test_anon_aliased_name_conflict(self):
1607        text1 = self.tables.text1
1608
1609        c1 = text1.c.a.label("a")
1610        c2 = text1.alias().c.a
1611        c3 = text1.alias().c.a.label("a")
1612        c4 = text1.c.a.label("a")
1613
1614        # all cols are named "a".  if we are positional, we don't care.
1615        # this is new logic in 1.1
1616        stmt = text("select a, b as a, c as a, d as a from text1").columns(
1617            c1, c2, c3, c4
1618        )
1619        result = testing.db.execute(stmt)
1620        row = result.first()
1621
1622        eq_(row[c1], "a1")
1623        eq_(row[c2], "b1")
1624        eq_(row[c3], "c1")
1625        eq_(row[c4], "d1")
1626
1627        # fails, because we hit key fallback and find conflicts
1628        # in columns that are presnet
1629        assert_raises_message(
1630            exc.NoSuchColumnError,
1631            "Could not locate column in row for column 'text1.a'",
1632            lambda: row[text1.c.a],
1633        )
1634
1635
1636class AlternateResultProxyTest(fixtures.TablesTest):
1637    __requires__ = ("sqlite",)
1638
1639    @classmethod
1640    def setup_bind(cls):
1641        cls.engine = engine = engines.testing_engine("sqlite://")
1642        return engine
1643
1644    @classmethod
1645    def define_tables(cls, metadata):
1646        Table(
1647            "test",
1648            metadata,
1649            Column("x", Integer, primary_key=True),
1650            Column("y", String(50, convert_unicode="force")),
1651        )
1652
1653    @classmethod
1654    def insert_data(cls):
1655        cls.engine.execute(
1656            cls.tables.test.insert(),
1657            [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
1658        )
1659
1660    @contextmanager
1661    def _proxy_fixture(self, cls):
1662        self.table = self.tables.test
1663
1664        class ExcCtx(default.DefaultExecutionContext):
1665            def get_result_proxy(self):
1666                return cls(self)
1667
1668        self.patcher = patch.object(
1669            self.engine.dialect, "execution_ctx_cls", ExcCtx
1670        )
1671        with self.patcher:
1672            yield
1673
1674    def _test_proxy(self, cls):
1675        with self._proxy_fixture(cls):
1676            rows = []
1677            r = self.engine.execute(select([self.table]))
1678            assert isinstance(r, cls)
1679            for i in range(5):
1680                rows.append(r.fetchone())
1681            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
1682
1683            rows = r.fetchmany(3)
1684            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
1685
1686            rows = r.fetchall()
1687            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
1688
1689            r = self.engine.execute(select([self.table]))
1690            rows = r.fetchmany(None)
1691            eq_(rows[0], (1, "t_1"))
1692            # number of rows here could be one, or the whole thing
1693            assert len(rows) == 1 or len(rows) == 11
1694
1695            r = self.engine.execute(select([self.table]).limit(1))
1696            r.fetchone()
1697            eq_(r.fetchone(), None)
1698
1699            r = self.engine.execute(select([self.table]).limit(5))
1700            rows = r.fetchmany(6)
1701            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
1702
1703            # result keeps going just fine with blank results...
1704            eq_(r.fetchmany(2), [])
1705
1706            eq_(r.fetchmany(2), [])
1707
1708            eq_(r.fetchall(), [])
1709
1710            eq_(r.fetchone(), None)
1711
1712            # until we close
1713            r.close()
1714
1715            self._assert_result_closed(r)
1716
1717            r = self.engine.execute(select([self.table]).limit(5))
1718            eq_(r.first(), (1, "t_1"))
1719            self._assert_result_closed(r)
1720
1721            r = self.engine.execute(select([self.table]).limit(5))
1722            eq_(r.scalar(), 1)
1723            self._assert_result_closed(r)
1724
1725    def _assert_result_closed(self, r):
1726        assert_raises_message(
1727            sa_exc.ResourceClosedError, "object is closed", r.fetchone
1728        )
1729
1730        assert_raises_message(
1731            sa_exc.ResourceClosedError, "object is closed", r.fetchmany, 2
1732        )
1733
1734        assert_raises_message(
1735            sa_exc.ResourceClosedError, "object is closed", r.fetchall
1736        )
1737
1738    def test_basic_plain(self):
1739        self._test_proxy(_result.ResultProxy)
1740
1741    def test_basic_buffered_row_result_proxy(self):
1742        self._test_proxy(_result.BufferedRowResultProxy)
1743
1744    def test_basic_fully_buffered_result_proxy(self):
1745        self._test_proxy(_result.FullyBufferedResultProxy)
1746
1747    def test_basic_buffered_column_result_proxy(self):
1748        self._test_proxy(_result.BufferedColumnResultProxy)
1749
1750    def test_resultprocessor_plain(self):
1751        self._test_result_processor(_result.ResultProxy, False)
1752
1753    def test_resultprocessor_plain_cached(self):
1754        self._test_result_processor(_result.ResultProxy, True)
1755
1756    def test_resultprocessor_buffered_column(self):
1757        self._test_result_processor(_result.BufferedColumnResultProxy, False)
1758
1759    def test_resultprocessor_buffered_column_cached(self):
1760        self._test_result_processor(_result.BufferedColumnResultProxy, True)
1761
1762    def test_resultprocessor_buffered_row(self):
1763        self._test_result_processor(_result.BufferedRowResultProxy, False)
1764
1765    def test_resultprocessor_buffered_row_cached(self):
1766        self._test_result_processor(_result.BufferedRowResultProxy, True)
1767
1768    def test_resultprocessor_fully_buffered(self):
1769        self._test_result_processor(_result.FullyBufferedResultProxy, False)
1770
1771    def test_resultprocessor_fully_buffered_cached(self):
1772        self._test_result_processor(_result.FullyBufferedResultProxy, True)
1773
1774    def _test_result_processor(self, cls, use_cache):
1775        class MyType(TypeDecorator):
1776            impl = String()
1777
1778            def process_result_value(self, value, dialect):
1779                return "HI " + value
1780
1781        with self._proxy_fixture(cls):
1782            with self.engine.connect() as conn:
1783                if use_cache:
1784                    cache = {}
1785                    conn = conn.execution_options(compiled_cache=cache)
1786
1787                stmt = select([literal("THERE", type_=MyType())])
1788                for i in range(2):
1789                    r = conn.execute(stmt)
1790                    eq_(r.scalar(), "HI THERE")
1791
1792    def test_buffered_row_growth(self):
1793        with self._proxy_fixture(_result.BufferedRowResultProxy):
1794            with self.engine.connect() as conn:
1795                conn.execute(
1796                    self.table.insert(),
1797                    [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
1798                )
1799                result = conn.execute(self.table.select())
1800                checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000}
1801                for idx, row in enumerate(result, 0):
1802                    if idx in checks:
1803                        eq_(result._bufsize, checks[idx])
1804                    le_(len(result._BufferedRowResultProxy__rowbuffer), 1000)
1805
1806    def test_max_row_buffer_option(self):
1807        with self._proxy_fixture(_result.BufferedRowResultProxy):
1808            with self.engine.connect() as conn:
1809                conn.execute(
1810                    self.table.insert(),
1811                    [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
1812                )
1813                result = conn.execution_options(max_row_buffer=27).execute(
1814                    self.table.select()
1815                )
1816                for idx, row in enumerate(result, 0):
1817                    if idx in (16, 70, 150, 250):
1818                        eq_(result._bufsize, 27)
1819                    le_(len(result._BufferedRowResultProxy__rowbuffer), 27)
1820