1#! coding: utf-8
2
3import copy
4import inspect
5import sys
6
7from sqlalchemy import exc
8from sqlalchemy import sql
9from sqlalchemy import testing
10from sqlalchemy import util
11from sqlalchemy.sql import column
12from sqlalchemy.testing import assert_raises
13from sqlalchemy.testing import assert_raises_message
14from sqlalchemy.testing import eq_
15from sqlalchemy.testing import expect_warnings
16from sqlalchemy.testing import fails_if
17from sqlalchemy.testing import fixtures
18from sqlalchemy.testing import is_
19from sqlalchemy.testing import mock
20from sqlalchemy.testing import ne_
21from sqlalchemy.testing.util import gc_collect
22from sqlalchemy.testing.util import picklers
23from sqlalchemy.util import classproperty
24from sqlalchemy.util import compat
25from sqlalchemy.util import get_callable_argspec
26from sqlalchemy.util import langhelpers
27from sqlalchemy.util import WeakSequence
28
29
30class _KeyedTupleTest(object):
31    def _fixture(self, values, labels):
32        raise NotImplementedError()
33
34    def test_empty(self):
35        keyed_tuple = self._fixture([], [])
36        eq_(str(keyed_tuple), "()")
37        eq_(len(keyed_tuple), 0)
38
39        eq_(list(keyed_tuple.keys()), [])
40        eq_(keyed_tuple._fields, ())
41        eq_(keyed_tuple._asdict(), {})
42
43    def test_values_but_no_labels(self):
44        keyed_tuple = self._fixture([1, 2], [])
45        eq_(str(keyed_tuple), "(1, 2)")
46        eq_(len(keyed_tuple), 2)
47
48        eq_(list(keyed_tuple.keys()), [])
49        eq_(keyed_tuple._fields, ())
50        eq_(keyed_tuple._asdict(), {})
51
52        eq_(keyed_tuple[0], 1)
53        eq_(keyed_tuple[1], 2)
54
55    def test_basic_creation(self):
56        keyed_tuple = self._fixture([1, 2], ["a", "b"])
57        eq_(str(keyed_tuple), "(1, 2)")
58        eq_(list(keyed_tuple.keys()), ["a", "b"])
59        eq_(keyed_tuple._fields, ("a", "b"))
60        eq_(keyed_tuple._asdict(), {"a": 1, "b": 2})
61
62    def test_basic_index_access(self):
63        keyed_tuple = self._fixture([1, 2], ["a", "b"])
64        eq_(keyed_tuple[0], 1)
65        eq_(keyed_tuple[1], 2)
66
67        def should_raise():
68            keyed_tuple[2]
69
70        assert_raises(IndexError, should_raise)
71
72    def test_basic_attribute_access(self):
73        keyed_tuple = self._fixture([1, 2], ["a", "b"])
74        eq_(keyed_tuple.a, 1)
75        eq_(keyed_tuple.b, 2)
76
77        def should_raise():
78            keyed_tuple.c
79
80        assert_raises(AttributeError, should_raise)
81
82    def test_none_label(self):
83        keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
84        eq_(str(keyed_tuple), "(1, 2, 3)")
85
86        eq_(list(keyed_tuple.keys()), ["a", "b"])
87        eq_(keyed_tuple._fields, ("a", "b"))
88        eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
89
90        # attribute access: can't get at value 2
91        eq_(keyed_tuple.a, 1)
92        eq_(keyed_tuple.b, 3)
93
94        # index access: can get at value 2
95        eq_(keyed_tuple[0], 1)
96        eq_(keyed_tuple[1], 2)
97        eq_(keyed_tuple[2], 3)
98
99    def test_duplicate_labels(self):
100        keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "b"])
101        eq_(str(keyed_tuple), "(1, 2, 3)")
102
103        eq_(list(keyed_tuple.keys()), ["a", "b", "b"])
104        eq_(keyed_tuple._fields, ("a", "b", "b"))
105        eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
106
107        # attribute access: can't get at value 2
108        eq_(keyed_tuple.a, 1)
109        eq_(keyed_tuple.b, 3)
110
111        # index access: can get at value 2
112        eq_(keyed_tuple[0], 1)
113        eq_(keyed_tuple[1], 2)
114        eq_(keyed_tuple[2], 3)
115
116    def test_immutable(self):
117        keyed_tuple = self._fixture([1, 2], ["a", "b"])
118        eq_(str(keyed_tuple), "(1, 2)")
119
120        eq_(keyed_tuple.a, 1)
121
122        assert_raises(AttributeError, setattr, keyed_tuple, "a", 5)
123
124        def should_raise():
125            keyed_tuple[0] = 100
126
127        assert_raises(TypeError, should_raise)
128
129    def test_serialize(self):
130
131        keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
132
133        for loads, dumps in picklers():
134            kt = loads(dumps(keyed_tuple))
135
136            eq_(str(kt), "(1, 2, 3)")
137
138            eq_(list(kt.keys()), ["a", "b"])
139            eq_(kt._fields, ("a", "b"))
140            eq_(kt._asdict(), {"a": 1, "b": 3})
141
142
143class KeyedTupleTest(_KeyedTupleTest, fixtures.TestBase):
144    def _fixture(self, values, labels):
145        return util.KeyedTuple(values, labels)
146
147
148class LWKeyedTupleTest(_KeyedTupleTest, fixtures.TestBase):
149    def _fixture(self, values, labels):
150        return util.lightweight_named_tuple("n", labels)(values)
151
152
153class WeakSequenceTest(fixtures.TestBase):
154    @testing.requires.predictable_gc
155    def test_cleanout_elements(self):
156        class Foo(object):
157            pass
158
159        f1, f2, f3 = Foo(), Foo(), Foo()
160        w = WeakSequence([f1, f2, f3])
161        eq_(len(w), 3)
162        eq_(len(w._storage), 3)
163        del f2
164        gc_collect()
165        eq_(len(w), 2)
166        eq_(len(w._storage), 2)
167
168    @testing.requires.predictable_gc
169    def test_cleanout_appended(self):
170        class Foo(object):
171            pass
172
173        f1, f2, f3 = Foo(), Foo(), Foo()
174        w = WeakSequence()
175        w.append(f1)
176        w.append(f2)
177        w.append(f3)
178        eq_(len(w), 3)
179        eq_(len(w._storage), 3)
180        del f2
181        gc_collect()
182        eq_(len(w), 2)
183        eq_(len(w._storage), 2)
184
185
186class OrderedDictTest(fixtures.TestBase):
187    def test_odict(self):
188        o = util.OrderedDict()
189        o["a"] = 1
190        o["b"] = 2
191        o["snack"] = "attack"
192        o["c"] = 3
193
194        eq_(list(o.keys()), ["a", "b", "snack", "c"])
195        eq_(list(o.values()), [1, 2, "attack", 3])
196
197        o.pop("snack")
198        eq_(list(o.keys()), ["a", "b", "c"])
199        eq_(list(o.values()), [1, 2, 3])
200
201        try:
202            o.pop("eep")
203            assert False
204        except KeyError:
205            pass
206
207        eq_(o.pop("eep", "woot"), "woot")
208
209        try:
210            o.pop("whiff", "bang", "pow")
211            assert False
212        except TypeError:
213            pass
214
215        eq_(list(o.keys()), ["a", "b", "c"])
216        eq_(list(o.values()), [1, 2, 3])
217
218        o2 = util.OrderedDict(d=4)
219        o2["e"] = 5
220
221        eq_(list(o2.keys()), ["d", "e"])
222        eq_(list(o2.values()), [4, 5])
223
224        o.update(o2)
225        eq_(list(o.keys()), ["a", "b", "c", "d", "e"])
226        eq_(list(o.values()), [1, 2, 3, 4, 5])
227
228        o.setdefault("c", "zzz")
229        o.setdefault("f", 6)
230        eq_(list(o.keys()), ["a", "b", "c", "d", "e", "f"])
231        eq_(list(o.values()), [1, 2, 3, 4, 5, 6])
232
233    def test_odict_constructor(self):
234        o = util.OrderedDict(
235            [("name", "jbe"), ("fullname", "jonathan"), ("password", "")]
236        )
237        eq_(list(o.keys()), ["name", "fullname", "password"])
238
239    def test_odict_copy(self):
240        o = util.OrderedDict()
241        o["zzz"] = 1
242        o["aaa"] = 2
243        eq_(list(o.keys()), ["zzz", "aaa"])
244
245        o2 = o.copy()
246        eq_(list(o2.keys()), list(o.keys()))
247
248        o3 = copy.copy(o)
249        eq_(list(o3.keys()), list(o.keys()))
250
251
252class OrderedSetTest(fixtures.TestBase):
253    def test_mutators_against_iter(self):
254        # testing a set modified against an iterator
255        o = util.OrderedSet([3, 2, 4, 5])
256
257        eq_(o.difference(iter([3, 4])), util.OrderedSet([2, 5]))
258        eq_(o.intersection(iter([3, 4, 6])), util.OrderedSet([3, 4]))
259        eq_(o.union(iter([3, 4, 6])), util.OrderedSet([2, 3, 4, 5, 6]))
260
261
262class FrozenDictTest(fixtures.TestBase):
263    def test_serialize(self):
264        d = util.immutabledict({1: 2, 3: 4})
265        for loads, dumps in picklers():
266            print(loads(dumps(d)))
267
268
269class MemoizedAttrTest(fixtures.TestBase):
270    def test_memoized_property(self):
271        val = [20]
272
273        class Foo(object):
274            @util.memoized_property
275            def bar(self):
276                v = val[0]
277                val[0] += 1
278                return v
279
280        ne_(Foo.bar, None)
281        f1 = Foo()
282        assert "bar" not in f1.__dict__
283        eq_(f1.bar, 20)
284        eq_(f1.bar, 20)
285        eq_(val[0], 21)
286        eq_(f1.__dict__["bar"], 20)
287
288    def test_memoized_instancemethod(self):
289        val = [20]
290
291        class Foo(object):
292            @util.memoized_instancemethod
293            def bar(self):
294                v = val[0]
295                val[0] += 1
296                return v
297
298        assert inspect.ismethod(Foo().bar)
299        ne_(Foo.bar, None)
300        f1 = Foo()
301        assert "bar" not in f1.__dict__
302        eq_(f1.bar(), 20)
303        eq_(f1.bar(), 20)
304        eq_(val[0], 21)
305
306    def test_memoized_slots(self):
307        canary = mock.Mock()
308
309        class Foob(util.MemoizedSlots):
310            __slots__ = ("foo_bar", "gogo")
311
312            def _memoized_method_gogo(self):
313                canary.method()
314                return "gogo"
315
316            def _memoized_attr_foo_bar(self):
317                canary.attr()
318                return "foobar"
319
320        f1 = Foob()
321        assert_raises(AttributeError, setattr, f1, "bar", "bat")
322
323        eq_(f1.foo_bar, "foobar")
324
325        eq_(f1.foo_bar, "foobar")
326
327        eq_(f1.gogo(), "gogo")
328
329        eq_(f1.gogo(), "gogo")
330
331        eq_(canary.mock_calls, [mock.call.attr(), mock.call.method()])
332
333
334class WrapCallableTest(fixtures.TestBase):
335    def test_wrapping_update_wrapper_fn(self):
336        def my_fancy_default():
337            """run the fancy default"""
338            return 10
339
340        c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default)
341
342        eq_(c.__name__, "my_fancy_default")
343        eq_(c.__doc__, "run the fancy default")
344
345    def test_wrapping_update_wrapper_fn_nodocstring(self):
346        def my_fancy_default():
347            return 10
348
349        c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default)
350        eq_(c.__name__, "my_fancy_default")
351        eq_(c.__doc__, None)
352
353    def test_wrapping_update_wrapper_cls(self):
354        class MyFancyDefault(object):
355            """a fancy default"""
356
357            def __call__(self):
358                """run the fancy default"""
359                return 10
360
361        def_ = MyFancyDefault()
362        c = util.wrap_callable(lambda: def_(), def_)
363
364        eq_(c.__name__, "MyFancyDefault")
365        eq_(c.__doc__, "run the fancy default")
366
367    def test_wrapping_update_wrapper_cls_noclsdocstring(self):
368        class MyFancyDefault(object):
369            def __call__(self):
370                """run the fancy default"""
371                return 10
372
373        def_ = MyFancyDefault()
374        c = util.wrap_callable(lambda: def_(), def_)
375        eq_(c.__name__, "MyFancyDefault")
376        eq_(c.__doc__, "run the fancy default")
377
378    def test_wrapping_update_wrapper_cls_nomethdocstring(self):
379        class MyFancyDefault(object):
380            """a fancy default"""
381
382            def __call__(self):
383                return 10
384
385        def_ = MyFancyDefault()
386        c = util.wrap_callable(lambda: def_(), def_)
387        eq_(c.__name__, "MyFancyDefault")
388        eq_(c.__doc__, "a fancy default")
389
390    def test_wrapping_update_wrapper_cls_noclsdocstring_nomethdocstring(self):
391        class MyFancyDefault(object):
392            def __call__(self):
393                return 10
394
395        def_ = MyFancyDefault()
396        c = util.wrap_callable(lambda: def_(), def_)
397        eq_(c.__name__, "MyFancyDefault")
398        eq_(c.__doc__, None)
399
400    def test_wrapping_update_wrapper_functools_parial(self):
401        def my_default(x):
402            return x
403
404        import functools
405
406        my_functools_default = functools.partial(my_default, 5)
407
408        c = util.wrap_callable(
409            lambda: my_functools_default(), my_functools_default
410        )
411        eq_(c.__name__, "partial")
412        eq_(c.__doc__, my_functools_default.__call__.__doc__)
413        eq_(c(), 5)
414
415
416class ToListTest(fixtures.TestBase):
417    def test_from_string(self):
418        eq_(util.to_list("xyz"), ["xyz"])
419
420    def test_from_set(self):
421        spec = util.to_list(set([1, 2, 3]))
422        assert isinstance(spec, list)
423        eq_(sorted(spec), [1, 2, 3])
424
425    def test_from_dict(self):
426        spec = util.to_list({1: "a", 2: "b", 3: "c"})
427        assert isinstance(spec, list)
428        eq_(sorted(spec), [1, 2, 3])
429
430    def test_from_tuple(self):
431        eq_(util.to_list((1, 2, 3)), [1, 2, 3])
432
433    def test_from_bytes(self):
434
435        eq_(util.to_list(compat.b("abc")), [compat.b("abc")])
436
437        eq_(
438            util.to_list([compat.b("abc"), compat.b("def")]),
439            [compat.b("abc"), compat.b("def")],
440        )
441
442
443class ColumnCollectionTest(testing.AssertsCompiledSQL, fixtures.TestBase):
444    def test_in(self):
445        cc = sql.ColumnCollection()
446        cc.add(sql.column("col1"))
447        cc.add(sql.column("col2"))
448        cc.add(sql.column("col3"))
449        assert "col1" in cc
450        assert "col2" in cc
451
452        try:
453            cc["col1"] in cc
454            assert False
455        except exc.ArgumentError as e:
456            eq_(str(e), "__contains__ requires a string argument")
457
458    def test_compare(self):
459        cc1 = sql.ColumnCollection()
460        cc2 = sql.ColumnCollection()
461        cc3 = sql.ColumnCollection()
462        c1 = sql.column("col1")
463        c2 = c1.label("col2")
464        c3 = sql.column("col3")
465        cc1.add(c1)
466        cc2.add(c2)
467        cc3.add(c3)
468        assert (cc1 == cc2).compare(c1 == c2)
469        assert not (cc1 == cc3).compare(c2 == c3)
470
471    @testing.emits_warning("Column ")
472    def test_dupes_add(self):
473        cc = sql.ColumnCollection()
474
475        c1, c2a, c3, c2b = (
476            column("c1"),
477            column("c2"),
478            column("c3"),
479            column("c2"),
480        )
481
482        cc.add(c1)
483        cc.add(c2a)
484        cc.add(c3)
485        cc.add(c2b)
486
487        eq_(cc._all_columns, [c1, c2a, c3, c2b])
488
489        # for iter, c2a is replaced by c2b, ordering
490        # is maintained in that way.  ideally, iter would be
491        # the same as the "_all_columns" collection.
492        eq_(list(cc), [c1, c2b, c3])
493
494        assert cc.contains_column(c2a)
495        assert cc.contains_column(c2b)
496
497        ci = cc.as_immutable()
498        eq_(ci._all_columns, [c1, c2a, c3, c2b])
499        eq_(list(ci), [c1, c2b, c3])
500
501    def test_identical_dupe_add(self):
502        cc = sql.ColumnCollection()
503
504        c1, c2, c3 = (column("c1"), column("c2"), column("c3"))
505
506        cc.add(c1)
507        cc.add(c2)
508        cc.add(c3)
509        cc.add(c2)
510
511        eq_(cc._all_columns, [c1, c2, c3])
512
513        self.assert_compile(
514            cc == [c1, c2, c3], "c1 = c1 AND c2 = c2 AND c3 = c3"
515        )
516
517        # for iter, c2a is replaced by c2b, ordering
518        # is maintained in that way.  ideally, iter would be
519        # the same as the "_all_columns" collection.
520        eq_(list(cc), [c1, c2, c3])
521
522        assert cc.contains_column(c2)
523
524        ci = cc.as_immutable()
525        eq_(ci._all_columns, [c1, c2, c3])
526        eq_(list(ci), [c1, c2, c3])
527
528        self.assert_compile(
529            ci == [c1, c2, c3], "c1 = c1 AND c2 = c2 AND c3 = c3"
530        )
531
532    def test_replace(self):
533        cc = sql.ColumnCollection()
534
535        c1, c2a, c3, c2b = (
536            column("c1"),
537            column("c2"),
538            column("c3"),
539            column("c2"),
540        )
541
542        cc.add(c1)
543        cc.add(c2a)
544        cc.add(c3)
545
546        cc.replace(c2b)
547
548        eq_(cc._all_columns, [c1, c2b, c3])
549        eq_(list(cc), [c1, c2b, c3])
550
551        assert not cc.contains_column(c2a)
552        assert cc.contains_column(c2b)
553
554        ci = cc.as_immutable()
555        eq_(ci._all_columns, [c1, c2b, c3])
556        eq_(list(ci), [c1, c2b, c3])
557
558    def test_replace_key_matches(self):
559        cc = sql.ColumnCollection()
560
561        c1, c2a, c3, c2b = (
562            column("c1"),
563            column("c2"),
564            column("c3"),
565            column("X"),
566        )
567        c2b.key = "c2"
568
569        cc.add(c1)
570        cc.add(c2a)
571        cc.add(c3)
572
573        cc.replace(c2b)
574
575        assert not cc.contains_column(c2a)
576        assert cc.contains_column(c2b)
577
578        eq_(cc._all_columns, [c1, c2b, c3])
579        eq_(list(cc), [c1, c2b, c3])
580
581        ci = cc.as_immutable()
582        eq_(ci._all_columns, [c1, c2b, c3])
583        eq_(list(ci), [c1, c2b, c3])
584
585    def test_replace_name_matches(self):
586        cc = sql.ColumnCollection()
587
588        c1, c2a, c3, c2b = (
589            column("c1"),
590            column("c2"),
591            column("c3"),
592            column("c2"),
593        )
594        c2b.key = "X"
595
596        cc.add(c1)
597        cc.add(c2a)
598        cc.add(c3)
599
600        cc.replace(c2b)
601
602        assert not cc.contains_column(c2a)
603        assert cc.contains_column(c2b)
604
605        eq_(cc._all_columns, [c1, c2b, c3])
606        eq_(list(cc), [c1, c3, c2b])
607
608        ci = cc.as_immutable()
609        eq_(ci._all_columns, [c1, c2b, c3])
610        eq_(list(ci), [c1, c3, c2b])
611
612    def test_replace_no_match(self):
613        cc = sql.ColumnCollection()
614
615        c1, c2, c3, c4 = column("c1"), column("c2"), column("c3"), column("c4")
616        c4.key = "X"
617
618        cc.add(c1)
619        cc.add(c2)
620        cc.add(c3)
621
622        cc.replace(c4)
623
624        assert cc.contains_column(c2)
625        assert cc.contains_column(c4)
626
627        eq_(cc._all_columns, [c1, c2, c3, c4])
628        eq_(list(cc), [c1, c2, c3, c4])
629
630        ci = cc.as_immutable()
631        eq_(ci._all_columns, [c1, c2, c3, c4])
632        eq_(list(ci), [c1, c2, c3, c4])
633
634    def test_dupes_extend(self):
635        cc = sql.ColumnCollection()
636
637        c1, c2a, c3, c2b = (
638            column("c1"),
639            column("c2"),
640            column("c3"),
641            column("c2"),
642        )
643
644        cc.add(c1)
645        cc.add(c2a)
646
647        cc.extend([c3, c2b])
648
649        eq_(cc._all_columns, [c1, c2a, c3, c2b])
650
651        # for iter, c2a is replaced by c2b, ordering
652        # is maintained in that way.  ideally, iter would be
653        # the same as the "_all_columns" collection.
654        eq_(list(cc), [c1, c2b, c3])
655
656        assert cc.contains_column(c2a)
657        assert cc.contains_column(c2b)
658
659        ci = cc.as_immutable()
660        eq_(ci._all_columns, [c1, c2a, c3, c2b])
661        eq_(list(ci), [c1, c2b, c3])
662
663    def test_dupes_update(self):
664        cc = sql.ColumnCollection()
665
666        c1, c2a, c3, c2b = (
667            column("c1"),
668            column("c2"),
669            column("c3"),
670            column("c2"),
671        )
672
673        cc.add(c1)
674        cc.add(c2a)
675
676        cc.update([(c3.key, c3), (c2b.key, c2b)])
677
678        eq_(cc._all_columns, [c1, c2a, c3, c2b])
679
680        assert cc.contains_column(c2a)
681        assert cc.contains_column(c2b)
682
683        # for iter, c2a is replaced by c2b, ordering
684        # is maintained in that way.  ideally, iter would be
685        # the same as the "_all_columns" collection.
686        eq_(list(cc), [c1, c2b, c3])
687
688    def test_extend_existing(self):
689        cc = sql.ColumnCollection()
690
691        c1, c2, c3, c4, c5 = (
692            column("c1"),
693            column("c2"),
694            column("c3"),
695            column("c4"),
696            column("c5"),
697        )
698
699        cc.extend([c1, c2])
700        eq_(cc._all_columns, [c1, c2])
701
702        cc.extend([c3])
703        eq_(cc._all_columns, [c1, c2, c3])
704        cc.extend([c4, c2, c5])
705
706        eq_(cc._all_columns, [c1, c2, c3, c4, c5])
707
708    def test_update_existing(self):
709        cc = sql.ColumnCollection()
710
711        c1, c2, c3, c4, c5 = (
712            column("c1"),
713            column("c2"),
714            column("c3"),
715            column("c4"),
716            column("c5"),
717        )
718
719        cc.update([("c1", c1), ("c2", c2)])
720        eq_(cc._all_columns, [c1, c2])
721
722        cc.update([("c3", c3)])
723        eq_(cc._all_columns, [c1, c2, c3])
724        cc.update([("c4", c4), ("c2", c2), ("c5", c5)])
725
726        eq_(cc._all_columns, [c1, c2, c3, c4, c5])
727
728
729class LRUTest(fixtures.TestBase):
730    def test_lru(self):
731        class item(object):
732            def __init__(self, id_):
733                self.id = id_
734
735            def __str__(self):
736                return "item id %d" % self.id
737
738        lru = util.LRUCache(10, threshold=0.2)
739
740        for id_ in range(1, 20):
741            lru[id_] = item(id_)
742
743        # first couple of items should be gone
744        assert 1 not in lru
745        assert 2 not in lru
746
747        # next batch over the threshold of 10 should be present
748        for id_ in range(11, 20):
749            assert id_ in lru
750
751        lru[12]
752        lru[15]
753        lru[23] = item(23)
754        lru[24] = item(24)
755        lru[25] = item(25)
756        lru[26] = item(26)
757        lru[27] = item(27)
758
759        assert 11 not in lru
760        assert 13 not in lru
761
762        for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15):
763            assert id_ in lru
764
765        i1 = lru[25]
766        i2 = item(25)
767        lru[25] = i2
768        assert 25 in lru
769        assert lru[25] is i2
770
771
772class ImmutableSubclass(str):
773    pass
774
775
776class FlattenIteratorTest(fixtures.TestBase):
777    def test_flatten(self):
778        assert list(util.flatten_iterator([[1, 2, 3], [4, 5, 6], 7, 8])) == [
779            1,
780            2,
781            3,
782            4,
783            5,
784            6,
785            7,
786            8,
787        ]
788
789    def test_str_with_iter(self):
790        """ensure that a str object with an __iter__ method (like in
791        PyPy) is not interpreted as an iterable.
792
793        """
794
795        class IterString(str):
796            def __iter__(self):
797                return iter(self + "")
798
799        iter_list = [IterString("asdf"), [IterString("x"), IterString("y")]]
800
801        assert list(util.flatten_iterator(iter_list)) == ["asdf", "x", "y"]
802
803
804class HashOverride(object):
805    def __init__(self, value=None):
806        self.value = value
807
808    def __hash__(self):
809        return hash(self.value)
810
811
812class EqOverride(object):
813    def __init__(self, value=None):
814        self.value = value
815
816    __hash__ = object.__hash__
817
818    def __eq__(self, other):
819        if isinstance(other, EqOverride):
820            return self.value == other.value
821        else:
822            return False
823
824    def __ne__(self, other):
825        if isinstance(other, EqOverride):
826            return self.value != other.value
827        else:
828            return True
829
830
831class HashEqOverride(object):
832    def __init__(self, value=None):
833        self.value = value
834
835    def __hash__(self):
836        return hash(self.value)
837
838    def __eq__(self, other):
839        if isinstance(other, EqOverride):
840            return self.value == other.value
841        else:
842            return False
843
844    def __ne__(self, other):
845        if isinstance(other, EqOverride):
846            return self.value != other.value
847        else:
848            return True
849
850
851class IdentitySetTest(fixtures.TestBase):
852    def assert_eq(self, identityset, expected_iterable):
853        expected = sorted([id(o) for o in expected_iterable])
854        found = sorted([id(o) for o in identityset])
855        eq_(found, expected)
856
857    def test_init(self):
858        ids = util.IdentitySet([1, 2, 3, 2, 1])
859        self.assert_eq(ids, [1, 2, 3])
860
861        ids = util.IdentitySet(ids)
862        self.assert_eq(ids, [1, 2, 3])
863
864        ids = util.IdentitySet()
865        self.assert_eq(ids, [])
866
867        ids = util.IdentitySet([])
868        self.assert_eq(ids, [])
869
870        ids = util.IdentitySet(ids)
871        self.assert_eq(ids, [])
872
873    def test_add(self):
874        for type_ in (object, ImmutableSubclass):
875            data = [type_(), type_()]
876            ids = util.IdentitySet()
877            for i in list(range(2)) + list(range(2)):
878                ids.add(data[i])
879            self.assert_eq(ids, data)
880
881        for type_ in (EqOverride, HashOverride, HashEqOverride):
882            data = [type_(1), type_(1), type_(2)]
883            ids = util.IdentitySet()
884            for i in list(range(3)) + list(range(3)):
885                ids.add(data[i])
886            self.assert_eq(ids, data)
887
888    def test_dunder_sub2(self):
889        IdentitySet = util.IdentitySet
890        o1, o2, o3 = object(), object(), object()
891        ids1 = IdentitySet([o1])
892        ids2 = IdentitySet([o1, o2, o3])
893        eq_(ids2 - ids1, IdentitySet([o2, o3]))
894
895        ids2 -= ids1
896        eq_(ids2, IdentitySet([o2, o3]))
897
898    def test_dunder_eq(self):
899        _, _, twin1, twin2, unique1, unique2 = self._create_sets()
900
901        # basic set math
902        eq_(twin1 == twin2, True)
903        eq_(unique1 == unique2, False)
904
905        # not an IdentitySet
906        not_an_identity_set = object()
907        eq_(unique1 == not_an_identity_set, False)
908
909    def test_dunder_ne(self):
910        _, _, twin1, twin2, unique1, unique2 = self._create_sets()
911
912        # basic set math
913        eq_(twin1 != twin2, False)
914        eq_(unique1 != unique2, True)
915
916        # not an IdentitySet
917        not_an_identity_set = object()
918        eq_(unique1 != not_an_identity_set, True)
919
920    def test_dunder_le(self):
921        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
922
923        # basic set math
924        eq_(sub_ <= super_, True)
925        eq_(super_ <= sub_, False)
926
927        # the same sets
928        eq_(twin1 <= twin2, True)
929        eq_(twin2 <= twin1, True)
930
931        # totally different sets
932        eq_(unique1 <= unique2, False)
933        eq_(unique2 <= unique1, False)
934
935        # not an IdentitySet
936        def should_raise():
937            not_an_identity_set = object()
938            return unique1 <= not_an_identity_set
939
940        self._assert_unorderable_types(should_raise)
941
942    def test_dunder_lt(self):
943        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
944
945        # basic set math
946        eq_(sub_ < super_, True)
947        eq_(super_ < sub_, False)
948
949        # the same sets
950        eq_(twin1 < twin2, False)
951        eq_(twin2 < twin1, False)
952
953        # totally different sets
954        eq_(unique1 < unique2, False)
955        eq_(unique2 < unique1, False)
956
957        # not an IdentitySet
958        def should_raise():
959            not_an_identity_set = object()
960            return unique1 < not_an_identity_set
961
962        self._assert_unorderable_types(should_raise)
963
964    def test_dunder_ge(self):
965        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
966
967        # basic set math
968        eq_(sub_ >= super_, False)
969        eq_(super_ >= sub_, True)
970
971        # the same sets
972        eq_(twin1 >= twin2, True)
973        eq_(twin2 >= twin1, True)
974
975        # totally different sets
976        eq_(unique1 >= unique2, False)
977        eq_(unique2 >= unique1, False)
978
979        # not an IdentitySet
980        def should_raise():
981            not_an_identity_set = object()
982            return unique1 >= not_an_identity_set
983
984        self._assert_unorderable_types(should_raise)
985
986    def test_dunder_gt(self):
987        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
988
989        # basic set math
990        eq_(sub_ > super_, False)
991        eq_(super_ > sub_, True)
992
993        # the same sets
994        eq_(twin1 > twin2, False)
995        eq_(twin2 > twin1, False)
996
997        # totally different sets
998        eq_(unique1 > unique2, False)
999        eq_(unique2 > unique1, False)
1000
1001        # not an IdentitySet
1002        def should_raise():
1003            not_an_identity_set = object()
1004            return unique1 > not_an_identity_set
1005
1006        self._assert_unorderable_types(should_raise)
1007
1008    def test_issubset(self):
1009        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1010
1011        # basic set math
1012        eq_(sub_.issubset(super_), True)
1013        eq_(super_.issubset(sub_), False)
1014
1015        # the same sets
1016        eq_(twin1.issubset(twin2), True)
1017        eq_(twin2.issubset(twin1), True)
1018
1019        # totally different sets
1020        eq_(unique1.issubset(unique2), False)
1021        eq_(unique2.issubset(unique1), False)
1022
1023        # not an IdentitySet
1024        not_an_identity_set = object()
1025        assert_raises(TypeError, unique1.issubset, not_an_identity_set)
1026
1027    def test_issuperset(self):
1028        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1029
1030        # basic set math
1031        eq_(sub_.issuperset(super_), False)
1032        eq_(super_.issuperset(sub_), True)
1033
1034        # the same sets
1035        eq_(twin1.issuperset(twin2), True)
1036        eq_(twin2.issuperset(twin1), True)
1037
1038        # totally different sets
1039        eq_(unique1.issuperset(unique2), False)
1040        eq_(unique2.issuperset(unique1), False)
1041
1042        # not an IdentitySet
1043        not_an_identity_set = object()
1044        assert_raises(TypeError, unique1.issuperset, not_an_identity_set)
1045
1046    def test_union(self):
1047        super_, sub_, twin1, twin2, _, _ = self._create_sets()
1048
1049        # basic set math
1050        eq_(sub_.union(super_), super_)
1051        eq_(super_.union(sub_), super_)
1052
1053        # the same sets
1054        eq_(twin1.union(twin2), twin1)
1055        eq_(twin2.union(twin1), twin1)
1056
1057        # empty sets
1058        empty = util.IdentitySet([])
1059        eq_(empty.union(empty), empty)
1060
1061        # totally different sets
1062        unique1 = util.IdentitySet([1])
1063        unique2 = util.IdentitySet([2])
1064        eq_(unique1.union(unique2), util.IdentitySet([1, 2]))
1065
1066        # not an IdentitySet
1067        not_an_identity_set = object()
1068        assert_raises(TypeError, unique1.union, not_an_identity_set)
1069
1070    def test_dunder_or(self):
1071        super_, sub_, twin1, twin2, _, _ = self._create_sets()
1072
1073        # basic set math
1074        eq_(sub_ | super_, super_)
1075        eq_(super_ | sub_, super_)
1076
1077        # the same sets
1078        eq_(twin1 | twin2, twin1)
1079        eq_(twin2 | twin1, twin1)
1080
1081        # empty sets
1082        empty = util.IdentitySet([])
1083        eq_(empty | empty, empty)
1084
1085        # totally different sets
1086        unique1 = util.IdentitySet([1])
1087        unique2 = util.IdentitySet([2])
1088        eq_(unique1 | unique2, util.IdentitySet([1, 2]))
1089
1090        # not an IdentitySet
1091        def should_raise():
1092            not_an_identity_set = object()
1093            return unique1 | not_an_identity_set
1094
1095        assert_raises(TypeError, should_raise)
1096
1097    def test_update(self):
1098        pass  # TODO
1099
1100    def test_dunder_ior(self):
1101        super_, sub_, _, _, _, _ = self._create_sets()
1102
1103        # basic set math
1104        sub_ |= super_
1105        eq_(sub_, super_)
1106        super_ |= sub_
1107        eq_(super_, super_)
1108
1109        # totally different sets
1110        unique1 = util.IdentitySet([1])
1111        unique2 = util.IdentitySet([2])
1112        unique1 |= unique2
1113        eq_(unique1, util.IdentitySet([1, 2]))
1114        eq_(unique2, util.IdentitySet([2]))
1115
1116        # not an IdentitySet
1117        def should_raise():
1118            unique = util.IdentitySet([1])
1119            not_an_identity_set = object()
1120            unique |= not_an_identity_set
1121
1122        assert_raises(TypeError, should_raise)
1123
1124    def test_difference(self):
1125        _, _, twin1, twin2, _, _ = self._create_sets()
1126
1127        # basic set math
1128        set1 = util.IdentitySet([1, 2, 3])
1129        set2 = util.IdentitySet([2, 3, 4])
1130        eq_(set1.difference(set2), util.IdentitySet([1]))
1131        eq_(set2.difference(set1), util.IdentitySet([4]))
1132
1133        # empty sets
1134        empty = util.IdentitySet([])
1135        eq_(empty.difference(empty), empty)
1136
1137        # the same sets
1138        eq_(twin1.difference(twin2), empty)
1139        eq_(twin2.difference(twin1), empty)
1140
1141        # totally different sets
1142        unique1 = util.IdentitySet([1])
1143        unique2 = util.IdentitySet([2])
1144        eq_(unique1.difference(unique2), util.IdentitySet([1]))
1145        eq_(unique2.difference(unique1), util.IdentitySet([2]))
1146
1147        # not an IdentitySet
1148        not_an_identity_set = object()
1149        assert_raises(TypeError, unique1.difference, not_an_identity_set)
1150
1151    def test_dunder_sub(self):
1152        _, _, twin1, twin2, _, _ = self._create_sets()
1153
1154        # basic set math
1155        set1 = util.IdentitySet([1, 2, 3])
1156        set2 = util.IdentitySet([2, 3, 4])
1157        eq_(set1 - set2, util.IdentitySet([1]))
1158        eq_(set2 - set1, util.IdentitySet([4]))
1159
1160        # empty sets
1161        empty = util.IdentitySet([])
1162        eq_(empty - empty, empty)
1163
1164        # the same sets
1165        eq_(twin1 - twin2, empty)
1166        eq_(twin2 - twin1, empty)
1167
1168        # totally different sets
1169        unique1 = util.IdentitySet([1])
1170        unique2 = util.IdentitySet([2])
1171        eq_(unique1 - unique2, util.IdentitySet([1]))
1172        eq_(unique2 - unique1, util.IdentitySet([2]))
1173
1174        # not an IdentitySet
1175        def should_raise():
1176            not_an_identity_set = object()
1177            unique1 - not_an_identity_set
1178
1179        assert_raises(TypeError, should_raise)
1180
1181    def test_difference_update(self):
1182        pass  # TODO
1183
1184    def test_dunder_isub(self):
1185        pass  # TODO
1186
1187    def test_intersection(self):
1188        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1189
1190        # basic set math
1191        eq_(sub_.intersection(super_), sub_)
1192        eq_(super_.intersection(sub_), sub_)
1193
1194        # the same sets
1195        eq_(twin1.intersection(twin2), twin1)
1196        eq_(twin2.intersection(twin1), twin1)
1197
1198        # empty sets
1199        empty = util.IdentitySet([])
1200        eq_(empty.intersection(empty), empty)
1201
1202        # totally different sets
1203        eq_(unique1.intersection(unique2), empty)
1204
1205        # not an IdentitySet
1206        not_an_identity_set = object()
1207        assert_raises(TypeError, unique1.intersection, not_an_identity_set)
1208
1209    def test_dunder_and(self):
1210        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1211
1212        # basic set math
1213        eq_(sub_ & super_, sub_)
1214        eq_(super_ & sub_, sub_)
1215
1216        # the same sets
1217        eq_(twin1 & twin2, twin1)
1218        eq_(twin2 & twin1, twin1)
1219
1220        # empty sets
1221        empty = util.IdentitySet([])
1222        eq_(empty & empty, empty)
1223
1224        # totally different sets
1225        eq_(unique1 & unique2, empty)
1226
1227        # not an IdentitySet
1228        def should_raise():
1229            not_an_identity_set = object()
1230            return unique1 & not_an_identity_set
1231
1232        assert_raises(TypeError, should_raise)
1233
1234    def test_intersection_update(self):
1235        pass  # TODO
1236
1237    def test_dunder_iand(self):
1238        pass  # TODO
1239
1240    def test_symmetric_difference(self):
1241        _, _, twin1, twin2, _, _ = self._create_sets()
1242
1243        # basic set math
1244        set1 = util.IdentitySet([1, 2, 3])
1245        set2 = util.IdentitySet([2, 3, 4])
1246        eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4]))
1247        eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4]))
1248
1249        # empty sets
1250        empty = util.IdentitySet([])
1251        eq_(empty.symmetric_difference(empty), empty)
1252
1253        # the same sets
1254        eq_(twin1.symmetric_difference(twin2), empty)
1255        eq_(twin2.symmetric_difference(twin1), empty)
1256
1257        # totally different sets
1258        unique1 = util.IdentitySet([1])
1259        unique2 = util.IdentitySet([2])
1260        eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2]))
1261        eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2]))
1262
1263        # not an IdentitySet
1264        not_an_identity_set = object()
1265        assert_raises(
1266            TypeError, unique1.symmetric_difference, not_an_identity_set
1267        )
1268
1269    def test_dunder_xor(self):
1270        _, _, twin1, twin2, _, _ = self._create_sets()
1271
1272        # basic set math
1273        set1 = util.IdentitySet([1, 2, 3])
1274        set2 = util.IdentitySet([2, 3, 4])
1275        eq_(set1 ^ set2, util.IdentitySet([1, 4]))
1276        eq_(set2 ^ set1, util.IdentitySet([1, 4]))
1277
1278        # empty sets
1279        empty = util.IdentitySet([])
1280        eq_(empty ^ empty, empty)
1281
1282        # the same sets
1283        eq_(twin1 ^ twin2, empty)
1284        eq_(twin2 ^ twin1, empty)
1285
1286        # totally different sets
1287        unique1 = util.IdentitySet([1])
1288        unique2 = util.IdentitySet([2])
1289        eq_(unique1 ^ unique2, util.IdentitySet([1, 2]))
1290        eq_(unique2 ^ unique1, util.IdentitySet([1, 2]))
1291
1292        # not an IdentitySet
1293        def should_raise():
1294            not_an_identity_set = object()
1295            return unique1 ^ not_an_identity_set
1296
1297        assert_raises(TypeError, should_raise)
1298
1299    def test_symmetric_difference_update(self):
1300        pass  # TODO
1301
1302    def _create_sets(self):
1303        o1, o2, o3, o4, o5 = object(), object(), object(), object(), object()
1304        super_ = util.IdentitySet([o1, o2, o3])
1305        sub_ = util.IdentitySet([o2])
1306        twin1 = util.IdentitySet([o3])
1307        twin2 = util.IdentitySet([o3])
1308        unique1 = util.IdentitySet([o4])
1309        unique2 = util.IdentitySet([o5])
1310        return super_, sub_, twin1, twin2, unique1, unique2
1311
1312    def _assert_unorderable_types(self, callable_):
1313        if util.py36:
1314            assert_raises_message(
1315                TypeError, "not supported between instances of", callable_
1316            )
1317        elif util.py3k:
1318            assert_raises_message(TypeError, "unorderable types", callable_)
1319        else:
1320            assert_raises_message(
1321                TypeError, "cannot compare sets using cmp()", callable_
1322            )
1323
1324    def test_basic_sanity(self):
1325        IdentitySet = util.IdentitySet
1326
1327        o1, o2, o3 = object(), object(), object()
1328        ids = IdentitySet([o1])
1329        ids.discard(o1)
1330        ids.discard(o1)
1331        ids.add(o1)
1332        ids.remove(o1)
1333        assert_raises(KeyError, ids.remove, o1)
1334
1335        eq_(ids.copy(), ids)
1336
1337        # explicit __eq__ and __ne__ tests
1338        assert ids != None  # noqa
1339        assert not (ids == None)  # noqa
1340
1341        ne_(ids, IdentitySet([o1, o2, o3]))
1342        ids.clear()
1343        assert o1 not in ids
1344        ids.add(o2)
1345        assert o2 in ids
1346        eq_(ids.pop(), o2)
1347        ids.add(o1)
1348        eq_(len(ids), 1)
1349
1350        isuper = IdentitySet([o1, o2])
1351        assert ids < isuper
1352        assert ids.issubset(isuper)
1353        assert isuper.issuperset(ids)
1354        assert isuper > ids
1355
1356        eq_(ids.union(isuper), isuper)
1357        eq_(ids | isuper, isuper)
1358        eq_(isuper - ids, IdentitySet([o2]))
1359        eq_(isuper.difference(ids), IdentitySet([o2]))
1360        eq_(ids.intersection(isuper), IdentitySet([o1]))
1361        eq_(ids & isuper, IdentitySet([o1]))
1362        eq_(ids.symmetric_difference(isuper), IdentitySet([o2]))
1363        eq_(ids ^ isuper, IdentitySet([o2]))
1364
1365        ids.update(isuper)
1366        ids |= isuper
1367        ids.difference_update(isuper)
1368        ids -= isuper
1369        ids.intersection_update(isuper)
1370        ids &= isuper
1371        ids.symmetric_difference_update(isuper)
1372        ids ^= isuper
1373
1374        ids.update("foobar")
1375        try:
1376            ids |= "foobar"
1377            assert False
1378        except TypeError:
1379            assert True
1380
1381        try:
1382            s = set([o1, o2])
1383            s |= ids
1384            assert False
1385        except TypeError:
1386            assert True
1387
1388        assert_raises(TypeError, util.cmp, ids)
1389        assert_raises(TypeError, hash, ids)
1390
1391
1392class OrderedIdentitySetTest(fixtures.TestBase):
1393    def assert_eq(self, identityset, expected_iterable):
1394        expected = [id(o) for o in expected_iterable]
1395        found = [id(o) for o in identityset]
1396        eq_(found, expected)
1397
1398    def test_add(self):
1399        elem = object
1400        s = util.OrderedIdentitySet()
1401        s.add(elem())
1402        s.add(elem())
1403
1404    def test_intersection(self):
1405        elem = object
1406        eq_ = self.assert_eq
1407
1408        a, b, c, d, e, f, g = (
1409            elem(),
1410            elem(),
1411            elem(),
1412            elem(),
1413            elem(),
1414            elem(),
1415            elem(),
1416        )
1417
1418        s1 = util.OrderedIdentitySet([a, b, c])
1419        s2 = util.OrderedIdentitySet([d, e, f])
1420        s3 = util.OrderedIdentitySet([a, d, f, g])
1421        eq_(s1.intersection(s2), [])
1422        eq_(s1.intersection(s3), [a])
1423        eq_(s1.union(s2).intersection(s3), [a, d, f])
1424
1425
1426class DictlikeIteritemsTest(fixtures.TestBase):
1427    baseline = set([("a", 1), ("b", 2), ("c", 3)])
1428
1429    def _ok(self, instance):
1430        iterator = util.dictlike_iteritems(instance)
1431        eq_(set(iterator), self.baseline)
1432
1433    def _notok(self, instance):
1434        assert_raises(TypeError, util.dictlike_iteritems, instance)
1435
1436    def test_dict(self):
1437        d = dict(a=1, b=2, c=3)
1438        self._ok(d)
1439
1440    def test_subdict(self):
1441        class subdict(dict):
1442            pass
1443
1444        d = subdict(a=1, b=2, c=3)
1445        self._ok(d)
1446
1447    if util.py2k:
1448
1449        def test_UserDict(self):
1450            import UserDict
1451
1452            d = UserDict.UserDict(a=1, b=2, c=3)
1453            self._ok(d)
1454
1455    def test_object(self):
1456        self._notok(object())
1457
1458    if util.py2k:
1459
1460        def test_duck_1(self):
1461            class duck1(object):
1462                def iteritems(duck):
1463                    return iter(self.baseline)
1464
1465            self._ok(duck1())
1466
1467    def test_duck_2(self):
1468        class duck2(object):
1469            def items(duck):
1470                return list(self.baseline)
1471
1472        self._ok(duck2())
1473
1474    if util.py2k:
1475
1476        def test_duck_3(self):
1477            class duck3(object):
1478                def iterkeys(duck):
1479                    return iter(["a", "b", "c"])
1480
1481                def __getitem__(duck, key):
1482                    return dict(a=1, b=2, c=3).get(key)
1483
1484            self._ok(duck3())
1485
1486    def test_duck_4(self):
1487        class duck4(object):
1488            def iterkeys(duck):
1489                return iter(["a", "b", "c"])
1490
1491        self._notok(duck4())
1492
1493    def test_duck_5(self):
1494        class duck5(object):
1495            def keys(duck):
1496                return ["a", "b", "c"]
1497
1498            def get(duck, key):
1499                return dict(a=1, b=2, c=3).get(key)
1500
1501        self._ok(duck5())
1502
1503    def test_duck_6(self):
1504        class duck6(object):
1505            def keys(duck):
1506                return ["a", "b", "c"]
1507
1508        self._notok(duck6())
1509
1510
1511class DuckTypeCollectionTest(fixtures.TestBase):
1512    def test_sets(self):
1513        class SetLike(object):
1514            def add(self):
1515                pass
1516
1517        class ForcedSet(list):
1518            __emulates__ = set
1519
1520        for type_ in (set, SetLike, ForcedSet):
1521            eq_(util.duck_type_collection(type_), set)
1522            instance = type_()
1523            eq_(util.duck_type_collection(instance), set)
1524
1525        for type_ in (frozenset,):
1526            is_(util.duck_type_collection(type_), None)
1527            instance = type_()
1528            is_(util.duck_type_collection(instance), None)
1529
1530
1531class PublicFactoryTest(fixtures.TestBase):
1532    def _fixture(self):
1533        class Thingy(object):
1534            def __init__(self, value):
1535                "make a thingy"
1536                self.value = value
1537
1538            @classmethod
1539            def foobar(cls, x, y):
1540                "do the foobar"
1541                return Thingy(x + y)
1542
1543        return Thingy
1544
1545    def test_classmethod(self):
1546        Thingy = self._fixture()
1547        foob = langhelpers.public_factory(Thingy.foobar, ".sql.elements.foob")
1548        eq_(foob(3, 4).value, 7)
1549        eq_(foob(x=3, y=4).value, 7)
1550        eq_(foob.__doc__, "do the foobar")
1551        eq_(foob.__module__, "sqlalchemy.sql.elements")
1552        assert Thingy.foobar.__doc__.startswith("This function is mirrored;")
1553
1554    def test_constructor(self):
1555        Thingy = self._fixture()
1556        foob = langhelpers.public_factory(Thingy, ".sql.elements.foob")
1557        eq_(foob(7).value, 7)
1558        eq_(foob(value=7).value, 7)
1559        eq_(foob.__doc__, "make a thingy")
1560        eq_(foob.__module__, "sqlalchemy.sql.elements")
1561        assert Thingy.__init__.__doc__.startswith(
1562            "Construct a new :class:`.Thingy` object."
1563        )
1564
1565
1566class ArgInspectionTest(fixtures.TestBase):
1567    def test_get_cls_kwargs(self):
1568        class A(object):
1569            def __init__(self, a):
1570                pass
1571
1572        class A1(A):
1573            def __init__(self, a1):
1574                pass
1575
1576        class A11(A1):
1577            def __init__(self, a11, **kw):
1578                pass
1579
1580        class B(object):
1581            def __init__(self, b, **kw):
1582                pass
1583
1584        class B1(B):
1585            def __init__(self, b1, **kw):
1586                pass
1587
1588        class B2(B):
1589            def __init__(self, b2):
1590                pass
1591
1592        class AB(A, B):
1593            def __init__(self, ab):
1594                pass
1595
1596        class BA(B, A):
1597            def __init__(self, ba, **kwargs):
1598                pass
1599
1600        class BA1(BA):
1601            pass
1602
1603        class CAB(A, B):
1604            pass
1605
1606        class CBA(B, A):
1607            pass
1608
1609        class CB1A1(B1, A1):
1610            pass
1611
1612        class CAB1(A, B1):
1613            pass
1614
1615        class CB1A(B1, A):
1616            pass
1617
1618        class CB2A(B2, A):
1619            pass
1620
1621        class D(object):
1622            pass
1623
1624        class BA2(B, A):
1625            pass
1626
1627        class A11B1(A11, B1):
1628            pass
1629
1630        def test(cls, *expected):
1631            eq_(set(util.get_cls_kwargs(cls)), set(expected))
1632
1633        test(A, "a")
1634        test(A1, "a1")
1635        test(A11, "a11", "a1")
1636        test(B, "b")
1637        test(B1, "b1", "b")
1638        test(AB, "ab")
1639        test(BA, "ba", "b", "a")
1640        test(BA1, "ba", "b", "a")
1641        test(CAB, "a")
1642        test(CBA, "b", "a")
1643        test(CAB1, "a")
1644        test(CB1A, "b1", "b", "a")
1645        test(CB2A, "b2")
1646        test(CB1A1, "a1", "b1", "b")
1647        test(D)
1648        test(BA2, "a", "b")
1649        test(A11B1, "a1", "a11", "b", "b1")
1650
1651    def test_get_func_kwargs(self):
1652        def f1():
1653            pass
1654
1655        def f2(foo):
1656            pass
1657
1658        def f3(*foo):
1659            pass
1660
1661        def f4(**foo):
1662            pass
1663
1664        def test(fn, *expected):
1665            eq_(set(util.get_func_kwargs(fn)), set(expected))
1666
1667        test(f1)
1668        test(f2, "foo")
1669        test(f3)
1670        test(f4)
1671
1672    def test_callable_argspec_fn(self):
1673        def foo(x, y, **kw):
1674            pass
1675
1676        eq_(get_callable_argspec(foo), (["x", "y"], None, "kw", None))
1677
1678    def test_callable_argspec_fn_no_self(self):
1679        def foo(x, y, **kw):
1680            pass
1681
1682        eq_(
1683            get_callable_argspec(foo, no_self=True),
1684            (["x", "y"], None, "kw", None),
1685        )
1686
1687    def test_callable_argspec_fn_no_self_but_self(self):
1688        def foo(self, x, y, **kw):
1689            pass
1690
1691        eq_(
1692            get_callable_argspec(foo, no_self=True),
1693            (["self", "x", "y"], None, "kw", None),
1694        )
1695
1696    @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
1697    def test_callable_argspec_py_builtin(self):
1698        import datetime
1699
1700        assert_raises(TypeError, get_callable_argspec, datetime.datetime.now)
1701
1702    @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
1703    def test_callable_argspec_obj_init(self):
1704        assert_raises(TypeError, get_callable_argspec, object)
1705
1706    def test_callable_argspec_method(self):
1707        class Foo(object):
1708            def foo(self, x, y, **kw):
1709                pass
1710
1711        eq_(
1712            get_callable_argspec(Foo.foo),
1713            (["self", "x", "y"], None, "kw", None),
1714        )
1715
1716    def test_callable_argspec_instance_method_no_self(self):
1717        class Foo(object):
1718            def foo(self, x, y, **kw):
1719                pass
1720
1721        eq_(
1722            get_callable_argspec(Foo().foo, no_self=True),
1723            (["x", "y"], None, "kw", None),
1724        )
1725
1726    def test_callable_argspec_unbound_method_no_self(self):
1727        class Foo(object):
1728            def foo(self, x, y, **kw):
1729                pass
1730
1731        eq_(
1732            get_callable_argspec(Foo.foo, no_self=True),
1733            (["self", "x", "y"], None, "kw", None),
1734        )
1735
1736    def test_callable_argspec_init(self):
1737        class Foo(object):
1738            def __init__(self, x, y):
1739                pass
1740
1741        eq_(get_callable_argspec(Foo), (["self", "x", "y"], None, None, None))
1742
1743    def test_callable_argspec_init_no_self(self):
1744        class Foo(object):
1745            def __init__(self, x, y):
1746                pass
1747
1748        eq_(
1749            get_callable_argspec(Foo, no_self=True),
1750            (["x", "y"], None, None, None),
1751        )
1752
1753    def test_callable_argspec_call(self):
1754        class Foo(object):
1755            def __call__(self, x, y):
1756                pass
1757
1758        eq_(
1759            get_callable_argspec(Foo()), (["self", "x", "y"], None, None, None)
1760        )
1761
1762    def test_callable_argspec_call_no_self(self):
1763        class Foo(object):
1764            def __call__(self, x, y):
1765                pass
1766
1767        eq_(
1768            get_callable_argspec(Foo(), no_self=True),
1769            (["x", "y"], None, None, None),
1770        )
1771
1772    @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
1773    def test_callable_argspec_partial(self):
1774        from functools import partial
1775
1776        def foo(x, y, z, **kw):
1777            pass
1778
1779        bar = partial(foo, 5)
1780
1781        assert_raises(TypeError, get_callable_argspec, bar)
1782
1783
1784class SymbolTest(fixtures.TestBase):
1785    def test_basic(self):
1786        sym1 = util.symbol("foo")
1787        assert sym1.name == "foo"
1788        sym2 = util.symbol("foo")
1789
1790        assert sym1 is sym2
1791        assert sym1 == sym2
1792
1793        sym3 = util.symbol("bar")
1794        assert sym1 is not sym3
1795        assert sym1 != sym3
1796
1797    def test_pickle(self):
1798        sym1 = util.symbol("foo")
1799        sym2 = util.symbol("foo")
1800
1801        assert sym1 is sym2
1802
1803        # default
1804        s = util.pickle.dumps(sym1)
1805        sym3 = util.pickle.loads(s)
1806
1807        for protocol in 0, 1, 2:
1808            print(protocol)
1809            serial = util.pickle.dumps(sym1)
1810            rt = util.pickle.loads(serial)
1811            assert rt is sym1
1812            assert rt is sym2
1813
1814    def test_bitflags(self):
1815        sym1 = util.symbol("sym1", canonical=1)
1816        sym2 = util.symbol("sym2", canonical=2)
1817
1818        assert sym1 & sym1
1819        assert not sym1 & sym2
1820        assert not sym1 & sym1 & sym2
1821
1822    def test_composites(self):
1823        sym1 = util.symbol("sym1", canonical=1)
1824        sym2 = util.symbol("sym2", canonical=2)
1825        sym3 = util.symbol("sym3", canonical=4)
1826        sym4 = util.symbol("sym4", canonical=8)
1827
1828        assert sym1 & (sym2 | sym1 | sym4)
1829        assert not sym1 & (sym2 | sym3)
1830
1831        assert not (sym1 | sym2) & (sym3 | sym4)
1832        assert (sym1 | sym2) & (sym2 | sym4)
1833
1834
1835class TestFormatArgspec(fixtures.TestBase):
1836    def test_specs(self):
1837        def test(fn, wanted, grouped=None):
1838            if grouped is None:
1839                parsed = util.format_argspec_plus(fn)
1840            else:
1841                parsed = util.format_argspec_plus(fn, grouped=grouped)
1842            eq_(parsed, wanted)
1843
1844        test(
1845            lambda: None,
1846            {
1847                "args": "()",
1848                "self_arg": None,
1849                "apply_kw": "()",
1850                "apply_pos": "()",
1851            },
1852        )
1853
1854        test(
1855            lambda: None,
1856            {"args": "", "self_arg": None, "apply_kw": "", "apply_pos": ""},
1857            grouped=False,
1858        )
1859
1860        test(
1861            lambda self: None,
1862            {
1863                "args": "(self)",
1864                "self_arg": "self",
1865                "apply_kw": "(self)",
1866                "apply_pos": "(self)",
1867            },
1868        )
1869
1870        test(
1871            lambda self: None,
1872            {
1873                "args": "self",
1874                "self_arg": "self",
1875                "apply_kw": "self",
1876                "apply_pos": "self",
1877            },
1878            grouped=False,
1879        )
1880
1881        test(
1882            lambda *a: None,
1883            {
1884                "args": "(*a)",
1885                "self_arg": "a[0]",
1886                "apply_kw": "(*a)",
1887                "apply_pos": "(*a)",
1888            },
1889        )
1890
1891        test(
1892            lambda **kw: None,
1893            {
1894                "args": "(**kw)",
1895                "self_arg": None,
1896                "apply_kw": "(**kw)",
1897                "apply_pos": "(**kw)",
1898            },
1899        )
1900
1901        test(
1902            lambda *a, **kw: None,
1903            {
1904                "args": "(*a, **kw)",
1905                "self_arg": "a[0]",
1906                "apply_kw": "(*a, **kw)",
1907                "apply_pos": "(*a, **kw)",
1908            },
1909        )
1910
1911        test(
1912            lambda a, *b: None,
1913            {
1914                "args": "(a, *b)",
1915                "self_arg": "a",
1916                "apply_kw": "(a, *b)",
1917                "apply_pos": "(a, *b)",
1918            },
1919        )
1920
1921        test(
1922            lambda a, **b: None,
1923            {
1924                "args": "(a, **b)",
1925                "self_arg": "a",
1926                "apply_kw": "(a, **b)",
1927                "apply_pos": "(a, **b)",
1928            },
1929        )
1930
1931        test(
1932            lambda a, *b, **c: None,
1933            {
1934                "args": "(a, *b, **c)",
1935                "self_arg": "a",
1936                "apply_kw": "(a, *b, **c)",
1937                "apply_pos": "(a, *b, **c)",
1938            },
1939        )
1940
1941        test(
1942            lambda a, b=1, **c: None,
1943            {
1944                "args": "(a, b=1, **c)",
1945                "self_arg": "a",
1946                "apply_kw": "(a, b=b, **c)",
1947                "apply_pos": "(a, b, **c)",
1948            },
1949        )
1950
1951        test(
1952            lambda a=1, b=2: None,
1953            {
1954                "args": "(a=1, b=2)",
1955                "self_arg": "a",
1956                "apply_kw": "(a=a, b=b)",
1957                "apply_pos": "(a, b)",
1958            },
1959        )
1960
1961        test(
1962            lambda a=1, b=2: None,
1963            {
1964                "args": "a=1, b=2",
1965                "self_arg": "a",
1966                "apply_kw": "a=a, b=b",
1967                "apply_pos": "a, b",
1968            },
1969            grouped=False,
1970        )
1971
1972    @testing.fails_if(
1973        lambda: util.pypy,
1974        "pypy doesn't report Obj.__init__ as object.__init__",
1975    )
1976    def test_init_grouped(self):
1977        object_spec = {
1978            "args": "(self)",
1979            "self_arg": "self",
1980            "apply_pos": "(self)",
1981            "apply_kw": "(self)",
1982        }
1983        wrapper_spec = {
1984            "args": "(self, *args, **kwargs)",
1985            "self_arg": "self",
1986            "apply_pos": "(self, *args, **kwargs)",
1987            "apply_kw": "(self, *args, **kwargs)",
1988        }
1989        custom_spec = {
1990            "args": "(slef, a=123)",
1991            "self_arg": "slef",  # yes, slef
1992            "apply_pos": "(slef, a)",
1993            "apply_kw": "(slef, a=a)",
1994        }
1995
1996        self._test_init(None, object_spec, wrapper_spec, custom_spec)
1997        self._test_init(True, object_spec, wrapper_spec, custom_spec)
1998
1999    @testing.fails_if(
2000        lambda: util.pypy,
2001        "pypy doesn't report Obj.__init__ as object.__init__",
2002    )
2003    def test_init_bare(self):
2004        object_spec = {
2005            "args": "self",
2006            "self_arg": "self",
2007            "apply_pos": "self",
2008            "apply_kw": "self",
2009        }
2010        wrapper_spec = {
2011            "args": "self, *args, **kwargs",
2012            "self_arg": "self",
2013            "apply_pos": "self, *args, **kwargs",
2014            "apply_kw": "self, *args, **kwargs",
2015        }
2016        custom_spec = {
2017            "args": "slef, a=123",
2018            "self_arg": "slef",  # yes, slef
2019            "apply_pos": "slef, a",
2020            "apply_kw": "slef, a=a",
2021        }
2022
2023        self._test_init(False, object_spec, wrapper_spec, custom_spec)
2024
2025    def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec):
2026        def test(fn, wanted):
2027            if grouped is None:
2028                parsed = util.format_argspec_init(fn)
2029            else:
2030                parsed = util.format_argspec_init(fn, grouped=grouped)
2031            eq_(parsed, wanted)
2032
2033        class Obj(object):
2034            pass
2035
2036        test(Obj.__init__, object_spec)
2037
2038        class Obj(object):
2039            def __init__(self):
2040                pass
2041
2042        test(Obj.__init__, object_spec)
2043
2044        class Obj(object):
2045            def __init__(slef, a=123):
2046                pass
2047
2048        test(Obj.__init__, custom_spec)
2049
2050        class Obj(list):
2051            pass
2052
2053        test(Obj.__init__, wrapper_spec)
2054
2055        class Obj(list):
2056            def __init__(self, *args, **kwargs):
2057                pass
2058
2059        test(Obj.__init__, wrapper_spec)
2060
2061        class Obj(list):
2062            def __init__(self):
2063                pass
2064
2065        test(Obj.__init__, object_spec)
2066
2067        class Obj(list):
2068            def __init__(slef, a=123):
2069                pass
2070
2071        test(Obj.__init__, custom_spec)
2072
2073
2074class GenericReprTest(fixtures.TestBase):
2075    def test_all_positional(self):
2076        class Foo(object):
2077            def __init__(self, a, b, c):
2078                self.a = a
2079                self.b = b
2080                self.c = c
2081
2082        eq_(util.generic_repr(Foo(1, 2, 3)), "Foo(1, 2, 3)")
2083
2084    def test_positional_plus_kw(self):
2085        class Foo(object):
2086            def __init__(self, a, b, c=5, d=4):
2087                self.a = a
2088                self.b = b
2089                self.c = c
2090                self.d = d
2091
2092        eq_(util.generic_repr(Foo(1, 2, 3, 6)), "Foo(1, 2, c=3, d=6)")
2093
2094    def test_kw_defaults(self):
2095        class Foo(object):
2096            def __init__(self, a=1, b=2, c=3, d=4):
2097                self.a = a
2098                self.b = b
2099                self.c = c
2100                self.d = d
2101
2102        eq_(util.generic_repr(Foo(1, 5, 3, 7)), "Foo(b=5, d=7)")
2103
2104    def test_multi_kw(self):
2105        class Foo(object):
2106            def __init__(self, a, b, c=3, d=4):
2107                self.a = a
2108                self.b = b
2109                self.c = c
2110                self.d = d
2111
2112        class Bar(Foo):
2113            def __init__(self, e, f, g=5, **kw):
2114                self.e = e
2115                self.f = f
2116                self.g = g
2117                super(Bar, self).__init__(**kw)
2118
2119        eq_(
2120            util.generic_repr(
2121                Bar("e", "f", g=7, a=6, b=5, d=9), to_inspect=[Bar, Foo]
2122            ),
2123            "Bar('e', 'f', g=7, a=6, b=5, d=9)",
2124        )
2125
2126        eq_(
2127            util.generic_repr(Bar("e", "f", a=6, b=5), to_inspect=[Bar, Foo]),
2128            "Bar('e', 'f', a=6, b=5)",
2129        )
2130
2131    def test_multi_kw_repeated(self):
2132        class Foo(object):
2133            def __init__(self, a=1, b=2):
2134                self.a = a
2135                self.b = b
2136
2137        class Bar(Foo):
2138            def __init__(self, b=3, c=4, **kw):
2139                self.c = c
2140                super(Bar, self).__init__(b=b, **kw)
2141
2142        eq_(
2143            util.generic_repr(Bar(a="a", b="b", c="c"), to_inspect=[Bar, Foo]),
2144            "Bar(b='b', c='c', a='a')",
2145        )
2146
2147    def test_discard_vargs(self):
2148        class Foo(object):
2149            def __init__(self, a, b, *args):
2150                self.a = a
2151                self.b = b
2152                self.c, self.d = args[0:2]
2153
2154        eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2)")
2155
2156    def test_discard_vargs_kwargs(self):
2157        class Foo(object):
2158            def __init__(self, a, b, *args, **kw):
2159                self.a = a
2160                self.b = b
2161                self.c, self.d = args[0:2]
2162
2163        eq_(util.generic_repr(Foo(1, 2, 3, 4, x=7, y=4)), "Foo(1, 2)")
2164
2165    def test_significant_vargs(self):
2166        class Foo(object):
2167            def __init__(self, a, b, *args):
2168                self.a = a
2169                self.b = b
2170                self.args = args
2171
2172        eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2, 3, 4)")
2173
2174    def test_no_args(self):
2175        class Foo(object):
2176            def __init__(self):
2177                pass
2178
2179        eq_(util.generic_repr(Foo()), "Foo()")
2180
2181    def test_no_init(self):
2182        class Foo(object):
2183            pass
2184
2185        eq_(util.generic_repr(Foo()), "Foo()")
2186
2187
2188class AsInterfaceTest(fixtures.TestBase):
2189    class Something(object):
2190        def _ignoreme(self):
2191            pass
2192
2193        def foo(self):
2194            pass
2195
2196        def bar(self):
2197            pass
2198
2199    class Partial(object):
2200        def bar(self):
2201            pass
2202
2203    class Object(object):
2204        pass
2205
2206    def test_instance(self):
2207        obj = object()
2208        assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
2209
2210        assert_raises(TypeError, util.as_interface, obj, methods=("foo"))
2211
2212        assert_raises(
2213            TypeError,
2214            util.as_interface,
2215            obj,
2216            cls=self.Something,
2217            required=("foo"),
2218        )
2219
2220        obj = self.Something()
2221        eq_(obj, util.as_interface(obj, cls=self.Something))
2222        eq_(obj, util.as_interface(obj, methods=("foo",)))
2223        eq_(
2224            obj,
2225            util.as_interface(
2226                obj, cls=self.Something, required=("outofband",)
2227            ),
2228        )
2229        partial = self.Partial()
2230
2231        slotted = self.Object()
2232        slotted.bar = lambda self: 123
2233
2234        for obj in partial, slotted:
2235            eq_(obj, util.as_interface(obj, cls=self.Something))
2236            assert_raises(TypeError, util.as_interface, obj, methods=("foo"))
2237            eq_(obj, util.as_interface(obj, methods=("bar",)))
2238            eq_(
2239                obj,
2240                util.as_interface(obj, cls=self.Something, required=("bar",)),
2241            )
2242            assert_raises(
2243                TypeError,
2244                util.as_interface,
2245                obj,
2246                cls=self.Something,
2247                required=("foo",),
2248            )
2249
2250            assert_raises(
2251                TypeError,
2252                util.as_interface,
2253                obj,
2254                cls=self.Something,
2255                required=self.Something,
2256            )
2257
2258    def test_dict(self):
2259        obj = {}
2260        assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
2261        assert_raises(TypeError, util.as_interface, obj, methods="foo")
2262        assert_raises(
2263            TypeError,
2264            util.as_interface,
2265            obj,
2266            cls=self.Something,
2267            required="foo",
2268        )
2269
2270        def assertAdapted(obj, *methods):
2271            assert isinstance(obj, type)
2272            found = set([m for m in dir(obj) if not m.startswith("_")])
2273            for method in methods:
2274                assert method in found
2275                found.remove(method)
2276            assert not found
2277
2278        def fn(self):
2279            return 123
2280
2281        obj = {"foo": fn, "bar": fn}
2282        res = util.as_interface(obj, cls=self.Something)
2283        assertAdapted(res, "foo", "bar")
2284        res = util.as_interface(
2285            obj, cls=self.Something, required=self.Something
2286        )
2287        assertAdapted(res, "foo", "bar")
2288        res = util.as_interface(obj, cls=self.Something, required=("foo",))
2289        assertAdapted(res, "foo", "bar")
2290        res = util.as_interface(obj, methods=("foo", "bar"))
2291        assertAdapted(res, "foo", "bar")
2292        res = util.as_interface(obj, methods=("foo", "bar", "baz"))
2293        assertAdapted(res, "foo", "bar")
2294        res = util.as_interface(obj, methods=("foo", "bar"), required=("foo",))
2295        assertAdapted(res, "foo", "bar")
2296        assert_raises(TypeError, util.as_interface, obj, methods=("foo",))
2297        assert_raises(
2298            TypeError,
2299            util.as_interface,
2300            obj,
2301            methods=("foo", "bar", "baz"),
2302            required=("baz",),
2303        )
2304        obj = {"foo": 123}
2305        assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
2306
2307
2308class TestClassHierarchy(fixtures.TestBase):
2309    def test_object(self):
2310        eq_(set(util.class_hierarchy(object)), set((object,)))
2311
2312    def test_single(self):
2313        class A(object):
2314            pass
2315
2316        class B(object):
2317            pass
2318
2319        eq_(set(util.class_hierarchy(A)), set((A, object)))
2320        eq_(set(util.class_hierarchy(B)), set((B, object)))
2321
2322        class C(A, B):
2323            pass
2324
2325        eq_(set(util.class_hierarchy(A)), set((A, B, C, object)))
2326        eq_(set(util.class_hierarchy(B)), set((A, B, C, object)))
2327
2328    if util.py2k:
2329
2330        def test_oldstyle_mixin(self):
2331            class A(object):
2332                pass
2333
2334            class Mixin:
2335                pass
2336
2337            class B(A, Mixin):
2338                pass
2339
2340            eq_(set(util.class_hierarchy(B)), set((A, B, object)))
2341            eq_(set(util.class_hierarchy(Mixin)), set())
2342            eq_(set(util.class_hierarchy(A)), set((A, B, object)))
2343
2344
2345class ReraiseTest(fixtures.TestBase):
2346    @testing.requires.python3
2347    def test_raise_from_cause_same_cause(self):
2348        class MyException(Exception):
2349            pass
2350
2351        def go():
2352            try:
2353                raise MyException("exc one")
2354            except Exception as err:
2355                util.raise_from_cause(err)
2356
2357        try:
2358            go()
2359            assert False
2360        except MyException as err:
2361            is_(err.__cause__, None)
2362
2363    def test_reraise_disallow_same_cause(self):
2364        class MyException(Exception):
2365            pass
2366
2367        def go():
2368            try:
2369                raise MyException("exc one")
2370            except Exception as err:
2371                type_, value, tb = sys.exc_info()
2372                util.reraise(type_, err, tb, value)
2373
2374        assert_raises_message(AssertionError, "Same cause emitted", go)
2375
2376    def test_raise_from_cause(self):
2377        class MyException(Exception):
2378            pass
2379
2380        class MyOtherException(Exception):
2381            pass
2382
2383        me = MyException("exc on")
2384
2385        def go():
2386            try:
2387                raise me
2388            except Exception:
2389                util.raise_from_cause(MyOtherException("exc two"))
2390
2391        try:
2392            go()
2393            assert False
2394        except MyOtherException as moe:
2395            if testing.requires.python3.enabled:
2396                is_(moe.__cause__, me)
2397
2398    @testing.requires.python2
2399    def test_safe_reraise_py2k_warning(self):
2400        class MyException(Exception):
2401            pass
2402
2403        class MyOtherException(Exception):
2404            pass
2405
2406        m1 = MyException("exc one")
2407        m2 = MyOtherException("exc two")
2408
2409        def go2():
2410            raise m2
2411
2412        def go():
2413            try:
2414                raise m1
2415            except Exception:
2416                with util.safe_reraise():
2417                    go2()
2418
2419        with expect_warnings(
2420            "An exception has occurred during handling of a previous "
2421            "exception.  The previous exception "
2422            "is:.*MyException.*exc one"
2423        ):
2424            try:
2425                go()
2426                assert False
2427            except MyOtherException:
2428                pass
2429
2430
2431class TestClassProperty(fixtures.TestBase):
2432    def test_simple(self):
2433        class A(object):
2434            something = {"foo": 1}
2435
2436        class B(A):
2437            @classproperty
2438            def something(cls):
2439                d = dict(super(B, cls).something)
2440                d.update({"bazz": 2})
2441                return d
2442
2443        eq_(B.something, {"foo": 1, "bazz": 2})
2444
2445
2446class TestProperties(fixtures.TestBase):
2447    def test_pickle(self):
2448        data = {"hello": "bla"}
2449        props = util.Properties(data)
2450
2451        for loader, dumper in picklers():
2452            s = dumper(props)
2453            p = loader(s)
2454
2455            eq_(props._data, p._data)
2456            eq_(props.keys(), p.keys())
2457
2458    def test_pickle_immuatbleprops(self):
2459        data = {"hello": "bla"}
2460        props = util.Properties(data).as_immutable()
2461
2462        for loader, dumper in picklers():
2463            s = dumper(props)
2464            p = loader(s)
2465
2466            eq_(props._data, p._data)
2467            eq_(props.keys(), p.keys())
2468
2469    def test_pickle_orderedprops(self):
2470        data = {"hello": "bla"}
2471        props = util.OrderedProperties()
2472        props.update(data)
2473
2474        for loader, dumper in picklers():
2475            s = dumper(props)
2476            p = loader(s)
2477
2478            eq_(props._data, p._data)
2479            eq_(props.keys(), p.keys())
2480
2481
2482class QuotedTokenParserTest(fixtures.TestBase):
2483    def _test(self, string, expected):
2484        eq_(langhelpers.quoted_token_parser(string), expected)
2485
2486    def test_single(self):
2487        self._test("name", ["name"])
2488
2489    def test_dotted(self):
2490        self._test("schema.name", ["schema", "name"])
2491
2492    def test_dotted_quoted_left(self):
2493        self._test('"Schema".name', ["Schema", "name"])
2494
2495    def test_dotted_quoted_left_w_quote_left_edge(self):
2496        self._test('"""Schema".name', ['"Schema', "name"])
2497
2498    def test_dotted_quoted_left_w_quote_right_edge(self):
2499        self._test('"Schema""".name', ['Schema"', "name"])
2500
2501    def test_dotted_quoted_left_w_quote_middle(self):
2502        self._test('"Sch""ema".name', ['Sch"ema', "name"])
2503
2504    def test_dotted_quoted_right(self):
2505        self._test('schema."SomeName"', ["schema", "SomeName"])
2506
2507    def test_dotted_quoted_right_w_quote_left_edge(self):
2508        self._test('schema."""name"', ["schema", '"name'])
2509
2510    def test_dotted_quoted_right_w_quote_right_edge(self):
2511        self._test('schema."name"""', ["schema", 'name"'])
2512
2513    def test_dotted_quoted_right_w_quote_middle(self):
2514        self._test('schema."na""me"', ["schema", 'na"me'])
2515
2516    def test_quoted_single_w_quote_left_edge(self):
2517        self._test('"""name"', ['"name'])
2518
2519    def test_quoted_single_w_quote_right_edge(self):
2520        self._test('"name"""', ['name"'])
2521
2522    def test_quoted_single_w_quote_middle(self):
2523        self._test('"na""me"', ['na"me'])
2524
2525    def test_dotted_quoted_left_w_dot_left_edge(self):
2526        self._test('".Schema".name', [".Schema", "name"])
2527
2528    def test_dotted_quoted_left_w_dot_right_edge(self):
2529        self._test('"Schema.".name', ["Schema.", "name"])
2530
2531    def test_dotted_quoted_left_w_dot_middle(self):
2532        self._test('"Sch.ema".name', ["Sch.ema", "name"])
2533
2534    def test_dotted_quoted_right_w_dot_left_edge(self):
2535        self._test('schema.".name"', ["schema", ".name"])
2536
2537    def test_dotted_quoted_right_w_dot_right_edge(self):
2538        self._test('schema."name."', ["schema", "name."])
2539
2540    def test_dotted_quoted_right_w_dot_middle(self):
2541        self._test('schema."na.me"', ["schema", "na.me"])
2542
2543    def test_quoted_single_w_dot_left_edge(self):
2544        self._test('".name"', [".name"])
2545
2546    def test_quoted_single_w_dot_right_edge(self):
2547        self._test('"name."', ["name."])
2548
2549    def test_quoted_single_w_dot_middle(self):
2550        self._test('"na.me"', ["na.me"])
2551
2552
2553class BackslashReplaceTest(fixtures.TestBase):
2554    def test_ascii_to_utf8(self):
2555        eq_(
2556            compat.decode_backslashreplace(util.b("hello world"), "utf-8"),
2557            util.u("hello world"),
2558        )
2559
2560    def test_utf8_to_utf8(self):
2561        eq_(
2562            compat.decode_backslashreplace(
2563                util.u("some message méil").encode("utf-8"), "utf-8"
2564            ),
2565            util.u("some message méil"),
2566        )
2567
2568    def test_latin1_to_utf8(self):
2569        eq_(
2570            compat.decode_backslashreplace(
2571                util.u("some message méil").encode("latin-1"), "utf-8"
2572            ),
2573            util.u("some message m\\xe9il"),
2574        )
2575
2576        eq_(
2577            compat.decode_backslashreplace(
2578                util.u("some message méil").encode("latin-1"), "latin-1"
2579            ),
2580            util.u("some message méil"),
2581        )
2582
2583    def test_cp1251_to_utf8(self):
2584        message = util.u("some message П").encode("cp1251")
2585        eq_(message, b"some message \xcf")
2586        eq_(
2587            compat.decode_backslashreplace(message, "utf-8"),
2588            util.u("some message \\xcf"),
2589        )
2590
2591        eq_(
2592            compat.decode_backslashreplace(message, "cp1251"),
2593            util.u("some message П"),
2594        )
2595