1import copy
2import sys
3
4from sqlalchemy import util, sql, exc, testing
5from sqlalchemy.testing import assert_raises, assert_raises_message, fixtures
6from sqlalchemy.testing import eq_, is_, ne_, fails_if, mock, expect_warnings
7from sqlalchemy.testing.util import picklers, gc_collect
8from sqlalchemy.util import classproperty, WeakSequence, get_callable_argspec
9from sqlalchemy.sql import column
10from sqlalchemy.util import langhelpers, compat
11import inspect
12
13
class _KeyedTupleTest(object):
    """Shared assertions for keyed-tuple implementations.

    Concrete suites override ``_fixture`` to build the object under test
    from a list of values and a parallel list of labels.
    """

    def _fixture(self, values, labels):
        """Return the keyed tuple under test; subclasses must override."""
        raise NotImplementedError()

    def _assert_labels(self, kt, labels, as_dict):
        """Check ``keys()``, ``_fields`` and ``_asdict()`` together."""
        eq_(list(kt.keys()), list(labels))
        eq_(kt._fields, tuple(labels))
        eq_(kt._asdict(), as_dict)

    def _assert_positions(self, kt, values):
        """Check that each positional index yields the expected value."""
        for position, expected in enumerate(values):
            eq_(kt[position], expected)

    def test_empty(self):
        """No values and no labels."""
        kt = self._fixture([], [])
        eq_(str(kt), '()')
        eq_(len(kt), 0)

        self._assert_labels(kt, [], {})

    def test_values_but_no_labels(self):
        """Values without labels are reachable by index only."""
        kt = self._fixture([1, 2], [])
        eq_(str(kt), '(1, 2)')
        eq_(len(kt), 2)

        self._assert_labels(kt, [], {})
        self._assert_positions(kt, [1, 2])

    def test_basic_creation(self):
        """Labels show up in ``keys()``, ``_fields`` and ``_asdict()``."""
        kt = self._fixture([1, 2], ['a', 'b'])
        eq_(str(kt), '(1, 2)')
        self._assert_labels(kt, ['a', 'b'], {'a': 1, 'b': 2})

    def test_basic_index_access(self):
        """Valid indexes return values; an out-of-range index raises."""
        kt = self._fixture([1, 2], ['a', 'b'])
        self._assert_positions(kt, [1, 2])

        assert_raises(IndexError, lambda: kt[2])

    def test_basic_attribute_access(self):
        """Labels are attributes; an unknown name raises AttributeError."""
        kt = self._fixture([1, 2], ['a', 'b'])
        eq_(kt.a, 1)
        eq_(kt.b, 2)

        assert_raises(AttributeError, getattr, kt, 'c')

    def test_none_label(self):
        """A ``None`` label is left out of the keys but keeps its slot."""
        kt = self._fixture([1, 2, 3], ['a', None, 'b'])
        eq_(str(kt), '(1, 2, 3)')

        self._assert_labels(kt, ['a', 'b'], {'a': 1, 'b': 3})

        # attribute access: can't get at value 2
        eq_(kt.a, 1)
        eq_(kt.b, 3)

        # index access: can get at value 2
        self._assert_positions(kt, [1, 2, 3])

    def test_duplicate_labels(self):
        """With a repeated label, the attribute resolves to the last value."""
        kt = self._fixture([1, 2, 3], ['a', 'b', 'b'])
        eq_(str(kt), '(1, 2, 3)')

        self._assert_labels(kt, ['a', 'b', 'b'], {'a': 1, 'b': 3})

        # attribute access: can't get at value 2
        eq_(kt.a, 1)
        eq_(kt.b, 3)

        # index access: can get at value 2
        self._assert_positions(kt, [1, 2, 3])

    def test_immutable(self):
        """Neither attribute nor item assignment is permitted."""
        kt = self._fixture([1, 2], ['a', 'b'])
        eq_(str(kt), '(1, 2)')

        eq_(kt.a, 1)

        assert_raises(AttributeError, setattr, kt, "a", 5)

        def assign_item():
            kt[0] = 100
        assert_raises(TypeError, assign_item)

    def test_serialize(self):
        """Values and labels survive every pickler round trip."""
        original = self._fixture([1, 2, 3], ['a', None, 'b'])

        for loads, dumps in picklers():
            restored = loads(dumps(original))

            eq_(str(restored), '(1, 2, 3)')

            self._assert_labels(restored, ['a', 'b'], {'a': 1, 'b': 3})
123
124
class KeyedTupleTest(_KeyedTupleTest, fixtures.TestBase):
    """Runs the shared keyed-tuple suite against ``util.KeyedTuple``."""

    def _fixture(self, values, labels):
        keyed = util.KeyedTuple(values, labels)
        return keyed
128
129
class LWKeyedTupleTest(_KeyedTupleTest, fixtures.TestBase):
    """Runs the shared suite against ``util.lightweight_named_tuple``."""

    def _fixture(self, values, labels):
        # The factory returns a class for the labels; instantiate it
        # with the values.
        tuple_cls = util.lightweight_named_tuple('n', labels)
        return tuple_cls(values)
133
134
class WeakSequenceTest(fixtures.TestBase):
    """Checks that ``WeakSequence`` shrinks when an element is collected."""

    def _assert_size(self, seq, expected):
        """Both ``len(seq)`` and ``len(seq._storage)`` must equal expected."""
        eq_(len(seq), expected)
        eq_(len(seq._storage), expected)

    @testing.requires.predictable_gc
    def test_cleanout_elements(self):
        """Elements passed to the constructor are dropped once collected."""
        class Foo(object):
            pass

        first, second, third = Foo(), Foo(), Foo()
        seq = WeakSequence([first, second, third])
        self._assert_size(seq, 3)

        del second
        gc_collect()
        self._assert_size(seq, 2)

    @testing.requires.predictable_gc
    def test_cleanout_appended(self):
        """Elements added via ``append()`` are dropped once collected."""
        class Foo(object):
            pass

        first, second, third = Foo(), Foo(), Foo()
        seq = WeakSequence()
        seq.append(first)
        seq.append(second)
        seq.append(third)
        self._assert_size(seq, 3)

        del second
        gc_collect()
        self._assert_size(seq, 2)
164
165
class OrderedDictTest(fixtures.TestBase):
    """Tests for ``util.OrderedDict`` ordering, mutation and copying."""

    def test_odict(self):
        """Insertion order is kept through pop, update and setdefault."""
        def check(od, keys, values):
            eq_(list(od.keys()), keys)
            eq_(list(od.values()), values)

        ordered = util.OrderedDict()
        for key, value in (
                ('a', 1), ('b', 2), ('snack', 'attack'), ('c', 3)):
            ordered[key] = value

        check(ordered, ['a', 'b', 'snack', 'c'], [1, 2, 'attack', 3])

        ordered.pop('snack')
        check(ordered, ['a', 'b', 'c'], [1, 2, 3])

        # missing key without a default
        assert_raises(KeyError, ordered.pop, 'eep')

        # missing key with a default
        eq_(ordered.pop('eep', 'woot'), 'woot')

        # too many arguments
        assert_raises(TypeError, ordered.pop, 'whiff', 'bang', 'pow')

        check(ordered, ['a', 'b', 'c'], [1, 2, 3])

        other = util.OrderedDict(d=4)
        other['e'] = 5
        check(other, ['d', 'e'], [4, 5])

        ordered.update(other)
        check(ordered, ['a', 'b', 'c', 'd', 'e'], [1, 2, 3, 4, 5])

        ordered.setdefault('c', 'zzz')
        ordered.setdefault('f', 6)
        check(
            ordered,
            ['a', 'b', 'c', 'd', 'e', 'f'],
            [1, 2, 3, 4, 5, 6])

    def test_odict_constructor(self):
        """A list of pairs given to the constructor keeps its order."""
        pairs = [('name', 'jbe'), ('fullname', 'jonathan'), ('password', '')]
        ordered = util.OrderedDict(pairs)
        eq_(list(ordered.keys()), ['name', 'fullname', 'password'])

    def test_odict_copy(self):
        """Both ``.copy()`` and ``copy.copy()`` keep the key order."""
        ordered = util.OrderedDict()
        ordered["zzz"] = 1
        ordered["aaa"] = 2
        eq_(list(ordered.keys()), ['zzz', 'aaa'])

        via_method = ordered.copy()
        eq_(list(via_method.keys()), list(ordered.keys()))

        via_module = copy.copy(ordered)
        eq_(list(via_module.keys()), list(ordered.keys()))
230
231
class OrderedSetTest(fixtures.TestBase):
    """Tests for ``util.OrderedSet`` set operations."""

    def test_mutators_against_iter(self):
        """Set operations accept a plain iterator as the other operand."""
        # testing a set modified against an iterator
        ordered = util.OrderedSet([3, 2, 4, 5])

        without = ordered.difference(iter([3, 4]))
        eq_(without, util.OrderedSet([2, 5]))

        common = ordered.intersection(iter([3, 4, 6]))
        eq_(common, util.OrderedSet([3, 4]))

        combined = ordered.union(iter([3, 4, 6]))
        eq_(combined, util.OrderedSet([2, 3, 4, 5, 6]))
241
242
class FrozenDictTest(fixtures.TestBase):
    """Tests for ``util.immutabledict``."""

    def test_serialize(self):
        """An immutabledict compares equal to itself after each round trip."""
        d = util.immutabledict({1: 2, 3: 4})
        for loads, dumps in picklers():
            # Assert on the result rather than printing it, so that a
            # broken round trip fails the test.
            eq_(loads(dumps(d)), d)
249
250
class MemoizedAttrTest(fixtures.TestBase):
    """Tests for the memoizing decorators and ``util.MemoizedSlots``."""

    def test_memoized_property(self):
        """The body runs once and the value is stored in ``__dict__``."""
        counter = [20]

        class Foo(object):
            @util.memoized_property
            def bar(self):
                current = counter[0]
                counter[0] = current + 1
                return current

        ne_(Foo.bar, None)
        instance = Foo()
        assert 'bar' not in instance.__dict__
        # two reads, one evaluation
        for _ in range(2):
            eq_(instance.bar, 20)
        eq_(counter[0], 21)
        eq_(instance.__dict__['bar'], 20)

    def test_memoized_instancemethod(self):
        """The method body runs once across repeated calls."""
        counter = [20]

        class Foo(object):
            @util.memoized_instancemethod
            def bar(self):
                current = counter[0]
                counter[0] = current + 1
                return current

        assert inspect.ismethod(Foo().bar)
        ne_(Foo.bar, None)
        instance = Foo()
        assert 'bar' not in instance.__dict__
        # two calls, one evaluation
        for _ in range(2):
            eq_(instance.bar(), 20)
        eq_(counter[0], 21)

    def test_memoized_slots(self):
        """Each ``_memoized_*`` hook is invoked exactly once."""
        canary = mock.Mock()

        class Foob(util.MemoizedSlots):
            __slots__ = ('foo_bar', 'gogo')

            def _memoized_method_gogo(self):
                canary.method()
                return "gogo"

            def _memoized_attr_foo_bar(self):
                canary.attr()
                return "foobar"

        obj = Foob()
        # "bar" is not a declared slot
        assert_raises(AttributeError, setattr, obj, "bar", "bat")

        for _ in range(2):
            eq_(obj.foo_bar, "foobar")

        for _ in range(2):
            eq_(obj.gogo(), "gogo")

        eq_(canary.mock_calls, [mock.call.attr(), mock.call.method()])
315
316
class ToListTest(fixtures.TestBase):
    """Tests for ``util.to_list`` across input types."""

    def test_from_string(self):
        """A string is wrapped whole, not split into characters."""
        eq_(util.to_list("xyz"), ["xyz"])

    def test_from_set(self):
        """A set becomes a list holding the same members."""
        result = util.to_list(set([1, 2, 3]))
        assert isinstance(result, list)
        eq_(sorted(result), [1, 2, 3])

    def test_from_dict(self):
        """A dict becomes a list of its keys."""
        result = util.to_list({1: "a", 2: "b", 3: "c"})
        assert isinstance(result, list)
        eq_(sorted(result), [1, 2, 3])

    def test_from_tuple(self):
        """A tuple becomes a list with the same elements in order."""
        eq_(util.to_list((1, 2, 3)), [1, 2, 3])

    def test_from_bytes(self):
        """A bytes value is wrapped whole; a list of bytes is kept as is."""
        wrapped = util.to_list(compat.b('abc'))
        eq_(wrapped, [compat.b('abc')])

        passthrough = util.to_list([compat.b('abc'), compat.b('def')])
        eq_(passthrough, [compat.b('abc'), compat.b('def')])
358
359
class ColumnCollectionTest(fixtures.TestBase):
    """Tests for ``sql.ColumnCollection`` membership, replace and ordering."""

    def _columns(self, *names):
        """Return one new ``column()`` per name, in order."""
        return [column(name) for name in names]

    def _assert_contents(self, collection, all_columns, iterated):
        """Check ``_all_columns`` and then the iteration order."""
        eq_(collection._all_columns, all_columns)
        eq_(list(collection), iterated)

    def test_in(self):
        """``in`` accepts string keys and rejects column objects."""
        cc = sql.ColumnCollection()
        for name in ('col1', 'col2', 'col3'):
            cc.add(sql.column(name))
        assert 'col1' in cc
        assert 'col2' in cc

        try:
            cc['col1'] in cc
        except exc.ArgumentError as err:
            eq_(str(err), "__contains__ requires a string argument")
        else:
            assert False

    def test_compare(self):
        """``==`` between collections yields a comparable expression."""
        c1 = sql.column('col1')
        c2 = c1.label('col2')
        c3 = sql.column('col3')
        cc1, cc2, cc3 = [sql.ColumnCollection() for _ in range(3)]
        for collection, col in ((cc1, c1), (cc2, c2), (cc3, c3)):
            collection.add(col)
        assert (cc1 == cc2).compare(c1 == c2)
        assert not (cc1 == cc3).compare(c2 == c3)

    @testing.emits_warning("Column ")
    def test_dupes_add(self):
        """Adding a same-named column keeps both in ``_all_columns``."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'c2')

        for col in (c1, c2a, c3, c2b):
            cc.add(col)

        eq_(cc._all_columns, [c1, c2a, c3, c2b])

        # for iter, c2a is replaced by c2b, ordering
        # is maintained in that way.  ideally, iter would be
        # the same as the "_all_columns" collection.
        eq_(list(cc), [c1, c2b, c3])

        assert cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        self._assert_contents(
            cc.as_immutable(), [c1, c2a, c3, c2b], [c1, c2b, c3])

    def test_replace(self):
        """``replace()`` swaps out the same-named column in place."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'c2')

        for col in (c1, c2a, c3):
            cc.add(col)

        cc.replace(c2b)

        self._assert_contents(cc, [c1, c2b, c3], [c1, c2b, c3])

        assert not cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        self._assert_contents(
            cc.as_immutable(), [c1, c2b, c3], [c1, c2b, c3])

    def test_replace_key_matches(self):
        """A replacement whose key matches an existing column displaces it."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'X')
        c2b.key = 'c2'

        for col in (c1, c2a, c3):
            cc.add(col)

        cc.replace(c2b)

        assert not cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        self._assert_contents(cc, [c1, c2b, c3], [c1, c2b, c3])

        self._assert_contents(
            cc.as_immutable(), [c1, c2b, c3], [c1, c2b, c3])

    def test_replace_name_matches(self):
        """A replacement matching by name only moves last in iteration."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'c2')
        c2b.key = 'X'

        for col in (c1, c2a, c3):
            cc.add(col)

        cc.replace(c2b)

        assert not cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        self._assert_contents(cc, [c1, c2b, c3], [c1, c3, c2b])

        self._assert_contents(
            cc.as_immutable(), [c1, c2b, c3], [c1, c3, c2b])

    def test_replace_no_match(self):
        """``replace()`` with nothing to displace appends the column."""
        cc = sql.ColumnCollection()

        c1, c2, c3, c4 = self._columns('c1', 'c2', 'c3', 'c4')
        c4.key = 'X'

        for col in (c1, c2, c3):
            cc.add(col)

        cc.replace(c4)

        assert cc.contains_column(c2)
        assert cc.contains_column(c4)

        self._assert_contents(cc, [c1, c2, c3, c4], [c1, c2, c3, c4])

        self._assert_contents(
            cc.as_immutable(), [c1, c2, c3, c4], [c1, c2, c3, c4])

    def test_dupes_extend(self):
        """``extend()`` with a same-named column keeps both columns."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'c2')

        for col in (c1, c2a):
            cc.add(col)

        cc.extend([c3, c2b])

        eq_(cc._all_columns, [c1, c2a, c3, c2b])

        # for iter, c2a is replaced by c2b, ordering
        # is maintained in that way.  ideally, iter would be
        # the same as the "_all_columns" collection.
        eq_(list(cc), [c1, c2b, c3])

        assert cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        self._assert_contents(
            cc.as_immutable(), [c1, c2a, c3, c2b], [c1, c2b, c3])

    def test_dupes_update(self):
        """``update()`` with a same-named column keeps both columns."""
        cc = sql.ColumnCollection()

        c1, c2a, c3, c2b = self._columns('c1', 'c2', 'c3', 'c2')

        for col in (c1, c2a):
            cc.add(col)

        cc.update([(col.key, col) for col in (c3, c2b)])

        eq_(cc._all_columns, [c1, c2a, c3, c2b])

        assert cc.contains_column(c2a)
        assert cc.contains_column(c2b)

        # for iter, c2a is replaced by c2b, ordering
        # is maintained in that way.  ideally, iter would be
        # the same as the "_all_columns" collection.
        eq_(list(cc), [c1, c2b, c3])

    def test_extend_existing(self):
        """Extending with an already-present column does not duplicate it."""
        cc = sql.ColumnCollection()

        c1, c2, c3, c4, c5 = self._columns('c1', 'c2', 'c3', 'c4', 'c5')

        cc.extend([c1, c2])
        eq_(cc._all_columns, [c1, c2])

        cc.extend([c3])
        eq_(cc._all_columns, [c1, c2, c3])

        # c2 is already present
        cc.extend([c4, c2, c5])
        eq_(cc._all_columns, [c1, c2, c3, c4, c5])

    def test_update_existing(self):
        """Updating with an already-present column does not duplicate it."""
        cc = sql.ColumnCollection()

        c1, c2, c3, c4, c5 = self._columns('c1', 'c2', 'c3', 'c4', 'c5')

        cc.update([('c1', c1), ('c2', c2)])
        eq_(cc._all_columns, [c1, c2])

        cc.update([('c3', c3)])
        eq_(cc._all_columns, [c1, c2, c3])

        # c2 is already present
        cc.update([('c4', c4), ('c2', c2), ('c5', c5)])
        eq_(cc._all_columns, [c1, c2, c3, c4, c5])
572
573
574
class LRUTest(fixtures.TestBase):
    """Exercises which keys ``util.LRUCache`` keeps as entries are added."""

    def test_lru(self):
        """Older keys disappear while recently read or written ones stay."""
        class item(object):
            def __init__(self, id_):
                self.id = id_

            def __str__(self):
                return "item id %d" % self.id

        cache = util.LRUCache(10, threshold=.2)

        for id_ in range(1, 20):
            cache[id_] = item(id_)

        # first couple of items should be gone
        assert 1 not in cache
        assert 2 not in cache

        # next batch over the threshold of 10 should be present
        for id_ in range(11, 20):
            assert id_ in cache

        # read two entries, then add five more
        cache[12]
        cache[15]
        for id_ in range(23, 28):
            cache[id_] = item(id_)

        assert 11 not in cache
        assert 13 not in cache

        for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15):
            assert id_ in cache

        # touch the existing entry, then overwrite it with a new object
        cache[25]
        replacement = item(25)
        cache[25] = replacement
        assert 25 in cache
        assert cache[25] is replacement
617
618
class ImmutableSubclass(str):
    """Bare ``str`` subclass used as a test fixture type."""
621
622
class FlattenIteratorTest(fixtures.TestBase):
    """Tests for ``util.flatten_iterator``."""

    def test_flatten(self):
        """Nested lists are expanded; scalars pass through in order."""
        nested = [[1, 2, 3], [4, 5, 6], 7, 8]
        flattened = list(util.flatten_iterator(nested))
        assert flattened == [1, 2, 3, 4, 5, 6, 7, 8]

    def test_str_with_iter(self):
        """ensure that a str object with an __iter__ method (like in
        PyPy) is not interpreted as an iterable.

        """

        class IterString(str):
            def __iter__(self):
                return iter(self + '')

        nested = [
            IterString('asdf'),
            [IterString('x'), IterString('y')],
        ]
        flattened = list(util.flatten_iterator(nested))
        assert flattened == ['asdf', 'x', 'y']
642
643
class HashOverride(object):
    """Fixture whose hash comes from ``value``; equality is left default."""

    def __init__(self, value=None):
        self.value = value

    def __hash__(self):
        wrapped = self.value
        return hash(wrapped)
651
652
class EqOverride(object):
    """Fixture with value-based equality and ``object``'s own hash."""

    # keep the default hash even though __eq__ is overridden
    __hash__ = object.__hash__

    def __init__(self, value=None):
        self.value = value

    def __eq__(self, other):
        return isinstance(other, EqOverride) and self.value == other.value

    def __ne__(self, other):
        return not isinstance(other, EqOverride) or self.value != other.value
670
671
class HashEqOverride(object):
    """Fixture whose hash and equality both derive from ``value``.

    Two instances are equal when their values are equal; comparison with
    any other type is unequal.
    """

    def __init__(self, value=None):
        self.value = value

    def __hash__(self):
        return hash(self.value)

    def __eq__(self, other):
        # Compare against this class, so that two instances holding
        # equal values actually compare equal.
        if isinstance(other, HashEqOverride):
            return self.value == other.value
        else:
            return False

    def __ne__(self, other):
        if isinstance(other, HashEqOverride):
            return self.value != other.value
        else:
            return True
691
692
693class IdentitySetTest(fixtures.TestBase):
694
695    def assert_eq(self, identityset, expected_iterable):
696        expected = sorted([id(o) for o in expected_iterable])
697        found = sorted([id(o) for o in identityset])
698        eq_(found, expected)
699
700    def test_init(self):
701        ids = util.IdentitySet([1, 2, 3, 2, 1])
702        self.assert_eq(ids, [1, 2, 3])
703
704        ids = util.IdentitySet(ids)
705        self.assert_eq(ids, [1, 2, 3])
706
707        ids = util.IdentitySet()
708        self.assert_eq(ids, [])
709
710        ids = util.IdentitySet([])
711        self.assert_eq(ids, [])
712
713        ids = util.IdentitySet(ids)
714        self.assert_eq(ids, [])
715
716    def test_add(self):
717        for type_ in (object, ImmutableSubclass):
718            data = [type_(), type_()]
719            ids = util.IdentitySet()
720            for i in list(range(2)) + list(range(2)):
721                ids.add(data[i])
722            self.assert_eq(ids, data)
723
724        for type_ in (EqOverride, HashOverride, HashEqOverride):
725            data = [type_(1), type_(1), type_(2)]
726            ids = util.IdentitySet()
727            for i in list(range(3)) + list(range(3)):
728                ids.add(data[i])
729            self.assert_eq(ids, data)
730
731    def test_dunder_sub2(self):
732        IdentitySet = util.IdentitySet
733        o1, o2, o3 = object(), object(), object()
734        ids1 = IdentitySet([o1])
735        ids2 = IdentitySet([o1, o2, o3])
736        eq_(
737            ids2 - ids1,
738            IdentitySet([o2, o3])
739        )
740
741        ids2 -= ids1
742        eq_(ids2, IdentitySet([o2, o3]))
743
744    def test_dunder_eq(self):
745        _, _, twin1, twin2, unique1, unique2 = self._create_sets()
746
747        # basic set math
748        eq_(twin1 == twin2, True)
749        eq_(unique1 == unique2, False)
750
751        # not an IdentitySet
752        not_an_identity_set = object()
753        eq_(unique1 == not_an_identity_set, False)
754
755    def test_dunder_ne(self):
756        _, _, twin1, twin2, unique1, unique2 = self._create_sets()
757
758        # basic set math
759        eq_(twin1 != twin2, False)
760        eq_(unique1 != unique2, True)
761
762        # not an IdentitySet
763        not_an_identity_set = object()
764        eq_(unique1 != not_an_identity_set, True)
765
766    def test_dunder_le(self):
767        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
768
769        # basic set math
770        eq_(sub_ <= super_, True)
771        eq_(super_ <= sub_, False)
772
773        # the same sets
774        eq_(twin1 <= twin2, True)
775        eq_(twin2 <= twin1, True)
776
777        # totally different sets
778        eq_(unique1 <= unique2, False)
779        eq_(unique2 <= unique1, False)
780
781        # not an IdentitySet
782        def should_raise():
783            not_an_identity_set = object()
784            return unique1 <= not_an_identity_set
785        self._assert_unorderable_types(should_raise)
786
787    def test_dunder_lt(self):
788        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
789
790        # basic set math
791        eq_(sub_ < super_, True)
792        eq_(super_ < sub_, False)
793
794        # the same sets
795        eq_(twin1 < twin2, False)
796        eq_(twin2 < twin1, False)
797
798        # totally different sets
799        eq_(unique1 < unique2, False)
800        eq_(unique2 < unique1, False)
801
802        # not an IdentitySet
803        def should_raise():
804            not_an_identity_set = object()
805            return unique1 < not_an_identity_set
806        self._assert_unorderable_types(should_raise)
807
808    def test_dunder_ge(self):
809        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
810
811        # basic set math
812        eq_(sub_ >= super_, False)
813        eq_(super_ >= sub_, True)
814
815        # the same sets
816        eq_(twin1 >= twin2, True)
817        eq_(twin2 >= twin1, True)
818
819        # totally different sets
820        eq_(unique1 >= unique2, False)
821        eq_(unique2 >= unique1, False)
822
823        # not an IdentitySet
824        def should_raise():
825            not_an_identity_set = object()
826            return unique1 >= not_an_identity_set
827        self._assert_unorderable_types(should_raise)
828
829    def test_dunder_gt(self):
830        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
831
832        # basic set math
833        eq_(sub_ > super_, False)
834        eq_(super_ > sub_, True)
835
836        # the same sets
837        eq_(twin1 > twin2, False)
838        eq_(twin2 > twin1, False)
839
840        # totally different sets
841        eq_(unique1 > unique2, False)
842        eq_(unique2 > unique1, False)
843
844        # not an IdentitySet
845        def should_raise():
846            not_an_identity_set = object()
847            return unique1 > not_an_identity_set
848        self._assert_unorderable_types(should_raise)
849
850    def test_issubset(self):
851        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
852
853        # basic set math
854        eq_(sub_.issubset(super_), True)
855        eq_(super_.issubset(sub_), False)
856
857        # the same sets
858        eq_(twin1.issubset(twin2), True)
859        eq_(twin2.issubset(twin1), True)
860
861        # totally different sets
862        eq_(unique1.issubset(unique2), False)
863        eq_(unique2.issubset(unique1), False)
864
865        # not an IdentitySet
866        not_an_identity_set = object()
867        assert_raises(TypeError, unique1.issubset, not_an_identity_set)
868
869    def test_issuperset(self):
870        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
871
872        # basic set math
873        eq_(sub_.issuperset(super_), False)
874        eq_(super_.issuperset(sub_), True)
875
876        # the same sets
877        eq_(twin1.issuperset(twin2), True)
878        eq_(twin2.issuperset(twin1), True)
879
880        # totally different sets
881        eq_(unique1.issuperset(unique2), False)
882        eq_(unique2.issuperset(unique1), False)
883
884        # not an IdentitySet
885        not_an_identity_set = object()
886        assert_raises(TypeError, unique1.issuperset, not_an_identity_set)
887
888    def test_union(self):
889        super_, sub_, twin1, twin2, _, _ = self._create_sets()
890
891        # basic set math
892        eq_(sub_.union(super_), super_)
893        eq_(super_.union(sub_), super_)
894
895        # the same sets
896        eq_(twin1.union(twin2), twin1)
897        eq_(twin2.union(twin1), twin1)
898
899        # empty sets
900        empty = util.IdentitySet([])
901        eq_(empty.union(empty), empty)
902
903        # totally different sets
904        unique1 = util.IdentitySet([1])
905        unique2 = util.IdentitySet([2])
906        eq_(unique1.union(unique2), util.IdentitySet([1, 2]))
907
908        # not an IdentitySet
909        not_an_identity_set = object()
910        assert_raises(TypeError, unique1.union, not_an_identity_set)
911
912    def test_dunder_or(self):
913        super_, sub_, twin1, twin2, _, _ = self._create_sets()
914
915        # basic set math
916        eq_(sub_ | super_, super_)
917        eq_(super_ | sub_, super_)
918
919        # the same sets
920        eq_(twin1 | twin2, twin1)
921        eq_(twin2 | twin1, twin1)
922
923        # empty sets
924        empty = util.IdentitySet([])
925        eq_(empty | empty, empty)
926
927        # totally different sets
928        unique1 = util.IdentitySet([1])
929        unique2 = util.IdentitySet([2])
930        eq_(unique1 | unique2, util.IdentitySet([1, 2]))
931
932        # not an IdentitySet
933        def should_raise():
934            not_an_identity_set = object()
935            return unique1 | not_an_identity_set
936        assert_raises(TypeError, should_raise)
937
938    def test_update(self):
939        pass  # TODO
940
941    def test_dunder_ior(self):
942        super_, sub_, _, _, _, _ = self._create_sets()
943
944        # basic set math
945        sub_ |= super_
946        eq_(sub_, super_)
947        super_ |= sub_
948        eq_(super_, super_)
949
950        # totally different sets
951        unique1 = util.IdentitySet([1])
952        unique2 = util.IdentitySet([2])
953        unique1 |= unique2
954        eq_(unique1, util.IdentitySet([1, 2]))
955        eq_(unique2, util.IdentitySet([2]))
956
957        # not an IdentitySet
958        def should_raise():
959            unique = util.IdentitySet([1])
960            not_an_identity_set = object()
961            unique |= not_an_identity_set
962        assert_raises(TypeError, should_raise)
963
964    def test_difference(self):
965        _, _, twin1, twin2, _, _ = self._create_sets()
966
967        # basic set math
968        set1 = util.IdentitySet([1, 2, 3])
969        set2 = util.IdentitySet([2, 3, 4])
970        eq_(set1.difference(set2), util.IdentitySet([1]))
971        eq_(set2.difference(set1), util.IdentitySet([4]))
972
973        # empty sets
974        empty = util.IdentitySet([])
975        eq_(empty.difference(empty), empty)
976
977        # the same sets
978        eq_(twin1.difference(twin2), empty)
979        eq_(twin2.difference(twin1), empty)
980
981        # totally different sets
982        unique1 = util.IdentitySet([1])
983        unique2 = util.IdentitySet([2])
984        eq_(unique1.difference(unique2), util.IdentitySet([1]))
985        eq_(unique2.difference(unique1), util.IdentitySet([2]))
986
987        # not an IdentitySet
988        not_an_identity_set = object()
989        assert_raises(TypeError, unique1.difference, not_an_identity_set)
990
991    def test_dunder_sub(self):
992        _, _, twin1, twin2, _, _ = self._create_sets()
993
994        # basic set math
995        set1 = util.IdentitySet([1, 2, 3])
996        set2 = util.IdentitySet([2, 3, 4])
997        eq_(set1 - set2, util.IdentitySet([1]))
998        eq_(set2 - set1, util.IdentitySet([4]))
999
1000        # empty sets
1001        empty = util.IdentitySet([])
1002        eq_(empty - empty, empty)
1003
1004        # the same sets
1005        eq_(twin1 - twin2, empty)
1006        eq_(twin2 - twin1, empty)
1007
1008        # totally different sets
1009        unique1 = util.IdentitySet([1])
1010        unique2 = util.IdentitySet([2])
1011        eq_(unique1 - unique2, util.IdentitySet([1]))
1012        eq_(unique2 - unique1, util.IdentitySet([2]))
1013
1014        # not an IdentitySet
1015        def should_raise():
1016            not_an_identity_set = object()
1017            unique1 - not_an_identity_set
1018        assert_raises(TypeError, should_raise)
1019
1020    def test_difference_update(self):
1021        pass  # TODO
1022
1023    def test_dunder_isub(self):
1024        pass  # TODO
1025
1026    def test_intersection(self):
1027        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1028
1029        # basic set math
1030        eq_(sub_.intersection(super_), sub_)
1031        eq_(super_.intersection(sub_), sub_)
1032
1033        # the same sets
1034        eq_(twin1.intersection(twin2), twin1)
1035        eq_(twin2.intersection(twin1), twin1)
1036
1037        # empty sets
1038        empty = util.IdentitySet([])
1039        eq_(empty.intersection(empty), empty)
1040
1041        # totally different sets
1042        eq_(unique1.intersection(unique2), empty)
1043
1044        # not an IdentitySet
1045        not_an_identity_set = object()
1046        assert_raises(TypeError, unique1.intersection, not_an_identity_set)
1047
1048    def test_dunder_and(self):
1049        super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
1050
1051        # basic set math
1052        eq_(sub_ & super_, sub_)
1053        eq_(super_ & sub_, sub_)
1054
1055        # the same sets
1056        eq_(twin1 & twin2, twin1)
1057        eq_(twin2 & twin1, twin1)
1058
1059        # empty sets
1060        empty = util.IdentitySet([])
1061        eq_(empty & empty, empty)
1062
1063        # totally different sets
1064        eq_(unique1 & unique2, empty)
1065
1066        # not an IdentitySet
1067        def should_raise():
1068            not_an_identity_set = object()
1069            return unique1 & not_an_identity_set
1070        assert_raises(TypeError, should_raise)
1071
    def test_intersection_update(self):
        # Placeholder: no assertions for intersection_update() have been
        # written yet, so this test currently passes unconditionally.
        pass  # TODO
1074
    def test_dunder_iand(self):
        # Placeholder: presumably meant to cover the in-place ``&=``
        # operator (going by the test name); nothing is asserted yet.
        pass  # TODO
1077
1078    def test_symmetric_difference(self):
1079        _, _, twin1, twin2, _, _ = self._create_sets()
1080
1081        # basic set math
1082        set1 = util.IdentitySet([1, 2, 3])
1083        set2 = util.IdentitySet([2, 3, 4])
1084        eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4]))
1085        eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4]))
1086
1087        # empty sets
1088        empty = util.IdentitySet([])
1089        eq_(empty.symmetric_difference(empty), empty)
1090
1091        # the same sets
1092        eq_(twin1.symmetric_difference(twin2), empty)
1093        eq_(twin2.symmetric_difference(twin1), empty)
1094
1095        # totally different sets
1096        unique1 = util.IdentitySet([1])
1097        unique2 = util.IdentitySet([2])
1098        eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2]))
1099        eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2]))
1100
1101        # not an IdentitySet
1102        not_an_identity_set = object()
1103        assert_raises(
1104            TypeError, unique1.symmetric_difference, not_an_identity_set)
1105
1106    def test_dunder_xor(self):
1107        _, _, twin1, twin2, _, _ = self._create_sets()
1108
1109        # basic set math
1110        set1 = util.IdentitySet([1, 2, 3])
1111        set2 = util.IdentitySet([2, 3, 4])
1112        eq_(set1 ^ set2, util.IdentitySet([1, 4]))
1113        eq_(set2 ^ set1, util.IdentitySet([1, 4]))
1114
1115        # empty sets
1116        empty = util.IdentitySet([])
1117        eq_(empty ^ empty, empty)
1118
1119        # the same sets
1120        eq_(twin1 ^ twin2, empty)
1121        eq_(twin2 ^ twin1, empty)
1122
1123        # totally different sets
1124        unique1 = util.IdentitySet([1])
1125        unique2 = util.IdentitySet([2])
1126        eq_(unique1 ^ unique2, util.IdentitySet([1, 2]))
1127        eq_(unique2 ^ unique1, util.IdentitySet([1, 2]))
1128
1129        # not an IdentitySet
1130        def should_raise():
1131            not_an_identity_set = object()
1132            return unique1 ^ not_an_identity_set
1133        assert_raises(TypeError, should_raise)
1134
    def test_symmetric_difference_update(self):
        # Placeholder: no assertions for symmetric_difference_update() have
        # been written yet, so this test currently passes unconditionally.
        pass  # TODO
1137
1138    def _create_sets(self):
1139        o1, o2, o3, o4, o5 = object(), object(), object(), object(), object()
1140        super_ = util.IdentitySet([o1, o2, o3])
1141        sub_ = util.IdentitySet([o2])
1142        twin1 = util.IdentitySet([o3])
1143        twin2 = util.IdentitySet([o3])
1144        unique1 = util.IdentitySet([o4])
1145        unique2 = util.IdentitySet([o5])
1146        return super_, sub_, twin1, twin2, unique1, unique2
1147
1148    def _assert_unorderable_types(self, callable_):
1149        if util.py36:
1150            assert_raises_message(
1151                TypeError, 'not supported between instances of', callable_)
1152        elif util.py3k:
1153            assert_raises_message(
1154                TypeError, 'unorderable types', callable_)
1155        else:
1156            assert_raises_message(
1157                TypeError, 'cannot compare sets using cmp()', callable_)
1158
1159    def test_basic_sanity(self):
1160        IdentitySet = util.IdentitySet
1161
1162        o1, o2, o3 = object(), object(), object()
1163        ids = IdentitySet([o1])
1164        ids.discard(o1)
1165        ids.discard(o1)
1166        ids.add(o1)
1167        ids.remove(o1)
1168        assert_raises(KeyError, ids.remove, o1)
1169
1170        eq_(ids.copy(), ids)
1171
1172        # explicit __eq__ and __ne__ tests
1173        assert ids != None
1174        assert not(ids == None)
1175
1176        ne_(ids, IdentitySet([o1, o2, o3]))
1177        ids.clear()
1178        assert o1 not in ids
1179        ids.add(o2)
1180        assert o2 in ids
1181        eq_(ids.pop(), o2)
1182        ids.add(o1)
1183        eq_(len(ids), 1)
1184
1185        isuper = IdentitySet([o1, o2])
1186        assert ids < isuper
1187        assert ids.issubset(isuper)
1188        assert isuper.issuperset(ids)
1189        assert isuper > ids
1190
1191        eq_(ids.union(isuper), isuper)
1192        eq_(ids | isuper, isuper)
1193        eq_(isuper - ids, IdentitySet([o2]))
1194        eq_(isuper.difference(ids), IdentitySet([o2]))
1195        eq_(ids.intersection(isuper), IdentitySet([o1]))
1196        eq_(ids & isuper, IdentitySet([o1]))
1197        eq_(ids.symmetric_difference(isuper), IdentitySet([o2]))
1198        eq_(ids ^ isuper, IdentitySet([o2]))
1199
1200        ids.update(isuper)
1201        ids |= isuper
1202        ids.difference_update(isuper)
1203        ids -= isuper
1204        ids.intersection_update(isuper)
1205        ids &= isuper
1206        ids.symmetric_difference_update(isuper)
1207        ids ^= isuper
1208
1209        ids.update('foobar')
1210        try:
1211            ids |= 'foobar'
1212            assert False
1213        except TypeError:
1214            assert True
1215
1216        try:
1217            s = set([o1, o2])
1218            s |= ids
1219            assert False
1220        except TypeError:
1221            assert True
1222
1223        assert_raises(TypeError, util.cmp, ids)
1224        assert_raises(TypeError, hash, ids)
1225
1226
class OrderedIdentitySetTest(fixtures.TestBase):
    """Tests for ``util.OrderedIdentitySet``."""

    def assert_eq(self, identityset, expected_iterable):
        """Assert both iterables yield the same objects in the same order.

        Members are compared by ``id()``, not by equality.
        """
        wanted_ids = list(map(id, expected_iterable))
        actual_ids = list(map(id, identityset))
        eq_(actual_ids, wanted_ids)

    def test_add(self):
        """Adding two distinct objects must not raise."""
        target = util.OrderedIdentitySet()
        for _ in range(2):
            target.add(object())

    def test_intersection(self):
        """intersection() yields shared members in the left operand's order."""
        a, b, c, d, e, f, g = [object() for _ in range(7)]
        make = util.OrderedIdentitySet

        abc = make([a, b, c])
        def_ = make([d, e, f])
        adfg = make([a, d, f, g])

        self.assert_eq(abc.intersection(def_), [])
        self.assert_eq(abc.intersection(adfg), [a])
        self.assert_eq(abc.union(def_).intersection(adfg), [a, d, f])
1253
1254
class DictlikeIteritemsTest(fixtures.TestBase):
    """``util.dictlike_iteritems`` against dicts and dict look-alikes."""

    # the (key, value) pairs every accepted object is expected to yield
    baseline = set([('a', 1), ('b', 2), ('c', 3)])

    def _ok(self, instance):
        """Assert *instance* iterates to exactly the baseline pairs."""
        pairs = set(util.dictlike_iteritems(instance))
        eq_(pairs, self.baseline)

    def _notok(self, instance):
        """Assert *instance* is rejected with TypeError."""
        assert_raises(TypeError, util.dictlike_iteritems, instance)

    def test_dict(self):
        self._ok(dict(a=1, b=2, c=3))

    def test_subdict(self):
        class DictChild(dict):
            pass
        self._ok(DictChild(a=1, b=2, c=3))

    if util.py2k:
        def test_UserDict(self):
            import UserDict
            self._ok(UserDict.UserDict(a=1, b=2, c=3))

    def test_object(self):
        self._notok(object())

    if util.py2k:
        def test_duck_1(self):
            # offers iteritems() only
            class OnlyIteritems(object):
                def iteritems(this):
                    return iter(self.baseline)
            self._ok(OnlyIteritems())

    def test_duck_2(self):
        # offers items() only
        class OnlyItems(object):
            def items(this):
                return list(self.baseline)
        self._ok(OnlyItems())

    if util.py2k:
        def test_duck_3(self):
            # offers iterkeys() plus item access
            class IterkeysAndGetitem(object):
                def iterkeys(this):
                    return iter(['a', 'b', 'c'])

                def __getitem__(this, key):
                    return {'a': 1, 'b': 2, 'c': 3}.get(key)
            self._ok(IterkeysAndGetitem())

    def test_duck_4(self):
        # iterkeys() alone, with no way to fetch values, is rejected
        class OnlyIterkeys(object):
            def iterkeys(this):
                return iter(['a', 'b', 'c'])
        self._notok(OnlyIterkeys())

    def test_duck_5(self):
        # offers keys() plus get()
        class KeysAndGet(object):
            def keys(this):
                return ['a', 'b', 'c']

            def get(this, key):
                return {'a': 1, 'b': 2, 'c': 3}.get(key)
        self._ok(KeysAndGet())

    def test_duck_6(self):
        # keys() alone, with no way to fetch values, is rejected
        class OnlyKeys(object):
            def keys(this):
                return ['a', 'b', 'c']
        self._notok(OnlyKeys())
1329
1330
class DuckTypeCollectionTest(fixtures.TestBase):
    """Tests for ``util.duck_type_collection``."""

    def test_sets(self):
        """Set-like types and instances map to ``set``; frozenset to None."""
        class SetLike(object):
            def add(self):
                pass

        class ForcedSet(list):
            __emulates__ = set

        classify = util.duck_type_collection

        # both the class and an instance of it are expected to map to set
        for set_flavor in [set, SetLike, ForcedSet]:
            eq_(classify(set_flavor), set)
            eq_(classify(set_flavor()), set)

        # frozenset, as class or instance, is expected to map to None
        is_(classify(frozenset), None)
        is_(classify(frozenset()), None)
1352
1353
class PublicFactoryTest(fixtures.TestBase):
    """Tests for ``langhelpers.public_factory``."""

    def _fixture(self):
        """Return a fresh ``Thingy`` class with a documented constructor
        and a documented ``foobar`` classmethod."""
        class Thingy(object):
            def __init__(self, value):
                "make a thingy"
                self.value = value

            @classmethod
            def foobar(cls, x, y):
                "do the foobar"
                return Thingy(x + y)

        return Thingy

    def test_classmethod(self):
        """A classmethod is exposed as a module-level factory function."""
        thingy_cls = self._fixture()
        factory = langhelpers.public_factory(
            thingy_cls.foobar, ".sql.elements.foob")

        # positional and keyword calls both reach the classmethod
        eq_(factory(3, 4).value, 7)
        eq_(factory(x=3, y=4).value, 7)

        # the factory carries the original docstring and the target module
        eq_(factory.__doc__, "do the foobar")
        eq_(factory.__module__, "sqlalchemy.sql.elements")

        # the classmethod's own docstring is replaced by a pointer
        mirrored_doc = thingy_cls.foobar.__doc__
        assert mirrored_doc.startswith("This function is mirrored;")

    def test_constructor(self):
        """A class is exposed as a factory wrapping its constructor."""
        thingy_cls = self._fixture()
        factory = langhelpers.public_factory(
            thingy_cls, ".sql.elements.foob")

        eq_(factory(7).value, 7)
        eq_(factory(value=7).value, 7)

        eq_(factory.__doc__, "make a thingy")
        eq_(factory.__module__, "sqlalchemy.sql.elements")

        # __init__'s own docstring is replaced by a generated header
        init_doc = thingy_cls.__init__.__doc__
        assert init_doc.startswith(
            "Construct a new :class:`.Thingy` object.")
1389
1390
class ArgInspectionTest(fixtures.TestBase):
    """Tests for the argument-introspection helpers in ``util``."""

    def test_get_cls_kwargs(self):
        """get_cls_kwargs() reports constructor argument names per class.

        The hierarchy below mixes constructors with and without ``**kw``;
        the table at the end lists the names expected for each class.
        """

        class A(object):
            def __init__(self, a):
                pass

        class A1(A):
            def __init__(self, a1):
                pass

        class A11(A1):
            def __init__(self, a11, **kw):
                pass

        class B(object):
            def __init__(self, b, **kw):
                pass

        class B1(B):
            def __init__(self, b1, **kw):
                pass

        class B2(B):
            def __init__(self, b2):
                pass

        class AB(A, B):
            def __init__(self, ab):
                pass

        class BA(B, A):
            def __init__(self, ba, **kwargs):
                pass

        class BA1(BA):
            pass

        class CAB(A, B):
            pass

        class CBA(B, A):
            pass

        class CB1A1(B1, A1):
            pass

        class CAB1(A, B1):
            pass

        class CB1A(B1, A):
            pass

        class CB2A(B2, A):
            pass

        class D(object):
            pass

        class BA2(B, A):
            pass

        class A11B1(A11, B1):
            pass

        # (class, expected argument names); compared as sets, in this order
        expectations = [
            (A, ['a']),
            (A1, ['a1']),
            (A11, ['a11', 'a1']),
            (B, ['b']),
            (B1, ['b1', 'b']),
            (AB, ['ab']),
            (BA, ['ba', 'b', 'a']),
            (BA1, ['ba', 'b', 'a']),
            (CAB, ['a']),
            (CBA, ['b', 'a']),
            (CAB1, ['a']),
            (CB1A, ['b1', 'b', 'a']),
            (CB2A, ['b2']),
            (CB1A1, ["a1", "b1", "b"]),
            (D, []),
            (BA2, ["a", "b"]),
            (A11B1, ["a1", "a11", "b", "b1"]),
        ]
        for cls, names in expectations:
            eq_(set(util.get_cls_kwargs(cls)), set(names))

    def test_get_func_kwargs(self):
        """get_func_kwargs() reports named arguments, not ``*`` or ``**``."""

        def takes_nothing():
            pass

        def takes_foo(foo):
            pass

        def takes_varargs(*foo):
            pass

        def takes_varkw(**foo):
            pass

        expectations = [
            (takes_nothing, []),
            (takes_foo, ['foo']),
            (takes_varargs, []),
            (takes_varkw, []),
        ]
        for fn, names in expectations:
            eq_(set(util.get_func_kwargs(fn)), set(names))

    def test_callable_argspec_fn(self):
        def target(x, y, **kw):
            pass
        spec = get_callable_argspec(target)
        eq_(spec, (['x', 'y'], None, 'kw', None))

    def test_callable_argspec_fn_no_self(self):
        def target(x, y, **kw):
            pass
        spec = get_callable_argspec(target, no_self=True)
        eq_(spec, (['x', 'y'], None, 'kw', None))

    def test_callable_argspec_fn_no_self_but_self(self):
        # a plain function keeps an argument that merely happens to be
        # named "self", even with no_self=True
        def target(self, x, y, **kw):
            pass
        spec = get_callable_argspec(target, no_self=True)
        eq_(spec, (['self', 'x', 'y'], None, 'kw', None))

    @fails_if(lambda: util.pypy,  "pypy returns plain *arg, **kw")
    def test_callable_argspec_py_builtin(self):
        import datetime
        builtin_method = datetime.datetime.now
        assert_raises(TypeError, get_callable_argspec, builtin_method)

    @fails_if(lambda: util.pypy,  "pypy returns plain *arg, **kw")
    def test_callable_argspec_obj_init(self):
        assert_raises(TypeError, get_callable_argspec, object)

    def test_callable_argspec_method(self):
        class Holder(object):
            def method(self, x, y, **kw):
                pass
        spec = get_callable_argspec(Holder.method)
        eq_(spec, (['self', 'x', 'y'], None, 'kw', None))

    def test_callable_argspec_instance_method_no_self(self):
        # a bound method loses its leading "self" when no_self=True
        class Holder(object):
            def method(self, x, y, **kw):
                pass
        spec = get_callable_argspec(Holder().method, no_self=True)
        eq_(spec, (['x', 'y'], None, 'kw', None))

    def test_callable_argspec_unbound_method_no_self(self):
        # accessed through the class, "self" is kept even with no_self=True
        class Holder(object):
            def method(self, x, y, **kw):
                pass
        spec = get_callable_argspec(Holder.method, no_self=True)
        eq_(spec, (['self', 'x', 'y'], None, 'kw', None))

    def test_callable_argspec_init(self):
        # a class is described through its __init__
        class Holder(object):
            def __init__(self, x, y):
                pass

        spec = get_callable_argspec(Holder)
        eq_(spec, (['self', 'x', 'y'], None, None, None))

    def test_callable_argspec_init_no_self(self):
        class Holder(object):
            def __init__(self, x, y):
                pass

        spec = get_callable_argspec(Holder, no_self=True)
        eq_(spec, (['x', 'y'], None, None, None))

    def test_callable_argspec_call(self):
        # a callable instance is described through its __call__
        class Holder(object):
            def __call__(self, x, y):
                pass
        spec = get_callable_argspec(Holder())
        eq_(spec, (['self', 'x', 'y'], None, None, None))

    def test_callable_argspec_call_no_self(self):
        class Holder(object):
            def __call__(self, x, y):
                pass
        spec = get_callable_argspec(Holder(), no_self=True)
        eq_(spec, (['x', 'y'], None, None, None))

    @fails_if(lambda: util.pypy,  "pypy returns plain *arg, **kw")
    def test_callable_argspec_partial(self):
        from functools import partial

        def target(x, y, z, **kw):
            pass
        bound = partial(target, 5)

        assert_raises(TypeError, get_callable_argspec, bound)
1615
class SymbolTest(fixtures.TestBase):
    """Tests for ``util.symbol``: interning, pickling and bit-flag use."""

    def test_basic(self):
        """Symbols with the same name are one object; other names differ."""
        sym1 = util.symbol('foo')
        assert sym1.name == 'foo'
        sym2 = util.symbol('foo')

        assert sym1 is sym2
        assert sym1 == sym2

        sym3 = util.symbol('bar')
        assert sym1 is not sym3
        assert sym1 != sym3

    def test_pickle(self):
        """A pickle round trip returns the very same symbol object."""
        sym1 = util.symbol('foo')
        sym2 = util.symbol('foo')

        assert sym1 is sym2

        # default protocol
        s = util.pickle.dumps(sym1)
        sym3 = util.pickle.loads(s)
        assert sym3 is sym1

        # Each explicit protocol has to be handed to dumps(); otherwise the
        # loop would only repeat the default-protocol check above.
        for protocol in 0, 1, 2:
            serial = util.pickle.dumps(sym1, protocol)
            rt = util.pickle.loads(serial)
            assert rt is sym1
            assert rt is sym2

    def test_bitflags(self):
        """Symbols given a ``canonical`` integer can be tested with ``&``."""
        sym1 = util.symbol('sym1', canonical=1)
        sym2 = util.symbol('sym2', canonical=2)

        assert sym1 & sym1
        assert not sym1 & sym2
        assert not sym1 & sym1 & sym2

    def test_composites(self):
        """``|`` combines flag symbols; ``&`` tests for a shared bit."""
        sym1 = util.symbol('sym1', canonical=1)
        sym2 = util.symbol('sym2', canonical=2)
        sym3 = util.symbol('sym3', canonical=4)
        sym4 = util.symbol('sym4', canonical=8)

        assert sym1 & (sym2 | sym1 | sym4)
        assert not sym1 & (sym2 | sym3)

        assert not (sym1 | sym2) & (sym3 | sym4)
        assert (sym1 | sym2) & (sym2 | sym4)
1666
1667
class TestFormatArgspec(fixtures.TestBase):
    """Tests for ``util.format_argspec_plus`` / ``util.format_argspec_init``."""

    def test_specs(self):
        """format_argspec_plus() output for a range of lambda signatures."""
        def check(fn, wanted, grouped=None):
            # grouped=None means "do not pass the keyword at all"
            options = {} if grouped is None else dict(grouped=grouped)
            eq_(util.format_argspec_plus(fn, **options), wanted)

        check(lambda: None,
              dict(args='()', self_arg=None,
                   apply_kw='()', apply_pos='()'))

        check(lambda: None,
              dict(args='', self_arg=None,
                   apply_kw='', apply_pos=''),
              grouped=False)

        check(lambda self: None,
              dict(args='(self)', self_arg='self',
                   apply_kw='(self)', apply_pos='(self)'))

        check(lambda self: None,
              dict(args='self', self_arg='self',
                   apply_kw='self', apply_pos='self'),
              grouped=False)

        check(lambda *a: None,
              dict(args='(*a)', self_arg='a[0]',
                   apply_kw='(*a)', apply_pos='(*a)'))

        check(lambda **kw: None,
              dict(args='(**kw)', self_arg=None,
                   apply_kw='(**kw)', apply_pos='(**kw)'))

        check(lambda *a, **kw: None,
              dict(args='(*a, **kw)', self_arg='a[0]',
                   apply_kw='(*a, **kw)', apply_pos='(*a, **kw)'))

        check(lambda a, *b: None,
              dict(args='(a, *b)', self_arg='a',
                   apply_kw='(a, *b)', apply_pos='(a, *b)'))

        check(lambda a, **b: None,
              dict(args='(a, **b)', self_arg='a',
                   apply_kw='(a, **b)', apply_pos='(a, **b)'))

        check(lambda a, *b, **c: None,
              dict(args='(a, *b, **c)', self_arg='a',
                   apply_kw='(a, *b, **c)', apply_pos='(a, *b, **c)'))

        check(lambda a, b=1, **c: None,
              dict(args='(a, b=1, **c)', self_arg='a',
                   apply_kw='(a, b=b, **c)', apply_pos='(a, b, **c)'))

        check(lambda a=1, b=2: None,
              dict(args='(a=1, b=2)', self_arg='a',
                   apply_kw='(a=a, b=b)', apply_pos='(a, b)'))

        check(lambda a=1, b=2: None,
              dict(args='a=1, b=2', self_arg='a',
                   apply_kw='a=a, b=b', apply_pos='a, b'),
              grouped=False)

    @testing.fails_if(lambda: util.pypy,
                "pypy doesn't report O.__init__ as object.__init__")
    def test_init_grouped(self):
        """Parenthesised output, with ``grouped`` omitted and with True."""
        object_spec = dict(
            args='(self)', self_arg='self',
            apply_pos='(self)', apply_kw='(self)')
        wrapper_spec = dict(
            args='(self, *args, **kwargs)', self_arg='self',
            apply_pos='(self, *args, **kwargs)',
            apply_kw='(self, *args, **kwargs)')
        custom_spec = dict(
            args='(slef, a=123)', self_arg='slef',  # yes, slef
            apply_pos='(slef, a)', apply_kw='(slef, a=a)')

        for grouped in (None, True):
            self._test_init(grouped, object_spec, wrapper_spec, custom_spec)

    @testing.fails_if(lambda: util.pypy,
                "pypy doesn't report O.__init__ as object.__init__")
    def test_init_bare(self):
        """Un-parenthesised output, requested with ``grouped=False``."""
        object_spec = dict(
            args='self', self_arg='self',
            apply_pos='self', apply_kw='self')
        wrapper_spec = dict(
            args='self, *args, **kwargs', self_arg='self',
            apply_pos='self, *args, **kwargs',
            apply_kw='self, *args, **kwargs')
        custom_spec = dict(
            args='slef, a=123', self_arg='slef',  # yes, slef
            apply_pos='slef, a', apply_kw='slef, a=a')

        self._test_init(False, object_spec, wrapper_spec, custom_spec)

    def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec):
        """Run format_argspec_init() over a fixed set of ``__init__`` shapes.

        ``grouped=None`` calls the helper without the keyword.  The three
        spec dicts are the results expected for, respectively, a
        ``(self)``-only constructor, a list-derived ``__init__`` that is
        inherited or takes ``*args, **kwargs``, and the ``(slef, a=123)``
        constructor.
        """
        def check(fn, wanted):
            options = {} if grouped is None else dict(grouped=grouped)
            eq_(util.format_argspec_init(fn, **options), wanted)

        # object-derived classes
        class PlainInherited(object):
            pass

        check(PlainInherited.__init__, object_spec)

        class PlainNoArgs(object):
            def __init__(self):
                pass

        check(PlainNoArgs.__init__, object_spec)

        class PlainCustom(object):
            def __init__(slef, a=123):
                pass

        check(PlainCustom.__init__, custom_spec)

        # list-derived classes
        class ListInherited(list):
            pass

        check(ListInherited.__init__, wrapper_spec)

        class ListStarArgs(list):
            def __init__(self, *args, **kwargs):
                pass

        check(ListStarArgs.__init__, wrapper_spec)

        class ListNoArgs(list):
            def __init__(self):
                pass

        check(ListNoArgs.__init__, object_spec)

        class ListCustom(list):
            def __init__(slef, a=123):
                pass

        check(ListCustom.__init__, custom_spec)
1813
1814
class GenericReprTest(fixtures.TestBase):
    """Tests for ``util.generic_repr``."""

    def test_all_positional(self):
        class Foo(object):
            def __init__(self, a, b, c):
                self.a, self.b, self.c = a, b, c

        rendered = util.generic_repr(Foo(1, 2, 3))
        eq_(rendered, "Foo(1, 2, 3)")

    def test_positional_plus_kw(self):
        # arguments that have defaults are rendered in keyword form
        class Foo(object):
            def __init__(self, a, b, c=5, d=4):
                self.a, self.b = a, b
                self.c, self.d = c, d

        rendered = util.generic_repr(Foo(1, 2, 3, 6))
        eq_(rendered, "Foo(1, 2, c=3, d=6)")

    def test_kw_defaults(self):
        # values equal to their defaults are left out
        class Foo(object):
            def __init__(self, a=1, b=2, c=3, d=4):
                self.a, self.b = a, b
                self.c, self.d = c, d

        rendered = util.generic_repr(Foo(1, 5, 3, 7))
        eq_(rendered, "Foo(b=5, d=7)")

    def test_multi_kw(self):
        # arguments of several classes are combined through to_inspect
        class Foo(object):
            def __init__(self, a, b, c=3, d=4):
                self.a, self.b = a, b
                self.c, self.d = c, d

        class Bar(Foo):
            def __init__(self, e, f, g=5, **kw):
                self.e, self.f, self.g = e, f, g
                super(Bar, self).__init__(**kw)

        everything_set = Bar('e', 'f', g=7, a=6, b=5, d=9)
        eq_(
            util.generic_repr(everything_set, to_inspect=[Bar, Foo]),
            "Bar('e', 'f', g=7, a=6, b=5, d=9)"
        )

        defaults_kept = Bar('e', 'f', a=6, b=5)
        eq_(
            util.generic_repr(defaults_kept, to_inspect=[Bar, Foo]),
            "Bar('e', 'f', a=6, b=5)"
        )

    def test_multi_kw_repeated(self):
        # "b" is accepted by both constructors but rendered only once
        class Foo(object):
            def __init__(self, a=1, b=2):
                self.a, self.b = a, b

        class Bar(Foo):
            def __init__(self, b=3, c=4, **kw):
                self.c = c
                super(Bar, self).__init__(b=b, **kw)

        subject = Bar(a='a', b='b', c='c')
        eq_(
            util.generic_repr(subject, to_inspect=[Bar, Foo]),
            "Bar(b='b', c='c', a='a')"
        )

    def test_discard_vargs(self):
        # *args that are not kept under the name "args" are not rendered
        class Foo(object):
            def __init__(self, a, b, *args):
                self.a, self.b = a, b
                self.c, self.d = args[0:2]

        rendered = util.generic_repr(Foo(1, 2, 3, 4))
        eq_(rendered, "Foo(1, 2)")

    def test_discard_vargs_kwargs(self):
        class Foo(object):
            def __init__(self, a, b, *args, **kw):
                self.a, self.b = a, b
                self.c, self.d = args[0:2]

        rendered = util.generic_repr(Foo(1, 2, 3, 4, x=7, y=4))
        eq_(rendered, "Foo(1, 2)")

    def test_significant_vargs(self):
        # *args kept on the instance as "args" are rendered
        class Foo(object):
            def __init__(self, a, b, *args):
                self.a, self.b = a, b
                self.args = args

        rendered = util.generic_repr(Foo(1, 2, 3, 4))
        eq_(rendered, "Foo(1, 2, 3, 4)")

    def test_no_args(self):
        class Foo(object):
            def __init__(self):
                pass

        rendered = util.generic_repr(Foo())
        eq_(rendered, "Foo()")

    def test_no_init(self):
        class Foo(object):
            pass

        rendered = util.generic_repr(Foo())
        eq_(rendered, "Foo()")
1950
1951
class AsInterfaceTest(fixtures.TestBase):
    """Tests for ``util.as_interface`` with instances and with dicts."""

    class Something(object):
        # reference interface: public methods foo and bar

        def _ignoreme(self):
            pass

        def foo(self):
            pass

        def bar(self):
            pass

    class Partial(object):
        # provides bar only

        def bar(self):
            pass

    class Object(object):
        pass

    def test_instance(self):
        """Instances are returned as-is when they comply, else TypeError."""
        # a bare object() implements none of the interface
        obj = object()
        assert_raises(TypeError, util.as_interface, obj,
                          cls=self.Something)

        # NOTE: the method collections below are one-element tuples; a bare
        # ('foo') is just the string 'foo' and would be read as the
        # characters 'f' and 'o'.
        assert_raises(TypeError, util.as_interface, obj,
                          methods=('foo',))

        assert_raises(TypeError, util.as_interface, obj,
                          cls=self.Something, required=('foo',))

        # an actual Something instance passes in every spelling
        obj = self.Something()
        eq_(obj, util.as_interface(obj, cls=self.Something))
        eq_(obj, util.as_interface(obj, methods=('foo',)))
        eq_(
            obj, util.as_interface(obj, cls=self.Something,
                                   required=('outofband',)))
        partial = self.Partial()

        # bar supplied as an instance attribute instead of a method
        slotted = self.Object()
        slotted.bar = lambda self: 123

        # both objects offer bar but not foo
        for obj in partial, slotted:
            eq_(obj, util.as_interface(obj, cls=self.Something))
            assert_raises(TypeError, util.as_interface, obj,
                              methods=('foo',))
            eq_(obj, util.as_interface(obj, methods=('bar',)))
            eq_(obj, util.as_interface(obj, cls=self.Something,
                                       required=('bar',)))
            assert_raises(TypeError, util.as_interface, obj,
                              cls=self.Something, required=('foo',))

            # passing a class as "required" is rejected for these objects
            assert_raises(TypeError, util.as_interface, obj,
                              cls=self.Something, required=self.Something)

    def test_dict(self):
        """Dicts of callables are adapted into a class, or rejected."""
        # an empty dict complies with nothing
        obj = {}
        assert_raises(TypeError, util.as_interface, obj,
                      cls=self.Something)
        assert_raises(TypeError, util.as_interface, obj, methods='foo')
        assert_raises(TypeError, util.as_interface, obj,
                      cls=self.Something, required='foo')

        def assertAdapted(obj, *methods):
            # the result must be a class whose public names are exactly
            # the given methods
            assert isinstance(obj, type)
            found = set([m for m in dir(obj) if not m.startswith('_')])
            for method in methods:
                assert method in found
                found.remove(method)
            assert not found

        fn = lambda self: 123
        obj = {'foo': fn, 'bar': fn}
        res = util.as_interface(obj, cls=self.Something)
        assertAdapted(res, 'foo', 'bar')
        res = util.as_interface(obj, cls=self.Something,
                                required=self.Something)
        assertAdapted(res, 'foo', 'bar')
        res = util.as_interface(obj, cls=self.Something, required=('foo',))
        assertAdapted(res, 'foo', 'bar')
        res = util.as_interface(obj, methods=('foo', 'bar'))
        assertAdapted(res, 'foo', 'bar')
        res = util.as_interface(obj, methods=('foo', 'bar', 'baz'))
        assertAdapted(res, 'foo', 'bar')
        res = util.as_interface(obj, methods=('foo', 'bar'), required=('foo',))
        assertAdapted(res, 'foo', 'bar')

        # a dict key outside the interface, or a required method the dict
        # lacks, is an error
        assert_raises(TypeError, util.as_interface, obj, methods=('foo',))
        assert_raises(TypeError, util.as_interface, obj,
                      methods=('foo', 'bar', 'baz'), required=('baz', ))

        # a non-callable value is an error
        obj = {'foo': 123}
        assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
2044
2045
class TestClassHierarchy(fixtures.TestBase):
    """Tests for ``util.class_hierarchy``."""

    def test_object(self):
        """Starting from ``object`` yields only ``object`` itself."""
        hierarchy = set(util.class_hierarchy(object))
        eq_(hierarchy, set([object]))

    def test_single(self):
        """Check two unrelated classes before and after a shared subclass."""
        class A(object):
            pass

        class B(object):
            pass

        # Before C is declared, each class is expected to report just
        # itself and object.
        for klass in (A, B):
            eq_(set(util.class_hierarchy(klass)), set([klass, object]))

        class C(A, B):
            pass

        # With C deriving from both, either starting point is expected
        # to produce the same four classes.
        everything = set([A, B, C, object])
        for klass in (A, B):
            eq_(set(util.class_hierarchy(klass)), everything)

    if util.py2k:
        def test_oldstyle_mixin(self):
            """Check a hierarchy that includes a class declared without
            bases (only defined when ``util.py2k`` is true)."""
            class A(object):
                pass

            # Deliberately declared with no base class.
            class Mixin:
                pass

            class B(A, Mixin):
                pass

            new_style = set([A, B, object])

            eq_(set(util.class_hierarchy(B)), new_style)
            eq_(set(util.class_hierarchy(Mixin)), set())
            eq_(set(util.class_hierarchy(A)), new_style)
2081
2082
class ReraiseTest(fixtures.TestBase):
    """Tests for ``util.raise_from_cause``, ``util.reraise`` and
    ``util.safe_reraise``."""

    @testing.requires.python3
    def test_raise_from_cause_same_cause(self):
        """The handled exception, re-raised, has ``__cause__`` of None."""
        class MyException(Exception):
            pass

        def go():
            try:
                raise MyException("exc one")
            except Exception as err:
                # Pass the very exception currently being handled.
                util.raise_from_cause(err)

        try:
            go()
            assert False, "go() was expected to raise MyException"
        except MyException as err:
            is_(err.__cause__, None)

    def test_reraise_disallow_same_cause(self):
        """``util.reraise`` fails an assertion when the value and the
        cause are the same exception object."""
        class MyException(Exception):
            pass

        def go():
            try:
                raise MyException("exc one")
            except Exception as err:
                # ``value`` from exc_info() is the same object as ``err``,
                # so it is handed over both as the value and as the cause.
                type_, value, tb = sys.exc_info()
                util.reraise(type_, err, tb, value)

        assert_raises_message(
            AssertionError,
            "Same cause emitted",
            go
        )

    def test_raise_from_cause(self):
        """On Python 3, the newly raised exception's ``__cause__`` is the
        exception that was being handled."""
        class MyException(Exception):
            pass

        class MyOtherException(Exception):
            pass

        me = MyException("exc on")

        def go():
            try:
                raise me
            except Exception:
                util.raise_from_cause(MyOtherException("exc two"))

        try:
            go()
            assert False, "go() was expected to raise MyOtherException"
        except MyOtherException as moe:
            # ``__cause__`` is only inspected when running on Python 3.
            if testing.requires.python3.enabled:
                is_(moe.__cause__, me)

    @testing.requires.python2
    def test_safe_reraise_py2k_warning(self):
        """Python 2 only: a second exception raised inside
        ``util.safe_reraise()`` propagates, and a warning describing the
        first exception is expected."""
        class MyException(Exception):
            pass

        class MyOtherException(Exception):
            pass

        m1 = MyException("exc one")
        m2 = MyOtherException("exc two")

        def go2():
            raise m2

        def go():
            try:
                raise m1
            # ``m1`` is an Exception subclass, so a bare ``except:`` is
            # not needed here; it would also trap KeyboardInterrupt and
            # SystemExit.
            except Exception:
                with util.safe_reraise():
                    go2()

        with expect_warnings(
            "An exception has occurred during handling of a previous "
            "exception.  The previous exception "
            "is:.*MyException.*exc one"
        ):
            try:
                go()
                assert False, "go() was expected to raise MyOtherException"
            except MyOtherException:
                pass
2171
2172
class TestClassProperty(fixtures.TestBase):
    """Tests for the ``classproperty`` decorator."""

    def test_simple(self):
        """A classproperty on a subclass can build on the parent's value."""
        class Parent(object):
            something = {'foo': 1}

        class Child(Parent):

            @classproperty
            def something(cls):
                # Copy the inherited mapping, then add one more key.
                combined = dict(super(Child, cls).something)
                combined['bazz'] = 2
                return combined

        expected = {'foo': 1, 'bazz': 2}
        eq_(Child.something, expected)
2188
2189
class TestProperties(fixtures.TestBase):
    """Pickle round-trip tests for the ``util`` properties containers."""

    def _assert_survives_pickling(self, original):
        # Serialize and restore the object with each (loader, dumper)
        # pair from picklers(), comparing ``_data`` and ``keys()`` of
        # the restored copy against the original every time.
        for loads, dumps in picklers():
            restored = loads(dumps(original))

            eq_(original._data, restored._data)
            eq_(original.keys(), restored.keys())

    def test_pickle(self):
        """A plain ``util.Properties`` keeps its data and keys."""
        self._assert_survives_pickling(util.Properties({'hello': 'bla'}))

    def test_pickle_immuatbleprops(self):
        """The ``as_immutable()`` form keeps its data and keys."""
        frozen = util.Properties({'hello': 'bla'}).as_immutable()
        self._assert_survives_pickling(frozen)

    def test_pickle_orderedprops(self):
        """A ``util.OrderedProperties`` keeps its data and keys."""
        ordered = util.OrderedProperties()
        ordered.update({'hello': 'bla'})
        self._assert_survives_pickling(ordered)
2225