1import copy 2import sys 3 4from sqlalchemy import util, sql, exc, testing 5from sqlalchemy.testing import assert_raises, assert_raises_message, fixtures 6from sqlalchemy.testing import eq_, is_, ne_, fails_if, mock, expect_warnings 7from sqlalchemy.testing.util import picklers, gc_collect 8from sqlalchemy.util import classproperty, WeakSequence, get_callable_argspec 9from sqlalchemy.sql import column 10from sqlalchemy.util import langhelpers, compat 11import inspect 12 13 14class _KeyedTupleTest(object): 15 16 def _fixture(self, values, labels): 17 raise NotImplementedError() 18 19 def test_empty(self): 20 keyed_tuple = self._fixture([], []) 21 eq_(str(keyed_tuple), '()') 22 eq_(len(keyed_tuple), 0) 23 24 eq_(list(keyed_tuple.keys()), []) 25 eq_(keyed_tuple._fields, ()) 26 eq_(keyed_tuple._asdict(), {}) 27 28 def test_values_but_no_labels(self): 29 keyed_tuple = self._fixture([1, 2], []) 30 eq_(str(keyed_tuple), '(1, 2)') 31 eq_(len(keyed_tuple), 2) 32 33 eq_(list(keyed_tuple.keys()), []) 34 eq_(keyed_tuple._fields, ()) 35 eq_(keyed_tuple._asdict(), {}) 36 37 eq_(keyed_tuple[0], 1) 38 eq_(keyed_tuple[1], 2) 39 40 def test_basic_creation(self): 41 keyed_tuple = self._fixture([1, 2], ['a', 'b']) 42 eq_(str(keyed_tuple), '(1, 2)') 43 eq_(list(keyed_tuple.keys()), ['a', 'b']) 44 eq_(keyed_tuple._fields, ('a', 'b')) 45 eq_(keyed_tuple._asdict(), {'a': 1, 'b': 2}) 46 47 def test_basic_index_access(self): 48 keyed_tuple = self._fixture([1, 2], ['a', 'b']) 49 eq_(keyed_tuple[0], 1) 50 eq_(keyed_tuple[1], 2) 51 52 def should_raise(): 53 keyed_tuple[2] 54 assert_raises(IndexError, should_raise) 55 56 def test_basic_attribute_access(self): 57 keyed_tuple = self._fixture([1, 2], ['a', 'b']) 58 eq_(keyed_tuple.a, 1) 59 eq_(keyed_tuple.b, 2) 60 61 def should_raise(): 62 keyed_tuple.c 63 assert_raises(AttributeError, should_raise) 64 65 def test_none_label(self): 66 keyed_tuple = self._fixture([1, 2, 3], ['a', None, 'b']) 67 eq_(str(keyed_tuple), '(1, 2, 3)') 68 69 eq_(list(keyed_tuple.keys()), ['a', 'b']) 70 eq_(keyed_tuple._fields, ('a', 'b')) 71 eq_(keyed_tuple._asdict(), {'a': 1, 'b': 3}) 72 73 # attribute access: can't get at value 2 74 eq_(keyed_tuple.a, 1) 75 eq_(keyed_tuple.b, 3) 76 77 # index access: can get at value 2 78 eq_(keyed_tuple[0], 1) 79 eq_(keyed_tuple[1], 2) 80 eq_(keyed_tuple[2], 3) 81 82 def test_duplicate_labels(self): 83 keyed_tuple = self._fixture([1, 2, 3], ['a', 'b', 'b']) 84 eq_(str(keyed_tuple), '(1, 2, 3)') 85 86 eq_(list(keyed_tuple.keys()), ['a', 'b', 'b']) 87 eq_(keyed_tuple._fields, ('a', 'b', 'b')) 88 eq_(keyed_tuple._asdict(), {'a': 1, 'b': 3}) 89 90 # attribute access: can't get at value 2 91 eq_(keyed_tuple.a, 1) 92 eq_(keyed_tuple.b, 3) 93 94 # index access: can get at value 2 95 eq_(keyed_tuple[0], 1) 96 eq_(keyed_tuple[1], 2) 97 eq_(keyed_tuple[2], 3) 98 99 def test_immutable(self): 100 keyed_tuple = self._fixture([1, 2], ['a', 'b']) 101 eq_(str(keyed_tuple), '(1, 2)') 102 103 eq_(keyed_tuple.a, 1) 104 105 assert_raises(AttributeError, setattr, keyed_tuple, "a", 5) 106 107 def should_raise(): 108 keyed_tuple[0] = 100 109 assert_raises(TypeError, should_raise) 110 111 def test_serialize(self): 112 113 keyed_tuple = self._fixture([1, 2, 3], ['a', None, 'b']) 114 115 for loads, dumps in picklers(): 116 kt = loads(dumps(keyed_tuple)) 117 118 eq_(str(kt), '(1, 2, 3)') 119 120 eq_(list(kt.keys()), ['a', 'b']) 121 eq_(kt._fields, ('a', 'b')) 122 eq_(kt._asdict(), {'a': 1, 'b': 3}) 123 124 125class KeyedTupleTest(_KeyedTupleTest, fixtures.TestBase): 126 def _fixture(self, values, labels): 127 return util.KeyedTuple(values, labels) 128 129 130class LWKeyedTupleTest(_KeyedTupleTest, fixtures.TestBase): 131 def _fixture(self, values, labels): 132 return util.lightweight_named_tuple('n', labels)(values) 133 134 135class WeakSequenceTest(fixtures.TestBase): 136 @testing.requires.predictable_gc 137 def test_cleanout_elements(self): 138 class Foo(object): 139 pass 140 f1, f2, f3 = Foo(), Foo(), Foo() 141 w = WeakSequence([f1, f2, f3]) 142 eq_(len(w), 3) 143 eq_(len(w._storage), 3) 144 del f2 145 gc_collect() 146 eq_(len(w), 2) 147 eq_(len(w._storage), 2) 148 149 @testing.requires.predictable_gc 150 def test_cleanout_appended(self): 151 class Foo(object): 152 pass 153 f1, f2, f3 = Foo(), Foo(), Foo() 154 w = WeakSequence() 155 w.append(f1) 156 w.append(f2) 157 w.append(f3) 158 eq_(len(w), 3) 159 eq_(len(w._storage), 3) 160 del f2 161 gc_collect() 162 eq_(len(w), 2) 163 eq_(len(w._storage), 2) 164 165 166class OrderedDictTest(fixtures.TestBase): 167 168 def test_odict(self): 169 o = util.OrderedDict() 170 o['a'] = 1 171 o['b'] = 2 172 o['snack'] = 'attack' 173 o['c'] = 3 174 175 eq_(list(o.keys()), ['a', 'b', 'snack', 'c']) 176 eq_(list(o.values()), [1, 2, 'attack', 3]) 177 178 o.pop('snack') 179 eq_(list(o.keys()), ['a', 'b', 'c']) 180 eq_(list(o.values()), [1, 2, 3]) 181 182 try: 183 o.pop('eep') 184 assert False 185 except KeyError: 186 pass 187 188 eq_(o.pop('eep', 'woot'), 'woot') 189 190 try: 191 o.pop('whiff', 'bang', 'pow') 192 assert False 193 except TypeError: 194 pass 195 196 eq_(list(o.keys()), ['a', 'b', 'c']) 197 eq_(list(o.values()), [1, 2, 3]) 198 199 o2 = util.OrderedDict(d=4) 200 o2['e'] = 5 201 202 eq_(list(o2.keys()), ['d', 'e']) 203 eq_(list(o2.values()), [4, 5]) 204 205 o.update(o2) 206 eq_(list(o.keys()), ['a', 'b', 'c', 'd', 'e']) 207 eq_(list(o.values()), [1, 2, 3, 4, 5]) 208 209 o.setdefault('c', 'zzz') 210 o.setdefault('f', 6) 211 eq_(list(o.keys()), ['a', 'b', 'c', 'd', 'e', 'f']) 212 eq_(list(o.values()), [1, 2, 3, 4, 5, 6]) 213 214 def test_odict_constructor(self): 215 o = util.OrderedDict([('name', 'jbe'), ('fullname', 'jonathan' 216 ), ('password', '')]) 217 eq_(list(o.keys()), ['name', 'fullname', 'password']) 218 219 def test_odict_copy(self): 220 o = util.OrderedDict() 221 o["zzz"] = 1 222 o["aaa"] = 2 223 eq_(list(o.keys()), ['zzz', 'aaa']) 224 225 o2 = o.copy() 226 eq_(list(o2.keys()), list(o.keys())) 227 228 o3 = copy.copy(o) 229 eq_(list(o3.keys()), list(o.keys())) 230 231 232class OrderedSetTest(fixtures.TestBase): 233 234 def test_mutators_against_iter(self): 235 # testing a set modified against an iterator 236 o = util.OrderedSet([3, 2, 4, 5]) 237 238 eq_(o.difference(iter([3, 4])), util.OrderedSet([2, 5])) 239 eq_(o.intersection(iter([3, 4, 6])), util.OrderedSet([3, 4])) 240 eq_(o.union(iter([3, 4, 6])), util.OrderedSet([2, 3, 4, 5, 6])) 241 242 243class FrozenDictTest(fixtures.TestBase): 244 245 def test_serialize(self): 246 d = util.immutabledict({1: 2, 3: 4}) 247 for loads, dumps in picklers(): 248 print(loads(dumps(d))) 249 250 251class MemoizedAttrTest(fixtures.TestBase): 252 253 def test_memoized_property(self): 254 val = [20] 255 256 class Foo(object): 257 @util.memoized_property 258 def bar(self): 259 v = val[0] 260 val[0] += 1 261 return v 262 263 ne_(Foo.bar, None) 264 f1 = Foo() 265 assert 'bar' not in f1.__dict__ 266 eq_(f1.bar, 20) 267 eq_(f1.bar, 20) 268 eq_(val[0], 21) 269 eq_(f1.__dict__['bar'], 20) 270 271 def test_memoized_instancemethod(self): 272 val = [20] 273 274 class Foo(object): 275 @util.memoized_instancemethod 276 def bar(self): 277 v = val[0] 278 val[0] += 1 279 return v 280 281 assert inspect.ismethod(Foo().bar) 282 ne_(Foo.bar, None) 283 f1 = Foo() 284 assert 'bar' not in f1.__dict__ 285 eq_(f1.bar(), 20) 286 eq_(f1.bar(), 20) 287 eq_(val[0], 21) 288 289 def test_memoized_slots(self): 290 canary = mock.Mock() 291 292 class Foob(util.MemoizedSlots): 293 __slots__ = ('foo_bar', 'gogo') 294 295 def _memoized_method_gogo(self): 296 canary.method() 297 return "gogo" 298 299 def _memoized_attr_foo_bar(self): 300 canary.attr() 301 return "foobar" 302 303 f1 = Foob() 304 assert_raises(AttributeError, setattr, f1, "bar", "bat") 305 306 eq_(f1.foo_bar, "foobar") 307 308 eq_(f1.foo_bar, "foobar") 309 310 eq_(f1.gogo(), "gogo") 311 312 eq_(f1.gogo(), "gogo") 313 314 eq_(canary.mock_calls, [mock.call.attr(), mock.call.method()]) 315 316 317class ToListTest(fixtures.TestBase): 318 def test_from_string(self): 319 eq_( 320 util.to_list("xyz"), 321 ["xyz"] 322 ) 323 324 def test_from_set(self): 325 spec = util.to_list(set([1, 2, 3])) 326 assert isinstance(spec, list) 327 eq_( 328 sorted(spec), 329 [1, 2, 3] 330 ) 331 332 def test_from_dict(self): 333 spec = util.to_list({1: "a", 2: "b", 3: "c"}) 334 assert isinstance(spec, list) 335 eq_( 336 sorted(spec), 337 [1, 2, 3] 338 ) 339 340 def test_from_tuple(self): 341 eq_( 342 util.to_list((1, 2, 3)), 343 [1, 2, 3] 344 ) 345 346 def test_from_bytes(self): 347 348 eq_( 349 util.to_list(compat.b('abc')), 350 [compat.b('abc')] 351 ) 352 353 eq_( 354 util.to_list([ 355 compat.b('abc'), compat.b('def')]), 356 [compat.b('abc'), compat.b('def')] 357 ) 358 359 360class ColumnCollectionTest(fixtures.TestBase): 361 362 def test_in(self): 363 cc = sql.ColumnCollection() 364 cc.add(sql.column('col1')) 365 cc.add(sql.column('col2')) 366 cc.add(sql.column('col3')) 367 assert 'col1' in cc 368 assert 'col2' in cc 369 370 try: 371 cc['col1'] in cc 372 assert False 373 except exc.ArgumentError as e: 374 eq_(str(e), "__contains__ requires a string argument") 375 376 def test_compare(self): 377 cc1 = sql.ColumnCollection() 378 cc2 = sql.ColumnCollection() 379 cc3 = sql.ColumnCollection() 380 c1 = sql.column('col1') 381 c2 = c1.label('col2') 382 c3 = sql.column('col3') 383 cc1.add(c1) 384 cc2.add(c2) 385 cc3.add(c3) 386 assert (cc1 == cc2).compare(c1 == c2) 387 assert not (cc1 == cc3).compare(c2 == c3) 388 389 @testing.emits_warning("Column ") 390 def test_dupes_add(self): 391 cc = sql.ColumnCollection() 392 393 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('c2') 394 395 cc.add(c1) 396 cc.add(c2a) 397 cc.add(c3) 398 cc.add(c2b) 399 400 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 401 402 # for iter, c2a is replaced by c2b, ordering 403 # is maintained in that way. ideally, iter would be 404 # the same as the "_all_columns" collection. 405 eq_(list(cc), [c1, c2b, c3]) 406 407 assert cc.contains_column(c2a) 408 assert cc.contains_column(c2b) 409 410 ci = cc.as_immutable() 411 eq_(ci._all_columns, [c1, c2a, c3, c2b]) 412 eq_(list(ci), [c1, c2b, c3]) 413 414 def test_replace(self): 415 cc = sql.ColumnCollection() 416 417 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('c2') 418 419 cc.add(c1) 420 cc.add(c2a) 421 cc.add(c3) 422 423 cc.replace(c2b) 424 425 eq_(cc._all_columns, [c1, c2b, c3]) 426 eq_(list(cc), [c1, c2b, c3]) 427 428 assert not cc.contains_column(c2a) 429 assert cc.contains_column(c2b) 430 431 ci = cc.as_immutable() 432 eq_(ci._all_columns, [c1, c2b, c3]) 433 eq_(list(ci), [c1, c2b, c3]) 434 435 def test_replace_key_matches(self): 436 cc = sql.ColumnCollection() 437 438 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('X') 439 c2b.key = 'c2' 440 441 cc.add(c1) 442 cc.add(c2a) 443 cc.add(c3) 444 445 cc.replace(c2b) 446 447 assert not cc.contains_column(c2a) 448 assert cc.contains_column(c2b) 449 450 eq_(cc._all_columns, [c1, c2b, c3]) 451 eq_(list(cc), [c1, c2b, c3]) 452 453 ci = cc.as_immutable() 454 eq_(ci._all_columns, [c1, c2b, c3]) 455 eq_(list(ci), [c1, c2b, c3]) 456 457 def test_replace_name_matches(self): 458 cc = sql.ColumnCollection() 459 460 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('c2') 461 c2b.key = 'X' 462 463 cc.add(c1) 464 cc.add(c2a) 465 cc.add(c3) 466 467 cc.replace(c2b) 468 469 assert not cc.contains_column(c2a) 470 assert cc.contains_column(c2b) 471 472 eq_(cc._all_columns, [c1, c2b, c3]) 473 eq_(list(cc), [c1, c3, c2b]) 474 475 ci = cc.as_immutable() 476 eq_(ci._all_columns, [c1, c2b, c3]) 477 eq_(list(ci), [c1, c3, c2b]) 478 479 def test_replace_no_match(self): 480 cc = sql.ColumnCollection() 481 482 c1, c2, c3, c4 = column('c1'), column('c2'), column('c3'), column('c4') 483 c4.key = 'X' 484 485 cc.add(c1) 486 cc.add(c2) 487 cc.add(c3) 488 489 cc.replace(c4) 490 491 assert cc.contains_column(c2) 492 assert cc.contains_column(c4) 493 494 eq_(cc._all_columns, [c1, c2, c3, c4]) 495 eq_(list(cc), [c1, c2, c3, c4]) 496 497 ci = cc.as_immutable() 498 eq_(ci._all_columns, [c1, c2, c3, c4]) 499 eq_(list(ci), [c1, c2, c3, c4]) 500 501 def test_dupes_extend(self): 502 cc = sql.ColumnCollection() 503 504 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('c2') 505 506 cc.add(c1) 507 cc.add(c2a) 508 509 cc.extend([c3, c2b]) 510 511 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 512 513 # for iter, c2a is replaced by c2b, ordering 514 # is maintained in that way. ideally, iter would be 515 # the same as the "_all_columns" collection. 516 eq_(list(cc), [c1, c2b, c3]) 517 518 assert cc.contains_column(c2a) 519 assert cc.contains_column(c2b) 520 521 ci = cc.as_immutable() 522 eq_(ci._all_columns, [c1, c2a, c3, c2b]) 523 eq_(list(ci), [c1, c2b, c3]) 524 525 def test_dupes_update(self): 526 cc = sql.ColumnCollection() 527 528 c1, c2a, c3, c2b = column('c1'), column('c2'), column('c3'), column('c2') 529 530 cc.add(c1) 531 cc.add(c2a) 532 533 cc.update([(c3.key, c3), (c2b.key, c2b)]) 534 535 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 536 537 assert cc.contains_column(c2a) 538 assert cc.contains_column(c2b) 539 540 # for iter, c2a is replaced by c2b, ordering 541 # is maintained in that way. ideally, iter would be 542 # the same as the "_all_columns" collection. 543 eq_(list(cc), [c1, c2b, c3]) 544 545 def test_extend_existing(self): 546 cc = sql.ColumnCollection() 547 548 c1, c2, c3, c4, c5 = column('c1'), column('c2'), column('c3'), column('c4'), column('c5') 549 550 cc.extend([c1, c2]) 551 eq_(cc._all_columns, [c1, c2]) 552 553 cc.extend([c3]) 554 eq_(cc._all_columns, [c1, c2, c3]) 555 cc.extend([c4, c2, c5]) 556 557 eq_(cc._all_columns, [c1, c2, c3, c4, c5]) 558 559 def test_update_existing(self): 560 cc = sql.ColumnCollection() 561 562 c1, c2, c3, c4, c5 = column('c1'), column('c2'), column('c3'), column('c4'), column('c5') 563 564 cc.update([('c1', c1), ('c2', c2)]) 565 eq_(cc._all_columns, [c1, c2]) 566 567 cc.update([('c3', c3)]) 568 eq_(cc._all_columns, [c1, c2, c3]) 569 cc.update([('c4', c4), ('c2', c2), ('c5', c5)]) 570 571 eq_(cc._all_columns, [c1, c2, c3, c4, c5]) 572 573 574 575class LRUTest(fixtures.TestBase): 576 577 def test_lru(self): 578 class item(object): 579 def __init__(self, id): 580 self.id = id 581 582 def __str__(self): 583 return "item id %d" % self.id 584 585 l = util.LRUCache(10, threshold=.2) 586 587 for id in range(1, 20): 588 l[id] = item(id) 589 590 # first couple of items should be gone 591 assert 1 not in l 592 assert 2 not in l 593 594 # next batch over the threshold of 10 should be present 595 for id_ in range(11, 20): 596 assert id_ in l 597 598 l[12] 599 l[15] 600 l[23] = item(23) 601 l[24] = item(24) 602 l[25] = item(25) 603 l[26] = item(26) 604 l[27] = item(27) 605 606 assert 11 not in l 607 assert 13 not in l 608 609 for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15): 610 assert id_ in l 611 612 i1 = l[25] 613 i2 = item(25) 614 l[25] = i2 615 assert 25 in l 616 assert l[25] is i2 617 618 619class ImmutableSubclass(str): 620 pass 621 622 623class FlattenIteratorTest(fixtures.TestBase): 624 625 def test_flatten(self): 626 assert list(util.flatten_iterator([[1, 2, 3], [4, 5, 6], 7, 627 8])) == [1, 2, 3, 4, 5, 6, 7, 8] 628 629 def test_str_with_iter(self): 630 """ensure that a str object with an __iter__ method (like in 631 PyPy) is not interpreted as an iterable. 632 633 """ 634 635 class IterString(str): 636 def __iter__(self): 637 return iter(self + '') 638 639 assert list(util.flatten_iterator([IterString('asdf'), 640 [IterString('x'), IterString('y')]])) == ['asdf', 641 'x', 'y'] 642 643 644class HashOverride(object): 645 646 def __init__(self, value=None): 647 self.value = value 648 649 def __hash__(self): 650 return hash(self.value) 651 652 653class EqOverride(object): 654 655 def __init__(self, value=None): 656 self.value = value 657 __hash__ = object.__hash__ 658 659 def __eq__(self, other): 660 if isinstance(other, EqOverride): 661 return self.value == other.value 662 else: 663 return False 664 665 def __ne__(self, other): 666 if isinstance(other, EqOverride): 667 return self.value != other.value 668 else: 669 return True 670 671 672class HashEqOverride(object): 673 674 def __init__(self, value=None): 675 self.value = value 676 677 def __hash__(self): 678 return hash(self.value) 679 680 def __eq__(self, other): 681 if isinstance(other, EqOverride): 682 return self.value == other.value 683 else: 684 return False 685 686 def __ne__(self, other): 687 if isinstance(other, EqOverride): 688 return self.value != other.value 689 else: 690 return True 691 692 693class IdentitySetTest(fixtures.TestBase): 694 695 def assert_eq(self, identityset, expected_iterable): 696 expected = sorted([id(o) for o in expected_iterable]) 697 found = sorted([id(o) for o in identityset]) 698 eq_(found, expected) 699 700 def test_init(self): 701 ids = util.IdentitySet([1, 2, 3, 2, 1]) 702 self.assert_eq(ids, [1, 2, 3]) 703 704 ids = util.IdentitySet(ids) 705 self.assert_eq(ids, [1, 2, 3]) 706 707 ids = util.IdentitySet() 708 self.assert_eq(ids, []) 709 710 ids = util.IdentitySet([]) 711 self.assert_eq(ids, []) 712 713 ids = util.IdentitySet(ids) 714 self.assert_eq(ids, []) 715 716 def test_add(self): 717 for type_ in (object, ImmutableSubclass): 718 data = [type_(), type_()] 719 ids = util.IdentitySet() 720 for i in list(range(2)) + list(range(2)): 721 ids.add(data[i]) 722 self.assert_eq(ids, data) 723 724 for type_ in (EqOverride, HashOverride, HashEqOverride): 725 data = [type_(1), type_(1), type_(2)] 726 ids = util.IdentitySet() 727 for i in list(range(3)) + list(range(3)): 728 ids.add(data[i]) 729 self.assert_eq(ids, data) 730 731 def test_dunder_sub2(self): 732 IdentitySet = util.IdentitySet 733 o1, o2, o3 = object(), object(), object() 734 ids1 = IdentitySet([o1]) 735 ids2 = IdentitySet([o1, o2, o3]) 736 eq_( 737 ids2 - ids1, 738 IdentitySet([o2, o3]) 739 ) 740 741 ids2 -= ids1 742 eq_(ids2, IdentitySet([o2, o3])) 743 744 def test_dunder_eq(self): 745 _, _, twin1, twin2, unique1, unique2 = self._create_sets() 746 747 # basic set math 748 eq_(twin1 == twin2, True) 749 eq_(unique1 == unique2, False) 750 751 # not an IdentitySet 752 not_an_identity_set = object() 753 eq_(unique1 == not_an_identity_set, False) 754 755 def test_dunder_ne(self): 756 _, _, twin1, twin2, unique1, unique2 = self._create_sets() 757 758 # basic set math 759 eq_(twin1 != twin2, False) 760 eq_(unique1 != unique2, True) 761 762 # not an IdentitySet 763 not_an_identity_set = object() 764 eq_(unique1 != not_an_identity_set, True) 765 766 def test_dunder_le(self): 767 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 768 769 # basic set math 770 eq_(sub_ <= super_, True) 771 eq_(super_ <= sub_, False) 772 773 # the same sets 774 eq_(twin1 <= twin2, True) 775 eq_(twin2 <= twin1, True) 776 777 # totally different sets 778 eq_(unique1 <= unique2, False) 779 eq_(unique2 <= unique1, False) 780 781 # not an IdentitySet 782 def should_raise(): 783 not_an_identity_set = object() 784 return unique1 <= not_an_identity_set 785 self._assert_unorderable_types(should_raise) 786 787 def test_dunder_lt(self): 788 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 789 790 # basic set math 791 eq_(sub_ < super_, True) 792 eq_(super_ < sub_, False) 793 794 # the same sets 795 eq_(twin1 < twin2, False) 796 eq_(twin2 < twin1, False) 797 798 # totally different sets 799 eq_(unique1 < unique2, False) 800 eq_(unique2 < unique1, False) 801 802 # not an IdentitySet 803 def should_raise(): 804 not_an_identity_set = object() 805 return unique1 < not_an_identity_set 806 self._assert_unorderable_types(should_raise) 807 808 def test_dunder_ge(self): 809 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 810 811 # basic set math 812 eq_(sub_ >= super_, False) 813 eq_(super_ >= sub_, True) 814 815 # the same sets 816 eq_(twin1 >= twin2, True) 817 eq_(twin2 >= twin1, True) 818 819 # totally different sets 820 eq_(unique1 >= unique2, False) 821 eq_(unique2 >= unique1, False) 822 823 # not an IdentitySet 824 def should_raise(): 825 not_an_identity_set = object() 826 return unique1 >= not_an_identity_set 827 self._assert_unorderable_types(should_raise) 828 829 def test_dunder_gt(self): 830 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 831 832 # basic set math 833 eq_(sub_ > super_, False) 834 eq_(super_ > sub_, True) 835 836 # the same sets 837 eq_(twin1 > twin2, False) 838 eq_(twin2 > twin1, False) 839 840 # totally different sets 841 eq_(unique1 > unique2, False) 842 eq_(unique2 > unique1, False) 843 844 # not an IdentitySet 845 def should_raise(): 846 not_an_identity_set = object() 847 return unique1 > not_an_identity_set 848 self._assert_unorderable_types(should_raise) 849 850 def test_issubset(self): 851 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 852 853 # basic set math 854 eq_(sub_.issubset(super_), True) 855 eq_(super_.issubset(sub_), False) 856 857 # the same sets 858 eq_(twin1.issubset(twin2), True) 859 eq_(twin2.issubset(twin1), True) 860 861 # totally different sets 862 eq_(unique1.issubset(unique2), False) 863 eq_(unique2.issubset(unique1), False) 864 865 # not an IdentitySet 866 not_an_identity_set = object() 867 assert_raises(TypeError, unique1.issubset, not_an_identity_set) 868 869 def test_issuperset(self): 870 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 871 872 # basic set math 873 eq_(sub_.issuperset(super_), False) 874 eq_(super_.issuperset(sub_), True) 875 876 # the same sets 877 eq_(twin1.issuperset(twin2), True) 878 eq_(twin2.issuperset(twin1), True) 879 880 # totally different sets 881 eq_(unique1.issuperset(unique2), False) 882 eq_(unique2.issuperset(unique1), False) 883 884 # not an IdentitySet 885 not_an_identity_set = object() 886 assert_raises(TypeError, unique1.issuperset, not_an_identity_set) 887 888 def test_union(self): 889 super_, sub_, twin1, twin2, _, _ = self._create_sets() 890 891 # basic set math 892 eq_(sub_.union(super_), super_) 893 eq_(super_.union(sub_), super_) 894 895 # the same sets 896 eq_(twin1.union(twin2), twin1) 897 eq_(twin2.union(twin1), twin1) 898 899 # empty sets 900 empty = util.IdentitySet([]) 901 eq_(empty.union(empty), empty) 902 903 # totally different sets 904 unique1 = util.IdentitySet([1]) 905 unique2 = util.IdentitySet([2]) 906 eq_(unique1.union(unique2), util.IdentitySet([1, 2])) 907 908 # not an IdentitySet 909 not_an_identity_set = object() 910 assert_raises(TypeError, unique1.union, not_an_identity_set) 911 912 def test_dunder_or(self): 913 super_, sub_, twin1, twin2, _, _ = self._create_sets() 914 915 # basic set math 916 eq_(sub_ | super_, super_) 917 eq_(super_ | sub_, super_) 918 919 # the same sets 920 eq_(twin1 | twin2, twin1) 921 eq_(twin2 | twin1, twin1) 922 923 # empty sets 924 empty = util.IdentitySet([]) 925 eq_(empty | empty, empty) 926 927 # totally different sets 928 unique1 = util.IdentitySet([1]) 929 unique2 = util.IdentitySet([2]) 930 eq_(unique1 | unique2, util.IdentitySet([1, 2])) 931 932 # not an IdentitySet 933 def should_raise(): 934 not_an_identity_set = object() 935 return unique1 | not_an_identity_set 936 assert_raises(TypeError, should_raise) 937 938 def test_update(self): 939 pass # TODO 940 941 def test_dunder_ior(self): 942 super_, sub_, _, _, _, _ = self._create_sets() 943 944 # basic set math 945 sub_ |= super_ 946 eq_(sub_, super_) 947 super_ |= sub_ 948 eq_(super_, super_) 949 950 # totally different sets 951 unique1 = util.IdentitySet([1]) 952 unique2 = util.IdentitySet([2]) 953 unique1 |= unique2 954 eq_(unique1, util.IdentitySet([1, 2])) 955 eq_(unique2, util.IdentitySet([2])) 956 957 # not an IdentitySet 958 def should_raise(): 959 unique = util.IdentitySet([1]) 960 not_an_identity_set = object() 961 unique |= not_an_identity_set 962 assert_raises(TypeError, should_raise) 963 964 def test_difference(self): 965 _, _, twin1, twin2, _, _ = self._create_sets() 966 967 # basic set math 968 set1 = util.IdentitySet([1, 2, 3]) 969 set2 = util.IdentitySet([2, 3, 4]) 970 eq_(set1.difference(set2), util.IdentitySet([1])) 971 eq_(set2.difference(set1), util.IdentitySet([4])) 972 973 # empty sets 974 empty = util.IdentitySet([]) 975 eq_(empty.difference(empty), empty) 976 977 # the same sets 978 eq_(twin1.difference(twin2), empty) 979 eq_(twin2.difference(twin1), empty) 980 981 # totally different sets 982 unique1 = util.IdentitySet([1]) 983 unique2 = util.IdentitySet([2]) 984 eq_(unique1.difference(unique2), util.IdentitySet([1])) 985 eq_(unique2.difference(unique1), util.IdentitySet([2])) 986 987 # not an IdentitySet 988 not_an_identity_set = object() 989 assert_raises(TypeError, unique1.difference, not_an_identity_set) 990 991 def test_dunder_sub(self): 992 _, _, twin1, twin2, _, _ = self._create_sets() 993 994 # basic set math 995 set1 = util.IdentitySet([1, 2, 3]) 996 set2 = util.IdentitySet([2, 3, 4]) 997 eq_(set1 - set2, util.IdentitySet([1])) 998 eq_(set2 - set1, util.IdentitySet([4])) 999 1000 # empty sets 1001 empty = util.IdentitySet([]) 1002 eq_(empty - empty, empty) 1003 1004 # the same sets 1005 eq_(twin1 - twin2, empty) 1006 eq_(twin2 - twin1, empty) 1007 1008 # totally different sets 1009 unique1 = util.IdentitySet([1]) 1010 unique2 = util.IdentitySet([2]) 1011 eq_(unique1 - unique2, util.IdentitySet([1])) 1012 eq_(unique2 - unique1, util.IdentitySet([2])) 1013 1014 # not an IdentitySet 1015 def should_raise(): 1016 not_an_identity_set = object() 1017 unique1 - not_an_identity_set 1018 assert_raises(TypeError, should_raise) 1019 1020 def test_difference_update(self): 1021 pass # TODO 1022 1023 def test_dunder_isub(self): 1024 pass # TODO 1025 1026 def test_intersection(self): 1027 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1028 1029 # basic set math 1030 eq_(sub_.intersection(super_), sub_) 1031 eq_(super_.intersection(sub_), sub_) 1032 1033 # the same sets 1034 eq_(twin1.intersection(twin2), twin1) 1035 eq_(twin2.intersection(twin1), twin1) 1036 1037 # empty sets 1038 empty = util.IdentitySet([]) 1039 eq_(empty.intersection(empty), empty) 1040 1041 # totally different sets 1042 eq_(unique1.intersection(unique2), empty) 1043 1044 # not an IdentitySet 1045 not_an_identity_set = object() 1046 assert_raises(TypeError, unique1.intersection, not_an_identity_set) 1047 1048 def test_dunder_and(self): 1049 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1050 1051 # basic set math 1052 eq_(sub_ & super_, sub_) 1053 eq_(super_ & sub_, sub_) 1054 1055 # the same sets 1056 eq_(twin1 & twin2, twin1) 1057 eq_(twin2 & twin1, twin1) 1058 1059 # empty sets 1060 empty = util.IdentitySet([]) 1061 eq_(empty & empty, empty) 1062 1063 # totally different sets 1064 eq_(unique1 & unique2, empty) 1065 1066 # not an IdentitySet 1067 def should_raise(): 1068 not_an_identity_set = object() 1069 return unique1 & not_an_identity_set 1070 assert_raises(TypeError, should_raise) 1071 1072 def test_intersection_update(self): 1073 pass # TODO 1074 1075 def test_dunder_iand(self): 1076 pass # TODO 1077 1078 def test_symmetric_difference(self): 1079 _, _, twin1, twin2, _, _ = self._create_sets() 1080 1081 # basic set math 1082 set1 = util.IdentitySet([1, 2, 3]) 1083 set2 = util.IdentitySet([2, 3, 4]) 1084 eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4])) 1085 eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4])) 1086 1087 # empty sets 1088 empty = util.IdentitySet([]) 1089 eq_(empty.symmetric_difference(empty), empty) 1090 1091 # the same sets 1092 eq_(twin1.symmetric_difference(twin2), empty) 1093 eq_(twin2.symmetric_difference(twin1), empty) 1094 1095 # totally different sets 1096 unique1 = util.IdentitySet([1]) 1097 unique2 = util.IdentitySet([2]) 1098 eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2])) 1099 eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2])) 1100 1101 # not an IdentitySet 1102 not_an_identity_set = object() 1103 assert_raises( 1104 TypeError, unique1.symmetric_difference, not_an_identity_set) 1105 1106 def test_dunder_xor(self): 1107 _, _, twin1, twin2, _, _ = self._create_sets() 1108 1109 # basic set math 1110 set1 = util.IdentitySet([1, 2, 3]) 1111 set2 = util.IdentitySet([2, 3, 4]) 1112 eq_(set1 ^ set2, util.IdentitySet([1, 4])) 1113 eq_(set2 ^ set1, util.IdentitySet([1, 4])) 1114 1115 # empty sets 1116 empty = util.IdentitySet([]) 1117 eq_(empty ^ empty, empty) 1118 1119 # the same sets 1120 eq_(twin1 ^ twin2, empty) 1121 eq_(twin2 ^ twin1, empty) 1122 1123 # totally different sets 1124 unique1 = util.IdentitySet([1]) 1125 unique2 = util.IdentitySet([2]) 1126 eq_(unique1 ^ unique2, util.IdentitySet([1, 2])) 1127 eq_(unique2 ^ unique1, util.IdentitySet([1, 2])) 1128 1129 # not an IdentitySet 1130 def should_raise(): 1131 not_an_identity_set = object() 1132 return unique1 ^ not_an_identity_set 1133 assert_raises(TypeError, should_raise) 1134 1135 def test_symmetric_difference_update(self): 1136 pass # TODO 1137 1138 def _create_sets(self): 1139 o1, o2, o3, o4, o5 = object(), object(), object(), object(), object() 1140 super_ = util.IdentitySet([o1, o2, o3]) 1141 sub_ = util.IdentitySet([o2]) 1142 twin1 = util.IdentitySet([o3]) 1143 twin2 = util.IdentitySet([o3]) 1144 unique1 = util.IdentitySet([o4]) 1145 unique2 = util.IdentitySet([o5]) 1146 return super_, sub_, twin1, twin2, unique1, unique2 1147 1148 def _assert_unorderable_types(self, callable_): 1149 if util.py36: 1150 assert_raises_message( 1151 TypeError, 'not supported between instances of', callable_) 1152 elif util.py3k: 1153 assert_raises_message( 1154 TypeError, 'unorderable types', callable_) 1155 else: 1156 assert_raises_message( 1157 TypeError, 'cannot compare sets using cmp()', callable_) 1158 1159 def test_basic_sanity(self): 1160 IdentitySet = util.IdentitySet 1161 1162 o1, o2, o3 = object(), object(), object() 1163 ids = IdentitySet([o1]) 1164 ids.discard(o1) 1165 ids.discard(o1) 1166 ids.add(o1) 1167 ids.remove(o1) 1168 assert_raises(KeyError, ids.remove, o1) 1169 1170 eq_(ids.copy(), ids) 1171 1172 # explicit __eq__ and __ne__ tests 1173 assert ids != None 1174 assert not(ids == None) 1175 1176 ne_(ids, IdentitySet([o1, o2, o3])) 1177 ids.clear() 1178 assert o1 not in ids 1179 ids.add(o2) 1180 assert o2 in ids 1181 eq_(ids.pop(), o2) 1182 ids.add(o1) 1183 eq_(len(ids), 1) 1184 1185 isuper = IdentitySet([o1, o2]) 1186 assert ids < isuper 1187 assert ids.issubset(isuper) 1188 assert isuper.issuperset(ids) 1189 assert isuper > ids 1190 1191 eq_(ids.union(isuper), isuper) 1192 eq_(ids | isuper, isuper) 1193 eq_(isuper - ids, IdentitySet([o2])) 1194 eq_(isuper.difference(ids), IdentitySet([o2])) 1195 eq_(ids.intersection(isuper), IdentitySet([o1])) 1196 eq_(ids & isuper, IdentitySet([o1])) 1197 eq_(ids.symmetric_difference(isuper), IdentitySet([o2])) 1198 eq_(ids ^ isuper, IdentitySet([o2])) 1199 1200 ids.update(isuper) 1201 ids |= isuper 1202 ids.difference_update(isuper) 1203 ids -= isuper 1204 ids.intersection_update(isuper) 1205 ids &= isuper 1206 ids.symmetric_difference_update(isuper) 1207 ids ^= isuper 1208 1209 ids.update('foobar') 1210 try: 1211 ids |= 'foobar' 1212 assert False 1213 except TypeError: 1214 assert True 1215 1216 try: 1217 s = set([o1, o2]) 1218 s |= ids 1219 assert False 1220 except TypeError: 1221 assert True 1222 1223 assert_raises(TypeError, util.cmp, ids) 1224 assert_raises(TypeError, hash, ids) 1225 1226 1227class OrderedIdentitySetTest(fixtures.TestBase): 1228 1229 def assert_eq(self, identityset, expected_iterable): 1230 expected = [id(o) for o in expected_iterable] 1231 found = [id(o) for o in identityset] 1232 eq_(found, expected) 1233 1234 def test_add(self): 1235 elem = object 1236 s = util.OrderedIdentitySet() 1237 s.add(elem()) 1238 s.add(elem()) 1239 1240 def test_intersection(self): 1241 elem = object 1242 eq_ = self.assert_eq 1243 1244 a, b, c, d, e, f, g = \ 1245 elem(), elem(), elem(), elem(), elem(), elem(), elem() 1246 1247 s1 = util.OrderedIdentitySet([a, b, c]) 1248 s2 = util.OrderedIdentitySet([d, e, f]) 1249 s3 = util.OrderedIdentitySet([a, d, f, g]) 1250 eq_(s1.intersection(s2), []) 1251 eq_(s1.intersection(s3), [a]) 1252 eq_(s1.union(s2).intersection(s3), [a, d, f]) 1253 1254 1255class DictlikeIteritemsTest(fixtures.TestBase): 1256 baseline = set([('a', 1), ('b', 2), ('c', 3)]) 1257 1258 def _ok(self, instance): 1259 iterator = util.dictlike_iteritems(instance) 1260 eq_(set(iterator), self.baseline) 1261 1262 def _notok(self, instance): 1263 assert_raises(TypeError, 1264 util.dictlike_iteritems, 1265 instance) 1266 1267 def test_dict(self): 1268 d = dict(a=1, b=2, c=3) 1269 self._ok(d) 1270 1271 def test_subdict(self): 1272 class subdict(dict): 1273 pass 1274 d = subdict(a=1, b=2, c=3) 1275 self._ok(d) 1276 1277 if util.py2k: 1278 def test_UserDict(self): 1279 import UserDict 1280 d = UserDict.UserDict(a=1, b=2, c=3) 1281 self._ok(d) 1282 1283 def test_object(self): 1284 self._notok(object()) 1285 1286 if util.py2k: 1287 def test_duck_1(self): 1288 class duck1(object): 1289 def iteritems(duck): 1290 return iter(self.baseline) 1291 self._ok(duck1()) 1292 1293 def test_duck_2(self): 1294 class duck2(object): 1295 def items(duck): 1296 return list(self.baseline) 1297 self._ok(duck2()) 1298 1299 if util.py2k: 1300 def test_duck_3(self): 1301 class duck3(object): 1302 def iterkeys(duck): 1303 return iter(['a', 'b', 'c']) 1304 1305 def __getitem__(duck, key): 1306 return dict(a=1, b=2, c=3).get(key) 1307 self._ok(duck3()) 1308 1309 def test_duck_4(self): 1310 class duck4(object): 1311 def iterkeys(duck): 1312 return iter(['a', 'b', 'c']) 1313 self._notok(duck4()) 1314 1315 def test_duck_5(self): 1316 class duck5(object): 1317 def keys(duck): 1318 return ['a', 'b', 'c'] 1319 1320 def get(duck, key): 1321 return dict(a=1, b=2, c=3).get(key) 1322 self._ok(duck5()) 1323 1324 def test_duck_6(self): 1325 class duck6(object): 1326 def keys(duck): 1327 return ['a', 'b', 'c'] 1328 self._notok(duck6()) 1329 1330 1331class DuckTypeCollectionTest(fixtures.TestBase): 1332 1333 def test_sets(self): 1334 class SetLike(object): 1335 def add(self): 1336 pass 1337 1338 class ForcedSet(list): 1339 __emulates__ = set 1340 1341 for type_ in (set, 1342 SetLike, 1343 ForcedSet): 1344 eq_(util.duck_type_collection(type_), set) 1345 instance = type_() 1346 eq_(util.duck_type_collection(instance), set) 1347 1348 for type_ in (frozenset, ): 1349 is_(util.duck_type_collection(type_), None) 1350 instance = type_() 1351 is_(util.duck_type_collection(instance), None) 1352 1353 1354class PublicFactoryTest(fixtures.TestBase): 1355 1356 def _fixture(self): 1357 class Thingy(object): 1358 def __init__(self, value): 1359 "make a thingy" 1360 self.value = value 1361 1362 @classmethod 1363 def foobar(cls, x, y): 1364 "do the foobar" 1365 return Thingy(x + y) 1366 1367 return Thingy 1368 1369 def test_classmethod(self): 1370 Thingy = self._fixture() 1371 foob = langhelpers.public_factory( 1372 Thingy.foobar, ".sql.elements.foob") 1373 eq_(foob(3, 4).value, 7) 1374 eq_(foob(x=3, y=4).value, 7) 1375 eq_(foob.__doc__, "do the foobar") 1376 eq_(foob.__module__, "sqlalchemy.sql.elements") 1377 assert Thingy.foobar.__doc__.startswith("This function is mirrored;") 1378 1379 def test_constructor(self): 1380 Thingy = self._fixture() 1381 foob = langhelpers.public_factory( 1382 Thingy, ".sql.elements.foob") 1383 eq_(foob(7).value, 7) 1384 eq_(foob(value=7).value, 7) 1385 eq_(foob.__doc__, "make a thingy") 1386 eq_(foob.__module__, "sqlalchemy.sql.elements") 1387 assert Thingy.__init__.__doc__.startswith( 1388 "Construct a new :class:`.Thingy` object.") 1389 1390 1391class ArgInspectionTest(fixtures.TestBase): 1392 1393 def test_get_cls_kwargs(self): 1394 1395 class A(object): 1396 def __init__(self, a): 1397 pass 1398 1399 class A1(A): 1400 def __init__(self, a1): 1401 pass 1402 1403 class A11(A1): 1404 def __init__(self, a11, **kw): 1405 pass 1406 1407 class B(object): 1408 def __init__(self, b, **kw): 1409 pass 1410 1411 class B1(B): 1412 def __init__(self, b1, **kw): 1413 pass 1414 1415 class B2(B): 1416 def __init__(self, b2): 1417 pass 1418 1419 class AB(A, B): 1420 def __init__(self, ab): 1421 pass 1422 1423 class BA(B, A): 1424 def __init__(self, ba, **kwargs): 1425 pass 1426 1427 class BA1(BA): 1428 pass 1429 1430 class CAB(A, B): 1431 pass 1432 1433 class CBA(B, A): 1434 pass 1435 1436 class CB1A1(B1, A1): 1437 pass 1438 1439 class CAB1(A, B1): 1440 pass 1441 1442 class CB1A(B1, A): 1443 pass 1444 1445 class CB2A(B2, A): 1446 pass 1447 1448 class D(object): 1449 pass 1450 1451 class BA2(B, A): 1452 pass 1453 1454 class A11B1(A11, B1): 1455 pass 1456 1457 def test(cls, *expected): 1458 eq_(set(util.get_cls_kwargs(cls)), set(expected)) 1459 1460 test(A, 'a') 1461 test(A1, 'a1') 1462 test(A11, 'a11', 'a1') 1463 test(B, 'b') 1464 test(B1, 'b1', 'b') 1465 test(AB, 'ab') 1466 test(BA, 'ba', 'b', 'a') 1467 test(BA1, 'ba', 'b', 'a') 1468 test(CAB, 'a') 1469 test(CBA, 'b', 'a') 1470 test(CAB1, 'a') 1471 test(CB1A, 'b1', 'b', 'a') 1472 test(CB2A, 'b2') 1473 test(CB1A1, "a1", "b1", "b") 1474 test(D) 1475 test(BA2, "a", "b") 1476 test(A11B1, "a1", "a11", "b", "b1") 1477 1478 def test_get_func_kwargs(self): 1479 1480 def f1(): 1481 pass 1482 1483 def f2(foo): 1484 pass 1485 1486 def f3(*foo): 1487 pass 1488 1489 def f4(**foo): 1490 pass 1491 1492 def test(fn, *expected): 1493 eq_(set(util.get_func_kwargs(fn)), set(expected)) 1494 1495 test(f1) 1496 test(f2, 'foo') 1497 test(f3) 1498 test(f4) 1499 1500 def test_callable_argspec_fn(self): 1501 def foo(x, y, **kw): 1502 pass 1503 eq_( 1504 get_callable_argspec(foo), 1505 (['x', 'y'], None, 'kw', None) 1506 ) 1507 1508 def test_callable_argspec_fn_no_self(self): 1509 def foo(x, y, **kw): 1510 pass 1511 eq_( 1512 get_callable_argspec(foo, no_self=True), 1513 (['x', 'y'], None, 'kw', None) 1514 ) 1515 1516 def test_callable_argspec_fn_no_self_but_self(self): 1517 def foo(self, x, y, **kw): 1518 pass 1519 eq_( 1520 get_callable_argspec(foo, no_self=True), 1521 (['self', 'x', 'y'], None, 'kw', None) 1522 ) 1523 1524 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1525 def test_callable_argspec_py_builtin(self): 1526 import datetime 1527 assert_raises( 1528 TypeError, 1529 get_callable_argspec, datetime.datetime.now 1530 ) 1531 1532 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1533 def test_callable_argspec_obj_init(self): 1534 assert_raises( 1535 TypeError, 1536 get_callable_argspec, object 1537 ) 1538 1539 def test_callable_argspec_method(self): 1540 class Foo(object): 1541 def foo(self, x, y, **kw): 1542 pass 1543 eq_( 1544 get_callable_argspec(Foo.foo), 1545 (['self', 'x', 'y'], None, 'kw', None) 1546 ) 1547 1548 def test_callable_argspec_instance_method_no_self(self): 1549 class Foo(object): 1550 def foo(self, x, y, **kw): 1551 pass 1552 eq_( 1553 get_callable_argspec(Foo().foo, no_self=True), 1554 (['x', 'y'], None, 'kw', None) 1555 ) 1556 1557 def test_callable_argspec_unbound_method_no_self(self): 1558 class Foo(object): 1559 def foo(self, x, y, **kw): 1560 pass 1561 eq_( 1562 get_callable_argspec(Foo.foo, no_self=True), 1563 (['self', 'x', 'y'], None, 'kw', None) 1564 ) 1565 1566 def test_callable_argspec_init(self): 1567 class Foo(object): 1568 def __init__(self, x, y): 1569 pass 1570 1571 eq_( 1572 get_callable_argspec(Foo), 1573 (['self', 'x', 'y'], None, None, None) 1574 ) 1575 1576 def test_callable_argspec_init_no_self(self): 1577 class Foo(object): 1578 def __init__(self, x, y): 1579 pass 1580 1581 eq_( 1582 get_callable_argspec(Foo, no_self=True), 1583 (['x', 'y'], None, None, None) 1584 ) 1585 1586 def test_callable_argspec_call(self): 1587 class Foo(object): 1588 def __call__(self, x, y): 1589 pass 1590 eq_( 1591 get_callable_argspec(Foo()), 1592 (['self', 'x', 'y'], None, None, None) 1593 ) 1594 1595 def test_callable_argspec_call_no_self(self): 1596 class Foo(object): 1597 def __call__(self, x, y): 1598 pass 1599 eq_( 1600 get_callable_argspec(Foo(), no_self=True), 1601 (['x', 'y'], None, None, None) 1602 ) 1603 1604 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1605 def test_callable_argspec_partial(self): 1606 from functools import partial 1607 def foo(x, y, z, **kw): 1608 pass 1609 bar = partial(foo, 5) 1610 1611 assert_raises( 1612 TypeError, 1613 get_callable_argspec, bar 1614 ) 1615 1616class SymbolTest(fixtures.TestBase): 1617 1618 def test_basic(self): 1619 sym1 = util.symbol('foo') 1620 assert sym1.name == 'foo' 1621 sym2 = util.symbol('foo') 1622 1623 assert sym1 is sym2 1624 assert sym1 == sym2 1625 1626 sym3 = util.symbol('bar') 1627 assert sym1 is not sym3 1628 assert sym1 != sym3 1629 1630 def test_pickle(self): 1631 sym1 = util.symbol('foo') 1632 sym2 = util.symbol('foo') 1633 1634 assert sym1 is sym2 1635 1636 # default 1637 s = util.pickle.dumps(sym1) 1638 sym3 = util.pickle.loads(s) 1639 1640 for protocol in 0, 1, 2: 1641 print(protocol) 1642 serial = util.pickle.dumps(sym1) 1643 rt = util.pickle.loads(serial) 1644 assert rt is sym1 1645 assert rt is sym2 1646 1647 def test_bitflags(self): 1648 sym1 = util.symbol('sym1', canonical=1) 1649 sym2 = util.symbol('sym2', canonical=2) 1650 1651 assert sym1 & sym1 1652 assert not sym1 & sym2 1653 assert not sym1 & sym1 & sym2 1654 1655 def test_composites(self): 1656 sym1 = util.symbol('sym1', canonical=1) 1657 sym2 = util.symbol('sym2', canonical=2) 1658 sym3 = util.symbol('sym3', canonical=4) 1659 sym4 = util.symbol('sym4', canonical=8) 1660 1661 assert sym1 & (sym2 | sym1 | sym4) 1662 assert not sym1 & (sym2 | sym3) 1663 1664 assert not (sym1 | sym2) & (sym3 | sym4) 1665 assert (sym1 | sym2) & (sym2 | sym4) 1666 1667 1668class TestFormatArgspec(fixtures.TestBase): 1669 1670 def test_specs(self): 1671 def test(fn, wanted, grouped=None): 1672 if grouped is None: 1673 parsed = util.format_argspec_plus(fn) 1674 else: 1675 parsed = util.format_argspec_plus(fn, grouped=grouped) 1676 eq_(parsed, wanted) 1677 1678 test(lambda: None, 1679 {'args': '()', 'self_arg': None, 1680 'apply_kw': '()', 'apply_pos': '()'}) 1681 1682 test(lambda: None, 1683 {'args': '', 'self_arg': None, 1684 'apply_kw': '', 'apply_pos': ''}, 1685 grouped=False) 1686 1687 test(lambda self: None, 1688 {'args': '(self)', 'self_arg': 'self', 1689 'apply_kw': '(self)', 'apply_pos': '(self)'}) 1690 1691 test(lambda self: None, 1692 {'args': 'self', 'self_arg': 'self', 1693 'apply_kw': 'self', 'apply_pos': 'self'}, 1694 grouped=False) 1695 1696 test(lambda *a: None, 1697 {'args': '(*a)', 'self_arg': 'a[0]', 1698 'apply_kw': '(*a)', 'apply_pos': '(*a)'}) 1699 1700 test(lambda **kw: None, 1701 {'args': '(**kw)', 'self_arg': None, 1702 'apply_kw': '(**kw)', 'apply_pos': '(**kw)'}) 1703 1704 test(lambda *a, **kw: None, 1705 {'args': '(*a, **kw)', 'self_arg': 'a[0]', 1706 'apply_kw': '(*a, **kw)', 'apply_pos': '(*a, **kw)'}) 1707 1708 test(lambda a, *b: None, 1709 {'args': '(a, *b)', 'self_arg': 'a', 1710 'apply_kw': '(a, *b)', 'apply_pos': '(a, *b)'}) 1711 1712 test(lambda a, **b: None, 1713 {'args': '(a, **b)', 'self_arg': 'a', 1714 'apply_kw': '(a, **b)', 'apply_pos': '(a, **b)'}) 1715 1716 test(lambda a, *b, **c: None, 1717 {'args': '(a, *b, **c)', 'self_arg': 'a', 1718 'apply_kw': '(a, *b, **c)', 'apply_pos': '(a, *b, **c)'}) 1719 1720 test(lambda a, b=1, **c: None, 1721 {'args': '(a, b=1, **c)', 'self_arg': 'a', 1722 'apply_kw': '(a, b=b, **c)', 'apply_pos': '(a, b, **c)'}) 1723 1724 test(lambda a=1, b=2: None, 1725 {'args': '(a=1, b=2)', 'self_arg': 'a', 1726 'apply_kw': '(a=a, b=b)', 'apply_pos': '(a, b)'}) 1727 1728 test(lambda a=1, b=2: None, 1729 {'args': 'a=1, b=2', 'self_arg': 'a', 1730 'apply_kw': 'a=a, b=b', 'apply_pos': 'a, b'}, 1731 grouped=False) 1732 1733 @testing.fails_if(lambda: util.pypy, 1734 "pypy doesn't report O.__init__ as object.__init__") 1735 def test_init_grouped(self): 1736 object_spec = { 1737 'args': '(self)', 'self_arg': 'self', 1738 'apply_pos': '(self)', 'apply_kw': '(self)'} 1739 wrapper_spec = { 1740 'args': '(self, *args, **kwargs)', 'self_arg': 'self', 1741 'apply_pos': '(self, *args, **kwargs)', 1742 'apply_kw': '(self, *args, **kwargs)'} 1743 custom_spec = { 1744 'args': '(slef, a=123)', 'self_arg': 'slef', # yes, slef 1745 'apply_pos': '(slef, a)', 'apply_kw': '(slef, a=a)'} 1746 1747 self._test_init(None, object_spec, wrapper_spec, custom_spec) 1748 self._test_init(True, object_spec, wrapper_spec, custom_spec) 1749 1750 @testing.fails_if(lambda: util.pypy, 1751 "pypy doesn't report O.__init__ as object.__init__") 1752 def test_init_bare(self): 1753 object_spec = { 1754 'args': 'self', 'self_arg': 'self', 1755 'apply_pos': 'self', 'apply_kw': 'self'} 1756 wrapper_spec = { 1757 'args': 'self, *args, **kwargs', 'self_arg': 'self', 1758 'apply_pos': 'self, *args, **kwargs', 1759 'apply_kw': 'self, *args, **kwargs'} 1760 custom_spec = { 1761 'args': 'slef, a=123', 'self_arg': 'slef', # yes, slef 1762 'apply_pos': 'slef, a', 'apply_kw': 'slef, a=a'} 1763 1764 self._test_init(False, object_spec, wrapper_spec, custom_spec) 1765 1766 def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec): 1767 def test(fn, wanted): 1768 if grouped is None: 1769 parsed = util.format_argspec_init(fn) 1770 else: 1771 parsed = util.format_argspec_init(fn, grouped=grouped) 1772 eq_(parsed, wanted) 1773 1774 class O(object): 1775 pass 1776 1777 test(O.__init__, object_spec) 1778 1779 class O(object): 1780 def __init__(self): 1781 pass 1782 1783 test(O.__init__, object_spec) 1784 1785 class O(object): 1786 def __init__(slef, a=123): 1787 pass 1788 1789 test(O.__init__, custom_spec) 1790 1791 class O(list): 1792 pass 1793 1794 test(O.__init__, wrapper_spec) 1795 1796 class O(list): 1797 def __init__(self, *args, **kwargs): 1798 pass 1799 1800 test(O.__init__, wrapper_spec) 1801 1802 class O(list): 1803 def __init__(self): 1804 pass 1805 1806 test(O.__init__, object_spec) 1807 1808 class O(list): 1809 def __init__(slef, a=123): 1810 pass 1811 1812 test(O.__init__, custom_spec) 1813 1814 1815class GenericReprTest(fixtures.TestBase): 1816 1817 def test_all_positional(self): 1818 class Foo(object): 1819 def __init__(self, a, b, c): 1820 self.a = a 1821 self.b = b 1822 self.c = c 1823 eq_( 1824 util.generic_repr(Foo(1, 2, 3)), 1825 "Foo(1, 2, 3)" 1826 ) 1827 1828 def test_positional_plus_kw(self): 1829 class Foo(object): 1830 def __init__(self, a, b, c=5, d=4): 1831 self.a = a 1832 self.b = b 1833 self.c = c 1834 self.d = d 1835 eq_( 1836 util.generic_repr(Foo(1, 2, 3, 6)), 1837 "Foo(1, 2, c=3, d=6)" 1838 ) 1839 1840 def test_kw_defaults(self): 1841 class Foo(object): 1842 def __init__(self, a=1, b=2, c=3, d=4): 1843 self.a = a 1844 self.b = b 1845 self.c = c 1846 self.d = d 1847 eq_( 1848 util.generic_repr(Foo(1, 5, 3, 7)), 1849 "Foo(b=5, d=7)" 1850 ) 1851 1852 def test_multi_kw(self): 1853 class Foo(object): 1854 def __init__(self, a, b, c=3, d=4): 1855 self.a = a 1856 self.b = b 1857 self.c = c 1858 self.d = d 1859 class Bar(Foo): 1860 def __init__(self, e, f, g=5, **kw): 1861 self.e = e 1862 self.f = f 1863 self.g = g 1864 super(Bar, self).__init__(**kw) 1865 1866 eq_( 1867 util.generic_repr( 1868 Bar('e', 'f', g=7, a=6, b=5, d=9), 1869 to_inspect=[Bar, Foo] 1870 ), 1871 "Bar('e', 'f', g=7, a=6, b=5, d=9)" 1872 ) 1873 1874 eq_( 1875 util.generic_repr( 1876 Bar('e', 'f', a=6, b=5), 1877 to_inspect=[Bar, Foo] 1878 ), 1879 "Bar('e', 'f', a=6, b=5)" 1880 ) 1881 1882 def test_multi_kw_repeated(self): 1883 class Foo(object): 1884 def __init__(self, a=1, b=2): 1885 self.a = a 1886 self.b = b 1887 class Bar(Foo): 1888 def __init__(self, b=3, c=4, **kw): 1889 self.c = c 1890 super(Bar, self).__init__(b=b, **kw) 1891 1892 eq_( 1893 util.generic_repr( 1894 Bar(a='a', b='b', c='c'), 1895 to_inspect=[Bar, Foo] 1896 ), 1897 "Bar(b='b', c='c', a='a')" 1898 ) 1899 1900 1901 def test_discard_vargs(self): 1902 class Foo(object): 1903 def __init__(self, a, b, *args): 1904 self.a = a 1905 self.b = b 1906 self.c, self.d = args[0:2] 1907 eq_( 1908 util.generic_repr(Foo(1, 2, 3, 4)), 1909 "Foo(1, 2)" 1910 ) 1911 1912 def test_discard_vargs_kwargs(self): 1913 class Foo(object): 1914 def __init__(self, a, b, *args, **kw): 1915 self.a = a 1916 self.b = b 1917 self.c, self.d = args[0:2] 1918 eq_( 1919 util.generic_repr(Foo(1, 2, 3, 4, x=7, y=4)), 1920 "Foo(1, 2)" 1921 ) 1922 1923 def test_significant_vargs(self): 1924 class Foo(object): 1925 def __init__(self, a, b, *args): 1926 self.a = a 1927 self.b = b 1928 self.args = args 1929 eq_( 1930 util.generic_repr(Foo(1, 2, 3, 4)), 1931 "Foo(1, 2, 3, 4)" 1932 ) 1933 1934 def test_no_args(self): 1935 class Foo(object): 1936 def __init__(self): 1937 pass 1938 eq_( 1939 util.generic_repr(Foo()), 1940 "Foo()" 1941 ) 1942 1943 def test_no_init(self): 1944 class Foo(object): 1945 pass 1946 eq_( 1947 util.generic_repr(Foo()), 1948 "Foo()" 1949 ) 1950 1951 1952class AsInterfaceTest(fixtures.TestBase): 1953 1954 class Something(object): 1955 1956 def _ignoreme(self): 1957 pass 1958 1959 def foo(self): 1960 pass 1961 1962 def bar(self): 1963 pass 1964 1965 class Partial(object): 1966 1967 def bar(self): 1968 pass 1969 1970 class Object(object): 1971 pass 1972 1973 def test_instance(self): 1974 obj = object() 1975 assert_raises(TypeError, util.as_interface, obj, 1976 cls=self.Something) 1977 1978 assert_raises(TypeError, util.as_interface, obj, 1979 methods=('foo')) 1980 1981 assert_raises(TypeError, util.as_interface, obj, 1982 cls=self.Something, required=('foo')) 1983 1984 obj = self.Something() 1985 eq_(obj, util.as_interface(obj, cls=self.Something)) 1986 eq_(obj, util.as_interface(obj, methods=('foo',))) 1987 eq_( 1988 obj, util.as_interface(obj, cls=self.Something, 1989 required=('outofband',))) 1990 partial = self.Partial() 1991 1992 slotted = self.Object() 1993 slotted.bar = lambda self: 123 1994 1995 for obj in partial, slotted: 1996 eq_(obj, util.as_interface(obj, cls=self.Something)) 1997 assert_raises(TypeError, util.as_interface, obj, 1998 methods=('foo')) 1999 eq_(obj, util.as_interface(obj, methods=('bar',))) 2000 eq_(obj, util.as_interface(obj, cls=self.Something, 2001 required=('bar',))) 2002 assert_raises(TypeError, util.as_interface, obj, 2003 cls=self.Something, required=('foo',)) 2004 2005 assert_raises(TypeError, util.as_interface, obj, 2006 cls=self.Something, required=self.Something) 2007 2008 def test_dict(self): 2009 obj = {} 2010 assert_raises(TypeError, util.as_interface, obj, 2011 cls=self.Something) 2012 assert_raises(TypeError, util.as_interface, obj, methods='foo') 2013 assert_raises(TypeError, util.as_interface, obj, 2014 cls=self.Something, required='foo') 2015 2016 def assertAdapted(obj, *methods): 2017 assert isinstance(obj, type) 2018 found = set([m for m in dir(obj) if not m.startswith('_')]) 2019 for method in methods: 2020 assert method in found 2021 found.remove(method) 2022 assert not found 2023 2024 fn = lambda self: 123 2025 obj = {'foo': fn, 'bar': fn} 2026 res = util.as_interface(obj, cls=self.Something) 2027 assertAdapted(res, 'foo', 'bar') 2028 res = util.as_interface(obj, cls=self.Something, 2029 required=self.Something) 2030 assertAdapted(res, 'foo', 'bar') 2031 res = util.as_interface(obj, cls=self.Something, required=('foo',)) 2032 assertAdapted(res, 'foo', 'bar') 2033 res = util.as_interface(obj, methods=('foo', 'bar')) 2034 assertAdapted(res, 'foo', 'bar') 2035 res = util.as_interface(obj, methods=('foo', 'bar', 'baz')) 2036 assertAdapted(res, 'foo', 'bar') 2037 res = util.as_interface(obj, methods=('foo', 'bar'), required=('foo',)) 2038 assertAdapted(res, 'foo', 'bar') 2039 assert_raises(TypeError, util.as_interface, obj, methods=('foo',)) 2040 assert_raises(TypeError, util.as_interface, obj, 2041 methods=('foo', 'bar', 'baz'), required=('baz', )) 2042 obj = {'foo': 123} 2043 assert_raises(TypeError, util.as_interface, obj, cls=self.Something) 2044 2045 2046class TestClassHierarchy(fixtures.TestBase): 2047 2048 def test_object(self): 2049 eq_(set(util.class_hierarchy(object)), set((object,))) 2050 2051 def test_single(self): 2052 class A(object): 2053 pass 2054 2055 class B(object): 2056 pass 2057 2058 eq_(set(util.class_hierarchy(A)), set((A, object))) 2059 eq_(set(util.class_hierarchy(B)), set((B, object))) 2060 2061 class C(A, B): 2062 pass 2063 2064 eq_(set(util.class_hierarchy(A)), set((A, B, C, object))) 2065 eq_(set(util.class_hierarchy(B)), set((A, B, C, object))) 2066 2067 if util.py2k: 2068 def test_oldstyle_mixin(self): 2069 class A(object): 2070 pass 2071 2072 class Mixin: 2073 pass 2074 2075 class B(A, Mixin): 2076 pass 2077 2078 eq_(set(util.class_hierarchy(B)), set((A, B, object))) 2079 eq_(set(util.class_hierarchy(Mixin)), set()) 2080 eq_(set(util.class_hierarchy(A)), set((A, B, object))) 2081 2082 2083class ReraiseTest(fixtures.TestBase): 2084 @testing.requires.python3 2085 def test_raise_from_cause_same_cause(self): 2086 class MyException(Exception): 2087 pass 2088 2089 def go(): 2090 try: 2091 raise MyException("exc one") 2092 except Exception as err: 2093 util.raise_from_cause(err) 2094 2095 try: 2096 go() 2097 assert False 2098 except MyException as err: 2099 is_(err.__cause__, None) 2100 2101 def test_reraise_disallow_same_cause(self): 2102 class MyException(Exception): 2103 pass 2104 2105 def go(): 2106 try: 2107 raise MyException("exc one") 2108 except Exception as err: 2109 type_, value, tb = sys.exc_info() 2110 util.reraise(type_, err, tb, value) 2111 2112 assert_raises_message( 2113 AssertionError, 2114 "Same cause emitted", 2115 go 2116 ) 2117 2118 def test_raise_from_cause(self): 2119 class MyException(Exception): 2120 pass 2121 2122 class MyOtherException(Exception): 2123 pass 2124 2125 me = MyException("exc on") 2126 2127 def go(): 2128 try: 2129 raise me 2130 except Exception: 2131 util.raise_from_cause(MyOtherException("exc two")) 2132 2133 try: 2134 go() 2135 assert False 2136 except MyOtherException as moe: 2137 if testing.requires.python3.enabled: 2138 is_(moe.__cause__, me) 2139 2140 @testing.requires.python2 2141 def test_safe_reraise_py2k_warning(self): 2142 class MyException(Exception): 2143 pass 2144 2145 class MyOtherException(Exception): 2146 pass 2147 2148 m1 = MyException("exc one") 2149 m2 = MyOtherException("exc two") 2150 2151 def go2(): 2152 raise m2 2153 2154 def go(): 2155 try: 2156 raise m1 2157 except: 2158 with util.safe_reraise(): 2159 go2() 2160 2161 with expect_warnings( 2162 "An exception has occurred during handling of a previous " 2163 "exception. The previous exception " 2164 "is:.*MyException.*exc one" 2165 ): 2166 try: 2167 go() 2168 assert False 2169 except MyOtherException: 2170 pass 2171 2172 2173class TestClassProperty(fixtures.TestBase): 2174 2175 def test_simple(self): 2176 class A(object): 2177 something = {'foo': 1} 2178 2179 class B(A): 2180 2181 @classproperty 2182 def something(cls): 2183 d = dict(super(B, cls).something) 2184 d.update({'bazz': 2}) 2185 return d 2186 2187 eq_(B.something, {'foo': 1, 'bazz': 2}) 2188 2189 2190class TestProperties(fixtures.TestBase): 2191 2192 def test_pickle(self): 2193 data = {'hello': 'bla'} 2194 props = util.Properties(data) 2195 2196 for loader, dumper in picklers(): 2197 s = dumper(props) 2198 p = loader(s) 2199 2200 eq_(props._data, p._data) 2201 eq_(props.keys(), p.keys()) 2202 2203 def test_pickle_immuatbleprops(self): 2204 data = {'hello': 'bla'} 2205 props = util.Properties(data).as_immutable() 2206 2207 for loader, dumper in picklers(): 2208 s = dumper(props) 2209 p = loader(s) 2210 2211 eq_(props._data, p._data) 2212 eq_(props.keys(), p.keys()) 2213 2214 def test_pickle_orderedprops(self): 2215 data = {'hello': 'bla'} 2216 props = util.OrderedProperties() 2217 props.update(data) 2218 2219 for loader, dumper in picklers(): 2220 s = dumper(props) 2221 p = loader(s) 2222 2223 eq_(props._data, p._data) 2224 eq_(props.keys(), p.keys()) 2225