1from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
2    in_, not_in_, is_, ne_, le_
3from sqlalchemy import testing
4from sqlalchemy.testing import fixtures, engines
5from sqlalchemy import util
6from sqlalchemy import (
7    exc, sql, func, select, String, Integer, MetaData, ForeignKey,
8    VARCHAR, INT, CHAR, text, type_coerce, literal_column,
9    TypeDecorator, table, column, literal)
10from sqlalchemy.engine import result as _result
11from sqlalchemy.testing.schema import Table, Column
12import operator
13from sqlalchemy.testing import assertions
14from sqlalchemy import exc as sa_exc
15from sqlalchemy.testing.mock import patch, Mock
16from contextlib import contextmanager
17from sqlalchemy.engine import default
18
19
20class ResultProxyTest(fixtures.TablesTest):
21    __backend__ = True
22
23    @classmethod
24    def define_tables(cls, metadata):
25        Table(
26            'users', metadata,
27            Column(
28                'user_id', INT, primary_key=True,
29                test_needs_autoincrement=True),
30            Column('user_name', VARCHAR(20)),
31            test_needs_acid=True
32        )
33        Table(
34            'addresses', metadata,
35            Column(
36                'address_id', Integer, primary_key=True,
37                test_needs_autoincrement=True),
38            Column('user_id', Integer, ForeignKey('users.user_id')),
39            Column('address', String(30)),
40            test_needs_acid=True
41        )
42
43        Table(
44            'users2', metadata,
45            Column('user_id', INT, primary_key=True),
46            Column('user_name', VARCHAR(20)),
47            test_needs_acid=True
48        )
49
50    def test_row_iteration(self):
51        users = self.tables.users
52
53        users.insert().execute(
54            {'user_id': 7, 'user_name': 'jack'},
55            {'user_id': 8, 'user_name': 'ed'},
56            {'user_id': 9, 'user_name': 'fred'},
57        )
58        r = users.select().execute()
59        rows = []
60        for row in r:
61            rows.append(row)
62        eq_(len(rows), 3)
63
64    @testing.requires.subqueries
65    def test_anonymous_rows(self):
66        users = self.tables.users
67
68        users.insert().execute(
69            {'user_id': 7, 'user_name': 'jack'},
70            {'user_id': 8, 'user_name': 'ed'},
71            {'user_id': 9, 'user_name': 'fred'},
72        )
73
74        sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \
75            as_scalar()
76        for row in select([sel + 1, sel + 3], bind=users.bind).execute():
77            eq_(row['anon_1'], 8)
78            eq_(row['anon_2'], 10)
79
80    def test_row_comparison(self):
81        users = self.tables.users
82
83        users.insert().execute(user_id=7, user_name='jack')
84        rp = users.select().execute().first()
85
86        eq_(rp, rp)
87        is_(not(rp != rp), True)
88
89        equal = (7, 'jack')
90
91        eq_(rp, equal)
92        eq_(equal, rp)
93        is_((not (rp != equal)), True)
94        is_(not (equal != equal), True)
95
96        def endless():
97            while True:
98                yield 1
99        ne_(rp, endless())
100        ne_(endless(), rp)
101
102        # test that everything compares the same
103        # as it would against a tuple
104        for compare in [False, 8, endless(), 'xyz', (7, 'jack')]:
105            for op in [
106                operator.eq, operator.ne, operator.gt,
107                operator.lt, operator.ge, operator.le
108            ]:
109
110                try:
111                    control = op(equal, compare)
112                except TypeError:
113                    # Py3K raises TypeError for some invalid comparisons
114                    assert_raises(TypeError, op, rp, compare)
115                else:
116                    eq_(control, op(rp, compare))
117
118                try:
119                    control = op(compare, equal)
120                except TypeError:
121                    # Py3K raises TypeError for some invalid comparisons
122                    assert_raises(TypeError, op, compare, rp)
123                else:
124                    eq_(control, op(compare, rp))
125
126    @testing.provide_metadata
127    def test_column_label_overlap_fallback(self):
128        content = Table(
129            'content', self.metadata,
130            Column('type', String(30)),
131        )
132        bar = Table(
133            'bar', self.metadata,
134            Column('content_type', String(30))
135        )
136        self.metadata.create_all(testing.db)
137        testing.db.execute(content.insert().values(type="t1"))
138
139        row = testing.db.execute(content.select(use_labels=True)).first()
140        in_(content.c.type, row)
141        not_in_(bar.c.content_type, row)
142        in_(sql.column('content_type'), row)
143
144        row = testing.db.execute(
145            select([content.c.type.label("content_type")])).first()
146        in_(content.c.type, row)
147
148        not_in_(bar.c.content_type, row)
149
150        in_(sql.column('content_type'), row)
151
152        row = testing.db.execute(select([func.now().label("content_type")])). \
153            first()
154        not_in_(content.c.type, row)
155
156        not_in_(bar.c.content_type, row)
157
158        in_(sql.column('content_type'), row)
159
160    def test_pickled_rows(self):
161        users = self.tables.users
162        addresses = self.tables.addresses
163
164        users.insert().execute(
165            {'user_id': 7, 'user_name': 'jack'},
166            {'user_id': 8, 'user_name': 'ed'},
167            {'user_id': 9, 'user_name': 'fred'},
168        )
169
170        for pickle in False, True:
171            for use_labels in False, True:
172                result = users.select(use_labels=use_labels).order_by(
173                    users.c.user_id).execute().fetchall()
174
175                if pickle:
176                    result = util.pickle.loads(util.pickle.dumps(result))
177
178                eq_(
179                    result,
180                    [(7, "jack"), (8, "ed"), (9, "fred")]
181                )
182                if use_labels:
183                    eq_(result[0]['users_user_id'], 7)
184                    eq_(
185                        list(result[0].keys()),
186                        ["users_user_id", "users_user_name"])
187                else:
188                    eq_(result[0]['user_id'], 7)
189                    eq_(list(result[0].keys()), ["user_id", "user_name"])
190
191                eq_(result[0][0], 7)
192                eq_(result[0][users.c.user_id], 7)
193                eq_(result[0][users.c.user_name], 'jack')
194
195                if not pickle or use_labels:
196                    assert_raises(
197                        exc.NoSuchColumnError,
198                        lambda: result[0][addresses.c.user_id])
199                else:
200                    # test with a different table.  name resolution is
201                    # causing 'user_id' to match when use_labels wasn't used.
202                    eq_(result[0][addresses.c.user_id], 7)
203
204                assert_raises(
205                    exc.NoSuchColumnError, lambda: result[0]['fake key'])
206                assert_raises(
207                    exc.NoSuchColumnError,
208                    lambda: result[0][addresses.c.address_id])
209
210    def test_column_error_printing(self):
211        result = testing.db.execute(select([1]))
212        row = result.first()
213
214        class unprintable(object):
215
216            def __str__(self):
217                raise ValueError("nope")
218
219        msg = r"Could not locate column in row for column '%s'"
220
221        for accessor, repl in [
222            ("x", "x"),
223            (Column("q", Integer), "q"),
224            (Column("q", Integer) + 12, r"q \+ :q_1"),
225            (unprintable(), "unprintable element.*"),
226        ]:
227            assert_raises_message(
228                exc.NoSuchColumnError,
229                msg % repl,
230                result._getter, accessor
231            )
232
233            is_(result._getter(accessor, False), None)
234
235            assert_raises_message(
236                exc.NoSuchColumnError,
237                msg % repl,
238                lambda: row[accessor]
239            )
240
241    def test_fetchmany(self):
242        users = self.tables.users
243
244        users.insert().execute(user_id=7, user_name='jack')
245        users.insert().execute(user_id=8, user_name='ed')
246        users.insert().execute(user_id=9, user_name='fred')
247        r = users.select().execute()
248        rows = []
249        for row in r.fetchmany(size=2):
250            rows.append(row)
251        eq_(len(rows), 2)
252
253    def test_column_slices(self):
254        users = self.tables.users
255        addresses = self.tables.addresses
256
257        users.insert().execute(user_id=1, user_name='john')
258        users.insert().execute(user_id=2, user_name='jack')
259        addresses.insert().execute(
260            address_id=1, user_id=2, address='foo@bar.com')
261
262        r = text(
263            "select * from addresses", bind=testing.db).execute().first()
264        eq_(r[0:1], (1,))
265        eq_(r[1:], (2, 'foo@bar.com'))
266        eq_(r[:-1], (1, 2))
267
268    def test_column_accessor_basic_compiled(self):
269        users = self.tables.users
270
271        users.insert().execute(
272            dict(user_id=1, user_name='john'),
273            dict(user_id=2, user_name='jack')
274        )
275
276        r = users.select(users.c.user_id == 2).execute().first()
277        eq_(r.user_id, 2)
278        eq_(r['user_id'], 2)
279        eq_(r[users.c.user_id], 2)
280
281        eq_(r.user_name, 'jack')
282        eq_(r['user_name'], 'jack')
283        eq_(r[users.c.user_name], 'jack')
284
285    def test_column_accessor_basic_text(self):
286        users = self.tables.users
287
288        users.insert().execute(
289            dict(user_id=1, user_name='john'),
290            dict(user_id=2, user_name='jack')
291        )
292        r = testing.db.execute(
293            text("select * from users where user_id=2")).first()
294
295        eq_(r.user_id, 2)
296        eq_(r['user_id'], 2)
297        eq_(r[users.c.user_id], 2)
298
299        eq_(r.user_name, 'jack')
300        eq_(r['user_name'], 'jack')
301        eq_(r[users.c.user_name], 'jack')
302
303    def test_column_accessor_textual_select(self):
304        users = self.tables.users
305
306        users.insert().execute(
307            dict(user_id=1, user_name='john'),
308            dict(user_id=2, user_name='jack')
309        )
310        # this will create column() objects inside
311        # the select(), these need to match on name anyway
312        r = testing.db.execute(
313            select([
314                column('user_id'), column('user_name')
315            ]).select_from(table('users')).
316            where(text('user_id=2'))
317        ).first()
318
319        eq_(r.user_id, 2)
320        eq_(r['user_id'], 2)
321        eq_(r[users.c.user_id], 2)
322
323        eq_(r.user_name, 'jack')
324        eq_(r['user_name'], 'jack')
325        eq_(r[users.c.user_name], 'jack')
326
327    def test_column_accessor_dotted_union(self):
328        users = self.tables.users
329
330        users.insert().execute(
331            dict(user_id=1, user_name='john'),
332        )
333
334        # test a little sqlite < 3.10.0 weirdness - with the UNION,
335        # cols come back as "users.user_id" in cursor.description
336        r = testing.db.execute(
337            text(
338                "select users.user_id, users.user_name "
339                "from users "
340                "UNION select users.user_id, "
341                "users.user_name from users"
342            )
343        ).first()
344        eq_(r['user_id'], 1)
345        eq_(r['user_name'], "john")
346        eq_(list(r.keys()), ["user_id", "user_name"])
347
348    def test_column_accessor_sqlite_raw(self):
349        users = self.tables.users
350
351        users.insert().execute(
352            dict(user_id=1, user_name='john'),
353        )
354
355        r = text(
356            "select users.user_id, users.user_name "
357            "from users "
358            "UNION select users.user_id, "
359            "users.user_name from users",
360            bind=testing.db).execution_options(sqlite_raw_colnames=True). \
361            execute().first()
362
363        if testing.against("sqlite < 3.10.0"):
364            not_in_('user_id', r)
365            not_in_('user_name', r)
366            eq_(r['users.user_id'], 1)
367            eq_(r['users.user_name'], "john")
368
369            eq_(list(r.keys()), ["users.user_id", "users.user_name"])
370        else:
371            not_in_('users.user_id', r)
372            not_in_('users.user_name', r)
373            eq_(r['user_id'], 1)
374            eq_(r['user_name'], "john")
375
376            eq_(list(r.keys()), ["user_id", "user_name"])
377
378    def test_column_accessor_sqlite_translated(self):
379        users = self.tables.users
380
381        users.insert().execute(
382            dict(user_id=1, user_name='john'),
383        )
384
385        r = text(
386            "select users.user_id, users.user_name "
387            "from users "
388            "UNION select users.user_id, "
389            "users.user_name from users",
390            bind=testing.db).execute().first()
391        eq_(r['user_id'], 1)
392        eq_(r['user_name'], "john")
393
394        if testing.against("sqlite < 3.10.0"):
395            eq_(r['users.user_id'], 1)
396            eq_(r['users.user_name'], "john")
397        else:
398            not_in_('users.user_id', r)
399            not_in_('users.user_name', r)
400
401        eq_(list(r.keys()), ["user_id", "user_name"])
402
403    def test_column_accessor_labels_w_dots(self):
404        users = self.tables.users
405
406        users.insert().execute(
407            dict(user_id=1, user_name='john'),
408        )
409        # test using literal tablename.colname
410        r = text(
411            'select users.user_id AS "users.user_id", '
412            'users.user_name AS "users.user_name" '
413            'from users', bind=testing.db).\
414            execution_options(sqlite_raw_colnames=True).execute().first()
415        eq_(r['users.user_id'], 1)
416        eq_(r['users.user_name'], "john")
417        not_in_("user_name", r)
418        eq_(list(r.keys()), ["users.user_id", "users.user_name"])
419
420    def test_column_accessor_unary(self):
421        users = self.tables.users
422
423        users.insert().execute(
424            dict(user_id=1, user_name='john'),
425        )
426
427        # unary expressions
428        r = select([users.c.user_name.distinct()]).order_by(
429            users.c.user_name).execute().first()
430        eq_(r[users.c.user_name], 'john')
431        eq_(r.user_name, 'john')
432
433    def test_column_accessor_err(self):
434        r = testing.db.execute(select([1])).first()
435        assert_raises_message(
436            AttributeError,
437            "Could not locate column in row for column 'foo'",
438            getattr, r, "foo"
439        )
440        assert_raises_message(
441            KeyError,
442            "Could not locate column in row for column 'foo'",
443            lambda: r['foo']
444        )
445
446    def test_graceful_fetch_on_non_rows(self):
447        """test that calling fetchone() etc. on a result that doesn't
448        return rows fails gracefully.
449
450        """
451
452        # these proxies don't work with no cursor.description present.
453        # so they don't apply to this test at the moment.
454        # result.FullyBufferedResultProxy,
455        # result.BufferedRowResultProxy,
456        # result.BufferedColumnResultProxy
457
458        users = self.tables.users
459
460        conn = testing.db.connect()
461        for meth in [
462            lambda r: r.fetchone(),
463            lambda r: r.fetchall(),
464            lambda r: r.first(),
465            lambda r: r.scalar(),
466            lambda r: r.fetchmany(),
467            lambda r: r._getter('user'),
468            lambda r: r._has_key('user'),
469        ]:
470            trans = conn.begin()
471            result = conn.execute(users.insert(), user_id=1)
472            assert_raises_message(
473                exc.ResourceClosedError,
474                "This result object does not return rows. "
475                "It has been closed automatically.",
476                meth, result,
477            )
478            trans.rollback()
479
480    def test_fetchone_til_end(self):
481        result = testing.db.execute("select * from users")
482        eq_(result.fetchone(), None)
483        eq_(result.fetchone(), None)
484        eq_(result.fetchone(), None)
485        result.close()
486        assert_raises_message(
487            exc.ResourceClosedError,
488            "This result object is closed.",
489            result.fetchone
490        )
491
492    def test_connectionless_autoclose_rows_exhausted(self):
493        users = self.tables.users
494        users.insert().execute(
495            dict(user_id=1, user_name='john'),
496        )
497
498        result = testing.db.execute("select * from users")
499        connection = result.connection
500        assert not connection.closed
501        eq_(result.fetchone(), (1, 'john'))
502        assert not connection.closed
503        eq_(result.fetchone(), None)
504        assert connection.closed
505
506    @testing.requires.returning
507    def test_connectionless_autoclose_crud_rows_exhausted(self):
508        users = self.tables.users
509        stmt = users.insert().values(user_id=1, user_name='john').\
510            returning(users.c.user_id)
511        result = testing.db.execute(stmt)
512        connection = result.connection
513        assert not connection.closed
514        eq_(result.fetchone(), (1, ))
515        assert not connection.closed
516        eq_(result.fetchone(), None)
517        assert connection.closed
518
519    def test_connectionless_autoclose_no_rows(self):
520        result = testing.db.execute("select * from users")
521        connection = result.connection
522        assert not connection.closed
523        eq_(result.fetchone(), None)
524        assert connection.closed
525
526    def test_connectionless_autoclose_no_metadata(self):
527        result = testing.db.execute("update users set user_id=5")
528        connection = result.connection
529        assert connection.closed
530        assert_raises_message(
531            exc.ResourceClosedError,
532            "This result object does not return rows.",
533            result.fetchone
534        )
535
536    def test_row_case_sensitive(self):
537        row = testing.db.execute(
538            select([
539                literal_column("1").label("case_insensitive"),
540                literal_column("2").label("CaseSensitive")
541            ])
542        ).first()
543
544        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
545
546        in_("case_insensitive", row._keymap)
547        in_("CaseSensitive", row._keymap)
548        not_in_("casesensitive", row._keymap)
549
550        eq_(row["case_insensitive"], 1)
551        eq_(row["CaseSensitive"], 2)
552
553        assert_raises(
554            KeyError,
555            lambda: row["Case_insensitive"]
556        )
557        assert_raises(
558            KeyError,
559            lambda: row["casesensitive"]
560        )
561
562    def test_row_case_sensitive_unoptimized(self):
563        ins_db = engines.testing_engine(options={"case_sensitive": True})
564        row = ins_db.execute(
565            select([
566                literal_column("1").label("case_insensitive"),
567                literal_column("2").label("CaseSensitive"),
568                text("3 AS screw_up_the_cols")
569            ])
570        ).first()
571
572        eq_(
573            list(row.keys()),
574            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
575
576        in_("case_insensitive", row._keymap)
577        in_("CaseSensitive", row._keymap)
578        not_in_("casesensitive", row._keymap)
579
580        eq_(row["case_insensitive"], 1)
581        eq_(row["CaseSensitive"], 2)
582        eq_(row["screw_up_the_cols"], 3)
583
584        assert_raises(KeyError, lambda: row["Case_insensitive"])
585        assert_raises(KeyError, lambda: row["casesensitive"])
586        assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
587
588    def test_row_case_insensitive(self):
589        ins_db = engines.testing_engine(options={"case_sensitive": False})
590        row = ins_db.execute(
591            select([
592                literal_column("1").label("case_insensitive"),
593                literal_column("2").label("CaseSensitive")
594            ])
595        ).first()
596
597        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
598
599        in_("case_insensitive", row._keymap)
600        in_("CaseSensitive", row._keymap)
601        in_("casesensitive", row._keymap)
602
603        eq_(row["case_insensitive"], 1)
604        eq_(row["CaseSensitive"], 2)
605        eq_(row["Case_insensitive"], 1)
606        eq_(row["casesensitive"], 2)
607
608    def test_row_case_insensitive_unoptimized(self):
609        ins_db = engines.testing_engine(options={"case_sensitive": False})
610        row = ins_db.execute(
611            select([
612                literal_column("1").label("case_insensitive"),
613                literal_column("2").label("CaseSensitive"),
614                text("3 AS screw_up_the_cols")
615            ])
616        ).first()
617
618        eq_(
619            list(row.keys()),
620            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
621
622        in_("case_insensitive", row._keymap)
623        in_("CaseSensitive", row._keymap)
624        in_("casesensitive", row._keymap)
625
626        eq_(row["case_insensitive"], 1)
627        eq_(row["CaseSensitive"], 2)
628        eq_(row["screw_up_the_cols"], 3)
629        eq_(row["Case_insensitive"], 1)
630        eq_(row["casesensitive"], 2)
631        eq_(row["screw_UP_the_cols"], 3)
632
633    def test_row_as_args(self):
634        users = self.tables.users
635
636        users.insert().execute(user_id=1, user_name='john')
637        r = users.select(users.c.user_id == 1).execute().first()
638        users.delete().execute()
639        users.insert().execute(r)
640        eq_(users.select().execute().fetchall(), [(1, 'john')])
641
642    def test_result_as_args(self):
643        users = self.tables.users
644        users2 = self.tables.users2
645
646        users.insert().execute([
647            dict(user_id=1, user_name='john'),
648            dict(user_id=2, user_name='ed')])
649        r = users.select().execute()
650        users2.insert().execute(list(r))
651        eq_(
652            users2.select().order_by(users2.c.user_id).execute().fetchall(),
653            [(1, 'john'), (2, 'ed')]
654        )
655
656        users2.delete().execute()
657        r = users.select().execute()
658        users2.insert().execute(*list(r))
659        eq_(
660            users2.select().order_by(users2.c.user_id).execute().fetchall(),
661            [(1, 'john'), (2, 'ed')]
662        )
663
664    @testing.requires.duplicate_names_in_cursor_description
665    def test_ambiguous_column(self):
666        users = self.tables.users
667        addresses = self.tables.addresses
668
669        users.insert().execute(user_id=1, user_name='john')
670        result = users.outerjoin(addresses).select().execute()
671        r = result.first()
672
673        assert_raises_message(
674            exc.InvalidRequestError,
675            "Ambiguous column name",
676            lambda: r['user_id']
677        )
678
679        # pure positional targeting; users.c.user_id
680        # and addresses.c.user_id are known!
681        # works as of 1.1 issue #3501
682        eq_(r[users.c.user_id], 1)
683        eq_(r[addresses.c.user_id], None)
684
685        # try to trick it - fake_table isn't in the result!
686        # we get the correct error
687        fake_table = Table('fake', MetaData(), Column('user_id', Integer))
688        assert_raises_message(
689            exc.InvalidRequestError,
690            "Could not locate column in row for column 'fake.user_id'",
691            lambda: r[fake_table.c.user_id]
692        )
693
694        r = util.pickle.loads(util.pickle.dumps(r))
695        assert_raises_message(
696            exc.InvalidRequestError,
697            "Ambiguous column name",
698            lambda: r['user_id']
699        )
700
701        result = users.outerjoin(addresses).select().execute()
702        result = _result.BufferedColumnResultProxy(result.context)
703        r = result.first()
704        assert isinstance(r, _result.BufferedColumnRow)
705        assert_raises_message(
706            exc.InvalidRequestError,
707            "Ambiguous column name",
708            lambda: r['user_id']
709        )
710
711    @testing.requires.duplicate_names_in_cursor_description
712    def test_ambiguous_column_by_col(self):
713        users = self.tables.users
714
715        users.insert().execute(user_id=1, user_name='john')
716        ua = users.alias()
717        u2 = users.alias()
718        result = select([users.c.user_id, ua.c.user_id]).execute()
719        row = result.first()
720
721        # as of 1.1 issue #3501, we use pure positional
722        # targeting for the column objects here
723        eq_(row[users.c.user_id], 1)
724
725        eq_(row[ua.c.user_id], 1)
726
727        # this now works as of 1.1 issue #3501;
728        # previously this was stuck on "ambiguous column name"
729        assert_raises_message(
730            exc.InvalidRequestError,
731            "Could not locate column in row",
732            lambda: row[u2.c.user_id]
733        )
734
735    @testing.requires.duplicate_names_in_cursor_description
736    def test_ambiguous_column_case_sensitive(self):
737        eng = engines.testing_engine(options=dict(case_sensitive=False))
738
739        row = eng.execute(select([
740            literal_column('1').label('SOMECOL'),
741            literal_column('1').label('SOMECOL'),
742        ])).first()
743
744        assert_raises_message(
745            exc.InvalidRequestError,
746            "Ambiguous column name",
747            lambda: row['somecol']
748        )
749
750    @testing.requires.duplicate_names_in_cursor_description
751    def test_ambiguous_column_contains(self):
752        users = self.tables.users
753        addresses = self.tables.addresses
754
755        # ticket 2702.  in 0.7 we'd get True, False.
756        # in 0.8, both columns are present so it's True;
757        # but when they're fetched you'll get the ambiguous error.
758        users.insert().execute(user_id=1, user_name='john')
759        result = select([users.c.user_id, addresses.c.user_id]).\
760            select_from(users.outerjoin(addresses)).execute()
761        row = result.first()
762
763        eq_(
764            set([users.c.user_id in row, addresses.c.user_id in row]),
765            set([True])
766        )
767
768    def test_ambiguous_column_by_col_plus_label(self):
769        users = self.tables.users
770
771        users.insert().execute(user_id=1, user_name='john')
772        result = select(
773            [users.c.user_id,
774                type_coerce(users.c.user_id, Integer).label('foo')]).execute()
775        row = result.first()
776        eq_(
777            row[users.c.user_id], 1
778        )
779        eq_(
780            row[1], 1
781        )
782
783    def test_fetch_partial_result_map(self):
784        users = self.tables.users
785
786        users.insert().execute(user_id=7, user_name='ed')
787
788        t = text("select * from users").columns(
789            user_name=String()
790        )
791        eq_(
792            testing.db.execute(t).fetchall(), [(7, 'ed')]
793        )
794
795    def test_fetch_unordered_result_map(self):
796        users = self.tables.users
797
798        users.insert().execute(user_id=7, user_name='ed')
799
800        class Goofy1(TypeDecorator):
801            impl = String
802
803            def process_result_value(self, value, dialect):
804                return value + "a"
805
806        class Goofy2(TypeDecorator):
807            impl = String
808
809            def process_result_value(self, value, dialect):
810                return value + "b"
811
812        class Goofy3(TypeDecorator):
813            impl = String
814
815            def process_result_value(self, value, dialect):
816                return value + "c"
817
818        t = text(
819            "select user_name as a, user_name as b, "
820            "user_name as c from users").columns(
821            a=Goofy1(), b=Goofy2(), c=Goofy3()
822        )
823        eq_(
824            testing.db.execute(t).fetchall(), [
825                ('eda', 'edb', 'edc')
826            ]
827        )
828
829    @testing.requires.subqueries
830    def test_column_label_targeting(self):
831        users = self.tables.users
832
833        users.insert().execute(user_id=7, user_name='ed')
834
835        for s in (
836            users.select().alias('foo'),
837            users.select().alias(users.name),
838        ):
839            row = s.select(use_labels=True).execute().first()
840            eq_(row[s.c.user_id], 7)
841            eq_(row[s.c.user_name], 'ed')
842
843    def test_keys(self):
844        users = self.tables.users
845
846        users.insert().execute(user_id=1, user_name='foo')
847        result = users.select().execute()
848        eq_(
849            result.keys(),
850            ['user_id', 'user_name']
851        )
852        row = result.first()
853        eq_(
854            row.keys(),
855            ['user_id', 'user_name']
856        )
857
858    def test_keys_anon_labels(self):
859        """test [ticket:3483]"""
860
861        users = self.tables.users
862
863        users.insert().execute(user_id=1, user_name='foo')
864        result = testing.db.execute(
865            select([
866                users.c.user_id,
867                users.c.user_name.label(None),
868                func.count(literal_column('1'))]).
869            group_by(users.c.user_id, users.c.user_name)
870        )
871
872        eq_(
873            result.keys(),
874            ['user_id', 'user_name_1', 'count_1']
875        )
876        row = result.first()
877        eq_(
878            row.keys(),
879            ['user_id', 'user_name_1', 'count_1']
880        )
881
882    def test_items(self):
883        users = self.tables.users
884
885        users.insert().execute(user_id=1, user_name='foo')
886        r = users.select().execute().first()
887        eq_(
888            [(x[0].lower(), x[1]) for x in list(r.items())],
889            [('user_id', 1), ('user_name', 'foo')])
890
891    def test_len(self):
892        users = self.tables.users
893
894        users.insert().execute(user_id=1, user_name='foo')
895        r = users.select().execute().first()
896        eq_(len(r), 2)
897
898        r = testing.db.execute('select user_name, user_id from users'). \
899            first()
900        eq_(len(r), 2)
901        r = testing.db.execute('select user_name from users').first()
902        eq_(len(r), 1)
903
904    def test_sorting_in_python(self):
905        users = self.tables.users
906
907        users.insert().execute(
908            dict(user_id=1, user_name='foo'),
909            dict(user_id=2, user_name='bar'),
910            dict(user_id=3, user_name='def'),
911        )
912
913        rows = users.select().order_by(users.c.user_name).execute().fetchall()
914
915        eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')])
916
917        eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')])
918
919    def test_column_order_with_simple_query(self):
920        # should return values in column definition order
921        users = self.tables.users
922
923        users.insert().execute(user_id=1, user_name='foo')
924        r = users.select(users.c.user_id == 1).execute().first()
925        eq_(r[0], 1)
926        eq_(r[1], 'foo')
927        eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name'])
928        eq_(list(r.values()), [1, 'foo'])
929
930    def test_column_order_with_text_query(self):
931        # should return values in query order
932        users = self.tables.users
933
934        users.insert().execute(user_id=1, user_name='foo')
935        r = testing.db.execute('select user_name, user_id from users'). \
936            first()
937        eq_(r[0], 'foo')
938        eq_(r[1], 1)
939        eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id'])
940        eq_(list(r.values()), ['foo', 1])
941
942    @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()')
943    @testing.crashes('firebird', 'An identifier must begin with a letter')
944    @testing.provide_metadata
945    def test_column_accessor_shadow(self):
946        shadowed = Table(
947            'test_shadowed', self.metadata,
948            Column('shadow_id', INT, primary_key=True),
949            Column('shadow_name', VARCHAR(20)),
950            Column('parent', VARCHAR(20)),
951            Column('row', VARCHAR(40)),
952            Column('_parent', VARCHAR(20)),
953            Column('_row', VARCHAR(20)),
954        )
955        self.metadata.create_all()
956        shadowed.insert().execute(
957            shadow_id=1, shadow_name='The Shadow', parent='The Light',
958            row='Without light there is no shadow',
959            _parent='Hidden parent', _row='Hidden row')
960        r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
961
962        eq_(r.shadow_id, 1)
963        eq_(r['shadow_id'], 1)
964        eq_(r[shadowed.c.shadow_id], 1)
965
966        eq_(r.shadow_name, 'The Shadow')
967        eq_(r['shadow_name'], 'The Shadow')
968        eq_(r[shadowed.c.shadow_name], 'The Shadow')
969
970        eq_(r.parent, 'The Light')
971        eq_(r['parent'], 'The Light')
972        eq_(r[shadowed.c.parent], 'The Light')
973
974        eq_(r.row, 'Without light there is no shadow')
975        eq_(r['row'], 'Without light there is no shadow')
976        eq_(r[shadowed.c.row], 'Without light there is no shadow')
977
978        eq_(r['_parent'], 'Hidden parent')
979        eq_(r['_row'], 'Hidden row')
980
981    def test_nontuple_row(self):
982        """ensure the C version of BaseRowProxy handles
983        duck-type-dependent rows."""
984
985        from sqlalchemy.engine import RowProxy
986
987        class MyList(object):
988
989            def __init__(self, data):
990                self.internal_list = data
991
992            def __len__(self):
993                return len(self.internal_list)
994
995            def __getitem__(self, i):
996                return list.__getitem__(self.internal_list, i)
997
998        proxy = RowProxy(object(), MyList(['value']), [None], {
999                         'key': (None, None, 0), 0: (None, None, 0)})
1000        eq_(list(proxy), ['value'])
1001        eq_(proxy[0], 'value')
1002        eq_(proxy['key'], 'value')
1003
1004    @testing.provide_metadata
1005    def test_no_rowcount_on_selects_inserts(self):
1006        """assert that rowcount is only called on deletes and updates.
1007
1008        This because cursor.rowcount may can be expensive on some dialects
1009        such as Firebird, however many dialects require it be called
1010        before the cursor is closed.
1011
1012        """
1013
1014        metadata = self.metadata
1015
1016        engine = engines.testing_engine()
1017
1018        t = Table('t1', metadata,
1019                  Column('data', String(10))
1020                  )
1021        metadata.create_all(engine)
1022
1023        with patch.object(
1024                engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
1025            mock_rowcount.__get__ = Mock()
1026            engine.execute(t.insert(),
1027                           {'data': 'd1'},
1028                           {'data': 'd2'},
1029                           {'data': 'd3'})
1030
1031            eq_(len(mock_rowcount.__get__.mock_calls), 0)
1032
1033            eq_(
1034                engine.execute(t.select()).fetchall(),
1035                [('d1', ), ('d2', ), ('d3', )]
1036            )
1037            eq_(len(mock_rowcount.__get__.mock_calls), 0)
1038
1039            engine.execute(t.update(), {'data': 'd4'})
1040
1041            eq_(len(mock_rowcount.__get__.mock_calls), 1)
1042
1043            engine.execute(t.delete())
1044            eq_(len(mock_rowcount.__get__.mock_calls), 2)
1045
1046    def test_rowproxy_is_sequence(self):
1047        import collections
1048        from sqlalchemy.engine import RowProxy
1049
1050        row = RowProxy(
1051            object(), ['value'], [None],
1052            {'key': (None, None, 0), 0: (None, None, 0)})
1053        assert isinstance(row, collections.Sequence)
1054
1055    @testing.provide_metadata
1056    def test_rowproxy_getitem_indexes_compiled(self):
1057        values = Table('rp', self.metadata,
1058                       Column('key', String(10), primary_key=True),
1059                       Column('value', String(10)))
1060        values.create()
1061
1062        testing.db.execute(values.insert(), dict(key='One', value='Uno'))
1063        row = testing.db.execute(values.select()).first()
1064        eq_(row['key'], 'One')
1065        eq_(row['value'], 'Uno')
1066        eq_(row[0], 'One')
1067        eq_(row[1], 'Uno')
1068        eq_(row[-2], 'One')
1069        eq_(row[-1], 'Uno')
1070        eq_(row[1:0:-1], ('Uno',))
1071
1072    @testing.only_on("sqlite")
1073    def test_rowproxy_getitem_indexes_raw(self):
1074        row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
1075        eq_(row['key'], 'One')
1076        eq_(row['value'], 'Uno')
1077        eq_(row[0], 'One')
1078        eq_(row[1], 'Uno')
1079        eq_(row[-2], 'One')
1080        eq_(row[-1], 'Uno')
1081        eq_(row[1:0:-1], ('Uno',))
1082
1083    @testing.requires.cextensions
1084    def test_row_c_sequence_check(self):
1085        import csv
1086
1087        metadata = MetaData()
1088        metadata.bind = 'sqlite://'
1089        users = Table('users', metadata,
1090                      Column('id', Integer, primary_key=True),
1091                      Column('name', String(40)),
1092                      )
1093        users.create()
1094
1095        users.insert().execute(name='Test')
1096        row = users.select().execute().fetchone()
1097
1098        s = util.StringIO()
1099        writer = csv.writer(s)
1100        # csv performs PySequenceCheck call
1101        writer.writerow(row)
1102        assert s.getvalue().strip() == '1,Test'
1103
1104    @testing.requires.selectone
1105    def test_empty_accessors(self):
1106        statements = [
1107            (
1108                "select 1",
1109                [
1110                    lambda r: r.last_inserted_params(),
1111                    lambda r: r.last_updated_params(),
1112                    lambda r: r.prefetch_cols(),
1113                    lambda r: r.postfetch_cols(),
1114                    lambda r: r.inserted_primary_key
1115                ],
1116                "Statement is not a compiled expression construct."
1117            ),
1118            (
1119                select([1]),
1120                [
1121                    lambda r: r.last_inserted_params(),
1122                    lambda r: r.inserted_primary_key
1123                ],
1124                r"Statement is not an insert\(\) expression construct."
1125            ),
1126            (
1127                select([1]),
1128                [
1129                    lambda r: r.last_updated_params(),
1130                ],
1131                r"Statement is not an update\(\) expression construct."
1132            ),
1133            (
1134                select([1]),
1135                [
1136                    lambda r: r.prefetch_cols(),
1137                    lambda r: r.postfetch_cols()
1138                ],
1139                r"Statement is not an insert\(\) "
1140                r"or update\(\) expression construct."
1141            ),
1142        ]
1143
1144        for stmt, meths, msg in statements:
1145            r = testing.db.execute(stmt)
1146            try:
1147                for meth in meths:
1148                    assert_raises_message(
1149                        sa_exc.InvalidRequestError,
1150                        msg,
1151                        meth, r
1152                    )
1153
1154            finally:
1155                r.close()
1156
1157
class KeyTargetingTest(fixtures.TablesTest):
    """Row lookups when column names, keys and labels differ or collide."""

    __backend__ = True
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Declare tables whose column names and keys overlap each other."""
        # keyed1: column "a" has key "b", column "c" has key "q"
        Table(
            'keyed1', metadata,
            Column("a", CHAR(2), key="b"),
            Column("c", CHAR(2), key="q"),
        )
        Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
        Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
        Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
        Table('content', metadata, Column('t', String(30), key="type"))
        Table('bar', metadata, Column('ctype', String(30), key="content_type"))

        # same layout as keyed1, but inside the test schema
        if testing.requires.schemas.enabled:
            Table(
                'wschema', metadata,
                Column("a", CHAR(2), key="b"),
                Column("c", CHAR(2), key="q"),
                schema=testing.config.test_schema,
            )

    @classmethod
    def insert_data(cls):
        """Insert a single row into every table except ``bar``."""
        tables = cls.tables
        tables.keyed1.insert().execute({'b': "a1", 'q': "c1"})
        tables.keyed2.insert().execute({'a': "a2", 'b': "b2"})
        tables.keyed3.insert().execute({'a': "a3", 'd': "d3"})
        tables.keyed4.insert().execute({'b': "b4", 'q': "q4"})
        tables.content.insert().execute(type="t1")

        if testing.requires.schemas.enabled:
            wschema = tables['%s.wschema' % testing.config.test_schema]
            wschema.insert().execute({'b': "a1", 'q': "c1"})

    @testing.requires.schemas
    def test_keyed_accessor_wschema(self):
        """Keys and names both work as attributes with a schema table."""
        wschema = self.tables['%s.wschema' % testing.config.test_schema]
        row = testing.db.execute(wschema.select()).first()

        expected = [("b", "a1"), ("q", "c1"), ("a", "a1"), ("c", "c1")]
        for attr, value in expected:
            eq_(getattr(row, attr), value)

    def test_keyed_accessor_single(self):
        """Keys and names both work as row attributes for one table."""
        row = testing.db.execute(self.tables.keyed1.select()).first()

        expected = [("b", "a1"), ("q", "c1"), ("a", "a1"), ("c", "c1")]
        for attr, value in expected:
            eq_(getattr(row, attr), value)

    def test_keyed_accessor_single_labeled(self):
        """With apply_labels(), table-prefixed keys and names both work."""
        stmt = self.tables.keyed1.select().apply_labels()
        row = testing.db.execute(stmt).first()

        expected = [
            ("keyed1_b", "a1"),
            ("keyed1_q", "c1"),
            ("keyed1_a", "a1"),
            ("keyed1_c", "c1"),
        ]
        for attr, value in expected:
            eq_(getattr(row, attr), value)

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_conflict_2(self):
        """Selecting keyed1 and keyed2 together makes name "a" ambiguous."""
        stmt = select([self.tables.keyed1, self.tables.keyed2])
        row = testing.db.execute(stmt).first()

        # row.b is unambiguous: it yields the value stored in keyed2.b
        eq_(row.b, "b2")

        # row.a is ambiguous: both tables have a column named "a"
        assert_raises_message(
            exc.InvalidRequestError, "Ambig", getattr, row, "a")

    def test_keyed_accessor_composite_names_precedent(self):
        """keyed4 column names win over the equal keys of keyed1."""
        stmt = select([self.tables.keyed1, self.tables.keyed4])
        row = testing.db.execute(stmt).first()

        expected = [("b", "b4"), ("q", "q4"), ("a", "a1"), ("c", "c1")]
        for attr, value in expected:
            eq_(getattr(row, attr), value)

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_keys_precedent(self):
        """With keyed1 and keyed3, both "a" and key "b" are ambiguous."""
        stmt = select([self.tables.keyed1, self.tables.keyed3])
        row = testing.db.execute(stmt).first()

        eq_(row.q, "c1")

        # "b" is keyed1's key for its column "a"; the error names "a"
        for attr in ("b", "a"):
            assert_raises_message(
                exc.InvalidRequestError,
                "Ambiguous column name 'a'",
                getattr, row, attr
            )

        eq_(row.d, "d3")

    def test_keyed_accessor_composite_labeled(self):
        """apply_labels() separates the columns of keyed1 and keyed2."""
        stmt = select([self.tables.keyed1, self.tables.keyed2])
        row = testing.db.execute(stmt.apply_labels()).first()

        expected = [
            ("keyed1_b", "a1"),
            ("keyed1_a", "a1"),
            ("keyed1_q", "c1"),
            ("keyed1_c", "c1"),
            ("keyed2_a", "a2"),
            ("keyed2_b", "b2"),
        ]
        for attr, value in expected:
            eq_(getattr(row, attr), value)

        # keyed2 has neither a "c" column nor a "q" key
        for missing in ('keyed2_c', 'keyed2_q'):
            assert_raises(KeyError, lambda: row[missing])

    def test_column_label_overlap_fallback(self):
        """A "content_type" label matches only a same-named plain column."""
        content, bar = self.tables.content, self.tables.bar

        labeled_exprs = (
            content.c.type.label("content_type"),
            func.now().label("content_type"),
        )
        for labeled in labeled_exprs:
            row = testing.db.execute(select([labeled])).first()

            not_in_(content.c.type, row)
            not_in_(bar.c.content_type, row)
            in_(sql.column('content_type'), row)

    def test_column_label_overlap_fallback_2(self):
        """A use_labels select of content matches only content.c.type."""
        content, bar = self.tables.content, self.tables.bar
        stmt = content.select(use_labels=True)
        row = testing.db.execute(stmt).first()

        in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        not_in_(sql.column('content_type'), row)

    def test_columnclause_schema_column_one(self):
        """Table columns are found in a select of lightweight columns."""
        keyed2 = self.tables.keyed2

        # this is addressed by [ticket:2932]
        # ColumnClause._compare_name_for_result allows the
        # columns which the statement is against to be lightweight
        # cols, which results in a more liberal comparison scheme
        lightweight = [sql.column('a'), sql.column('b')]
        stmt = select(lightweight).select_from(table("keyed2"))
        row = testing.db.execute(stmt).first()

        for col in [keyed2.c.a, keyed2.c.b] + lightweight:
            in_(col, row)

    def test_columnclause_schema_column_two(self):
        """Lightweight columns are found in a select of table columns."""
        keyed2 = self.tables.keyed2

        lightweight = [sql.column('a'), sql.column('b')]
        table_cols = [keyed2.c.a, keyed2.c.b]
        row = testing.db.execute(select(table_cols)).first()

        for col in table_cols + lightweight:
            in_(col, row)

    def test_columnclause_schema_column_three(self):
        """Columns are found in text() typed via keyword ``columns()``."""
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        lightweight = [sql.column('a'), sql.column('b')]
        stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
        row = testing.db.execute(stmt).first()

        for col in [keyed2.c.a, keyed2.c.b] + lightweight:
            in_(col, row)

        # the statement's own columns are looked up last
        for name in ('a', 'b'):
            in_(getattr(stmt.c, name), row)

    def test_columnclause_schema_column_four(self):
        """Columns are found in aliased text() given positional columns."""
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        lightweight = [sql.column('keyed2_a'), sql.column('keyed2_b')]
        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
            *lightweight)
        row = testing.db.execute(stmt).first()

        for col in [keyed2.c.a, keyed2.c.b] + lightweight:
            in_(col, row)

        # the statement's own columns are looked up last
        for name in ('keyed2_a', 'keyed2_b'):
            in_(getattr(stmt.c, name), row)

    def test_columnclause_schema_column_five(self):
        """Columns are found in aliased text() typed via keywords."""
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
            keyed2_a=CHAR, keyed2_b=CHAR)
        row = testing.db.execute(stmt).first()

        for col in (keyed2.c.a, keyed2.c.b):
            in_(col, row)

        for name in ('keyed2_a', 'keyed2_b'):
            in_(getattr(stmt.c, name), row)
1380
1381
class PositionalTextTest(fixtures.TablesTest):
    """Result lookups for text() statements given columns via columns()."""

    __backend__ = True
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``text1`` with four CHAR(2) columns a, b, c and d."""
        Table(
            'text1', metadata,
            Column("a", CHAR(2)),
            Column("b", CHAR(2)),
            Column("c", CHAR(2)),
            Column("d", CHAR(2)),
        )

    @classmethod
    def insert_data(cls):
        """Insert the single row (a1, b1, c1, d1)."""
        only_row = {'a': "a1", 'b': "b1", 'c': "c1", 'd': "d1"}
        cls.tables.text1.insert().execute([only_row])

    def test_via_column(self):
        """Positional columns locate values whatever their names are."""
        col_q, col_p, col_r, col_d = (
            column('q'), column('p'), column('r'), column('d'))
        stmt = text("select a, b, c, d from text1").columns(
            col_q, col_p, col_r, col_d)

        row = testing.db.execute(stmt).first()

        lookups = [(col_p, "b1"), (col_d, "d1"), (1, "b1"), ("b", "b1")]
        for key, expected in lookups:
            eq_(row[key], expected)

        # keys() reports the names used in the SQL text
        eq_(row.keys(), ["a", "b", "c", "d"])

        # the name of a positional column also works as a string key
        eq_(row["r"], "c1")
        eq_(row["d"], "d1")

    def test_fewer_cols_than_sql_positional(self):
        """Fewer positional columns than SQL columns is accepted."""
        col_q, col_p = column('q'), column('p')
        stmt = text("select a, b, c, d from text1").columns(col_q, col_p)

        # no warning as this can be similar for non-positional
        row = testing.db.execute(stmt).first()

        eq_(row[col_q], "a1")
        eq_(row["c"], "c1")

    def test_fewer_cols_than_sql_non_positional(self):
        """With keyword columns mixed in, columns only match by name."""
        col_a, col_p = column('a'), column('p')
        stmt = text("select a, b, c, d from text1").columns(
            col_p, col_a, d=CHAR)

        # no warning as this can be similar for non-positional
        row = testing.db.execute(stmt).first()

        # col_a's name matches, locates
        eq_(row[col_a], "a1")
        eq_(row["c"], "c1")

        # col_p's name does not match, doesn't locate
        assert_raises_message(
            exc.NoSuchColumnError,
            "in row for column 'p'",
            lambda: row[col_p]
        )

    def test_more_cols_than_sql(self):
        """More positional columns than SQL columns emits a warning."""
        col_q, col_p, col_r, col_d = (
            column('q'), column('p'), column('r'), column('d'))
        stmt = text("select a, b from text1").columns(
            col_q, col_p, col_r, col_d)

        with assertions.expect_warnings(
                r"Number of columns in textual SQL \(4\) is "
                r"smaller than number of columns requested \(2\)"):
            result = testing.db.execute(stmt)

        row = result.first()

        # columns covered by the SQL still locate by position
        eq_(row[col_p], "b1")

        # the ones beyond the SQL column count do not locate
        assert_raises_message(
            exc.NoSuchColumnError,
            "in row for column 'r'",
            lambda: row[col_r]
        )

    def test_dupe_col_obj(self):
        """Passing the same column object twice is rejected on execute."""
        col_q, col_p, col_r = column('q'), column('p'), column('r')
        stmt = text("select a, b, c, d from text1").columns(
            col_q, col_p, col_r, col_p)

        assert_raises_message(
            exc.InvalidRequestError,
            "Duplicate column expression requested in "
            "textual SQL: <.*.ColumnClause.*; p>",
            testing.db.execute, stmt
        )

    def test_anon_aliased_unique(self):
        """Anonymous labels and alias columns locate by position."""
        text1 = self.tables.text1

        first = text1.c.a.label(None)
        second = text1.alias().c.c
        third = text1.alias().c.b
        fourth = text1.alias().c.d.label(None)

        stmt = text("select a, b, c, d from text1").columns(
            first, second, third, fourth)
        row = testing.db.execute(stmt).first()

        lookups = [
            (first, "a1"), (second, "b1"), (third, "c1"), (fourth, "d1")]
        for col, expected in lookups:
            eq_(row[col], expected)

        # key fallback rules still match this to a column
        # unambiguously based on its name
        eq_(row[text1.c.a], "a1")

        # key fallback rules still match this to a column
        # unambiguously based on its name
        eq_(row[text1.c.d], "d1")

        # text1.c.b goes nowhere....because we hit key fallback
        # but the text1.c.b doesn't derive from text1.c.c
        assert_raises_message(
            exc.NoSuchColumnError,
            "Could not locate column in row for column 'text1.b'",
            lambda: row[text1.c.b]
        )

    def test_anon_aliased_overlapping(self):
        """Four expressions all derived from column "a" stay distinct."""
        text1 = self.tables.text1

        first = text1.c.a.label(None)
        second = text1.alias().c.a
        third = text1.alias().c.a.label(None)
        fourth = text1.c.a.label(None)

        stmt = text("select a, b, c, d from text1").columns(
            first, second, third, fourth)
        row = testing.db.execute(stmt).first()

        lookups = [
            (first, "a1"), (second, "b1"), (third, "c1"), (fourth, "d1")]
        for col, expected in lookups:
            eq_(row[col], expected)

        # key fallback rules still match this to a column
        # unambiguously based on its name
        eq_(row[text1.c.a], "a1")

    def test_anon_aliased_name_conflict(self):
        """Four columns all named "a" still locate by position."""
        text1 = self.tables.text1

        first = text1.c.a.label("a")
        second = text1.alias().c.a
        third = text1.alias().c.a.label("a")
        fourth = text1.c.a.label("a")

        # all cols are named "a".  if we are positional, we don't care.
        # this is new logic in 1.1
        stmt = text("select a, b as a, c as a, d as a from text1").columns(
            first, second, third, fourth)
        row = testing.db.execute(stmt).first()

        lookups = [
            (first, "a1"), (second, "b1"), (third, "c1"), (fourth, "d1")]
        for col, expected in lookups:
            eq_(row[col], expected)

        # fails, because we hit key fallback and find conflicts
        # in columns that are present
        assert_raises_message(
            exc.NoSuchColumnError,
            "Could not locate column in row for column 'text1.a'",
            lambda: row[text1.c.a]
        )
1559
1560
class AlternateResultProxyTest(fixtures.TablesTest):
    """Run the fetch API against each result proxy class on SQLite."""

    __requires__ = ('sqlite', )

    @classmethod
    def setup_bind(cls):
        """Create, store and return an in-memory SQLite engine."""
        cls.engine = engines.testing_engine('sqlite://')
        return cls.engine

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``test`` with an integer key and a string column."""
        Table(
            'test', metadata,
            Column('x', Integer, primary_key=True),
            Column('y', String(50, convert_unicode='force')),
        )

    @classmethod
    def insert_data(cls):
        """Insert rows (1, "t_1") through (11, "t_11")."""
        seed_rows = [{'x': i, 'y': "t_%d" % i} for i in range(1, 12)]
        cls.engine.execute(cls.tables.test.insert(), seed_rows)

    @contextmanager
    def _proxy_fixture(self, cls):
        """Make ``self.engine`` produce results of class ``cls``.

        Sets ``self.table`` to the ``test`` table and, for the duration
        of the ``with`` block, patches the dialect's
        ``execution_ctx_cls`` with a context whose
        ``get_result_proxy()`` builds ``cls``.
        """
        self.table = self.tables.test

        class ExcCtx(default.DefaultExecutionContext):

            def get_result_proxy(self):
                return cls(self)

        dialect = self.engine.dialect
        self.patcher = patch.object(dialect, "execution_ctx_cls", ExcCtx)
        with self.patcher:
            yield

    def _test_proxy(self, cls):
        """Exercise fetchone/fetchmany/fetchall/first/scalar for ``cls``."""

        def expected_rows(start, stop):
            return [(i, "t_%d" % i) for i in range(start, stop)]

        with self._proxy_fixture(cls):
            execute = self.engine.execute

            result = execute(select([self.table]))
            assert isinstance(result, cls)

            # consume all eleven rows via the three fetch methods in turn
            fetched = [result.fetchone() for _ in range(5)]
            eq_(fetched, expected_rows(1, 6))

            fetched = result.fetchmany(3)
            eq_(fetched, expected_rows(6, 9))

            fetched = result.fetchall()
            eq_(fetched, expected_rows(9, 12))

            result = execute(select([self.table]))
            fetched = result.fetchmany(None)
            eq_(fetched[0], (1, "t_1"))
            # number of rows here could be one, or the whole thing
            assert len(fetched) in (1, 11)

            result = execute(select([self.table]).limit(1))
            result.fetchone()
            eq_(result.fetchone(), None)

            result = execute(select([self.table]).limit(5))
            fetched = result.fetchmany(6)
            eq_(fetched, expected_rows(1, 6))

            # result keeps going just fine with blank results...
            eq_(result.fetchmany(2), [])

            eq_(result.fetchmany(2), [])

            eq_(result.fetchall(), [])

            eq_(result.fetchone(), None)

            # until we close
            result.close()

            self._assert_result_closed(result)

            # first() and scalar() leave the result closed as well
            result = execute(select([self.table]).limit(5))
            eq_(result.first(), (1, "t_1"))
            self._assert_result_closed(result)

            result = execute(select([self.table]).limit(5))
            eq_(result.scalar(), 1)
            self._assert_result_closed(result)

    def _assert_result_closed(self, r):
        """Assert that every fetch method of ``r`` raises as closed."""
        fetch_calls = [
            (r.fetchone, ()),
            (r.fetchmany, (2, )),
            (r.fetchall, ()),
        ]
        for fetch, args in fetch_calls:
            assert_raises_message(
                sa_exc.ResourceClosedError,
                "object is closed",
                fetch, *args
            )

    def test_basic_plain(self):
        """Fetch API checks with ResultProxy."""
        self._test_proxy(_result.ResultProxy)

    def test_basic_buffered_row_result_proxy(self):
        """Fetch API checks with BufferedRowResultProxy."""
        self._test_proxy(_result.BufferedRowResultProxy)

    def test_basic_fully_buffered_result_proxy(self):
        """Fetch API checks with FullyBufferedResultProxy."""
        self._test_proxy(_result.FullyBufferedResultProxy)

    def test_basic_buffered_column_result_proxy(self):
        """Fetch API checks with BufferedColumnResultProxy."""
        self._test_proxy(_result.BufferedColumnResultProxy)

    def test_resultprocessor_plain(self):
        """Result processing with ResultProxy, no compiled cache."""
        self._test_result_processor(_result.ResultProxy, False)

    def test_resultprocessor_plain_cached(self):
        """Result processing with ResultProxy and a compiled cache."""
        self._test_result_processor(_result.ResultProxy, True)

    def test_resultprocessor_buffered_column(self):
        """Result processing with BufferedColumnResultProxy, no cache."""
        self._test_result_processor(_result.BufferedColumnResultProxy, False)

    def test_resultprocessor_buffered_column_cached(self):
        """Result processing with BufferedColumnResultProxy and a cache."""
        self._test_result_processor(_result.BufferedColumnResultProxy, True)

    def test_resultprocessor_buffered_row(self):
        """Result processing with BufferedRowResultProxy, no cache."""
        self._test_result_processor(_result.BufferedRowResultProxy, False)

    def test_resultprocessor_buffered_row_cached(self):
        """Result processing with BufferedRowResultProxy and a cache."""
        self._test_result_processor(_result.BufferedRowResultProxy, True)

    def test_resultprocessor_fully_buffered(self):
        """Result processing with FullyBufferedResultProxy, no cache."""
        self._test_result_processor(_result.FullyBufferedResultProxy, False)

    def test_resultprocessor_fully_buffered_cached(self):
        """Result processing with FullyBufferedResultProxy and a cache."""
        self._test_result_processor(_result.FullyBufferedResultProxy, True)

    def _test_result_processor(self, cls, use_cache):
        """Check a TypeDecorator result processor runs under ``cls``.

        The same statement is executed twice; when ``use_cache`` is true
        the connection is given a ``compiled_cache`` dictionary first.
        """

        class MyType(TypeDecorator):
            impl = String()

            def process_result_value(self, value, dialect):
                return "HI " + value

        with self._proxy_fixture(cls):
            with self.engine.connect() as conn:
                if use_cache:
                    conn = conn.execution_options(compiled_cache={})

                stmt = select([literal("THERE", type_=MyType())])
                for _ in range(2):
                    eq_(conn.execute(stmt).scalar(), "HI THERE")

    def test_buffered_row_growth(self):
        """The row buffer size steps up during iteration, capped at 1000.

        NOTE(review): fewer than 1351 rows appear to exist here (11 seed
        rows plus ``range(15, 1200)``), so the last checkpoint is
        presumably never reached -- confirm.
        """
        with self._proxy_fixture(_result.BufferedRowResultProxy):
            with self.engine.connect() as conn:
                extra_rows = [
                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)]
                conn.execute(self.table.insert(), extra_rows)

                result = conn.execute(self.table.select())

                # row index -> _bufsize expected once that row is yielded
                checkpoints = {
                    0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
                    1351: 1000
                }
                for idx, _ in enumerate(result):
                    if idx in checkpoints:
                        eq_(result._bufsize, checkpoints[idx])
                    buffered = len(result._BufferedRowResultProxy__rowbuffer)
                    le_(buffered, 1000)

    def test_max_row_buffer_option(self):
        """``max_row_buffer`` caps both ``_bufsize`` and the row buffer."""
        with self._proxy_fixture(_result.BufferedRowResultProxy):
            with self.engine.connect() as conn:
                extra_rows = [
                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)]
                conn.execute(self.table.insert(), extra_rows)

                capped_conn = conn.execution_options(max_row_buffer=27)
                result = capped_conn.execute(self.table.select())

                for idx, _ in enumerate(result):
                    if idx in (16, 70, 150, 250):
                        eq_(result._bufsize, 27)
                    buffered = len(result._BufferedRowResultProxy__rowbuffer)
                    le_(buffered, 27)
1755