1#! coding: utf-8 2 3import copy 4import inspect 5import sys 6 7from sqlalchemy import exc 8from sqlalchemy import sql 9from sqlalchemy import testing 10from sqlalchemy import util 11from sqlalchemy.sql import column 12from sqlalchemy.testing import assert_raises 13from sqlalchemy.testing import assert_raises_message 14from sqlalchemy.testing import eq_ 15from sqlalchemy.testing import expect_warnings 16from sqlalchemy.testing import fails_if 17from sqlalchemy.testing import fixtures 18from sqlalchemy.testing import is_ 19from sqlalchemy.testing import mock 20from sqlalchemy.testing import ne_ 21from sqlalchemy.testing.util import gc_collect 22from sqlalchemy.testing.util import picklers 23from sqlalchemy.util import classproperty 24from sqlalchemy.util import compat 25from sqlalchemy.util import get_callable_argspec 26from sqlalchemy.util import langhelpers 27from sqlalchemy.util import WeakSequence 28 29 30class _KeyedTupleTest(object): 31 def _fixture(self, values, labels): 32 raise NotImplementedError() 33 34 def test_empty(self): 35 keyed_tuple = self._fixture([], []) 36 eq_(str(keyed_tuple), "()") 37 eq_(len(keyed_tuple), 0) 38 39 eq_(list(keyed_tuple.keys()), []) 40 eq_(keyed_tuple._fields, ()) 41 eq_(keyed_tuple._asdict(), {}) 42 43 def test_values_but_no_labels(self): 44 keyed_tuple = self._fixture([1, 2], []) 45 eq_(str(keyed_tuple), "(1, 2)") 46 eq_(len(keyed_tuple), 2) 47 48 eq_(list(keyed_tuple.keys()), []) 49 eq_(keyed_tuple._fields, ()) 50 eq_(keyed_tuple._asdict(), {}) 51 52 eq_(keyed_tuple[0], 1) 53 eq_(keyed_tuple[1], 2) 54 55 def test_basic_creation(self): 56 keyed_tuple = self._fixture([1, 2], ["a", "b"]) 57 eq_(str(keyed_tuple), "(1, 2)") 58 eq_(list(keyed_tuple.keys()), ["a", "b"]) 59 eq_(keyed_tuple._fields, ("a", "b")) 60 eq_(keyed_tuple._asdict(), {"a": 1, "b": 2}) 61 62 def test_basic_index_access(self): 63 keyed_tuple = self._fixture([1, 2], ["a", "b"]) 64 eq_(keyed_tuple[0], 1) 65 eq_(keyed_tuple[1], 2) 66 67 def should_raise(): 68 keyed_tuple[2] 69 70 assert_raises(IndexError, should_raise) 71 72 def test_basic_attribute_access(self): 73 keyed_tuple = self._fixture([1, 2], ["a", "b"]) 74 eq_(keyed_tuple.a, 1) 75 eq_(keyed_tuple.b, 2) 76 77 def should_raise(): 78 keyed_tuple.c 79 80 assert_raises(AttributeError, should_raise) 81 82 def test_none_label(self): 83 keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"]) 84 eq_(str(keyed_tuple), "(1, 2, 3)") 85 86 eq_(list(keyed_tuple.keys()), ["a", "b"]) 87 eq_(keyed_tuple._fields, ("a", "b")) 88 eq_(keyed_tuple._asdict(), {"a": 1, "b": 3}) 89 90 # attribute access: can't get at value 2 91 eq_(keyed_tuple.a, 1) 92 eq_(keyed_tuple.b, 3) 93 94 # index access: can get at value 2 95 eq_(keyed_tuple[0], 1) 96 eq_(keyed_tuple[1], 2) 97 eq_(keyed_tuple[2], 3) 98 99 def test_duplicate_labels(self): 100 keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "b"]) 101 eq_(str(keyed_tuple), "(1, 2, 3)") 102 103 eq_(list(keyed_tuple.keys()), ["a", "b", "b"]) 104 eq_(keyed_tuple._fields, ("a", "b", "b")) 105 eq_(keyed_tuple._asdict(), {"a": 1, "b": 3}) 106 107 # attribute access: can't get at value 2 108 eq_(keyed_tuple.a, 1) 109 eq_(keyed_tuple.b, 3) 110 111 # index access: can get at value 2 112 eq_(keyed_tuple[0], 1) 113 eq_(keyed_tuple[1], 2) 114 eq_(keyed_tuple[2], 3) 115 116 def test_immutable(self): 117 keyed_tuple = self._fixture([1, 2], ["a", "b"]) 118 eq_(str(keyed_tuple), "(1, 2)") 119 120 eq_(keyed_tuple.a, 1) 121 122 assert_raises(AttributeError, setattr, keyed_tuple, "a", 5) 123 124 def should_raise(): 125 keyed_tuple[0] = 100 126 127 assert_raises(TypeError, should_raise) 128 129 def test_serialize(self): 130 131 keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"]) 132 133 for loads, dumps in picklers(): 134 kt = loads(dumps(keyed_tuple)) 135 136 eq_(str(kt), "(1, 2, 3)") 137 138 eq_(list(kt.keys()), ["a", "b"]) 139 eq_(kt._fields, ("a", "b")) 140 eq_(kt._asdict(), {"a": 1, "b": 3}) 141 142 143class KeyedTupleTest(_KeyedTupleTest, fixtures.TestBase): 144 def _fixture(self, values, labels): 145 return util.KeyedTuple(values, labels) 146 147 148class LWKeyedTupleTest(_KeyedTupleTest, fixtures.TestBase): 149 def _fixture(self, values, labels): 150 return util.lightweight_named_tuple("n", labels)(values) 151 152 153class WeakSequenceTest(fixtures.TestBase): 154 @testing.requires.predictable_gc 155 def test_cleanout_elements(self): 156 class Foo(object): 157 pass 158 159 f1, f2, f3 = Foo(), Foo(), Foo() 160 w = WeakSequence([f1, f2, f3]) 161 eq_(len(w), 3) 162 eq_(len(w._storage), 3) 163 del f2 164 gc_collect() 165 eq_(len(w), 2) 166 eq_(len(w._storage), 2) 167 168 @testing.requires.predictable_gc 169 def test_cleanout_appended(self): 170 class Foo(object): 171 pass 172 173 f1, f2, f3 = Foo(), Foo(), Foo() 174 w = WeakSequence() 175 w.append(f1) 176 w.append(f2) 177 w.append(f3) 178 eq_(len(w), 3) 179 eq_(len(w._storage), 3) 180 del f2 181 gc_collect() 182 eq_(len(w), 2) 183 eq_(len(w._storage), 2) 184 185 186class OrderedDictTest(fixtures.TestBase): 187 def test_odict(self): 188 o = util.OrderedDict() 189 o["a"] = 1 190 o["b"] = 2 191 o["snack"] = "attack" 192 o["c"] = 3 193 194 eq_(list(o.keys()), ["a", "b", "snack", "c"]) 195 eq_(list(o.values()), [1, 2, "attack", 3]) 196 197 o.pop("snack") 198 eq_(list(o.keys()), ["a", "b", "c"]) 199 eq_(list(o.values()), [1, 2, 3]) 200 201 try: 202 o.pop("eep") 203 assert False 204 except KeyError: 205 pass 206 207 eq_(o.pop("eep", "woot"), "woot") 208 209 try: 210 o.pop("whiff", "bang", "pow") 211 assert False 212 except TypeError: 213 pass 214 215 eq_(list(o.keys()), ["a", "b", "c"]) 216 eq_(list(o.values()), [1, 2, 3]) 217 218 o2 = util.OrderedDict(d=4) 219 o2["e"] = 5 220 221 eq_(list(o2.keys()), ["d", "e"]) 222 eq_(list(o2.values()), [4, 5]) 223 224 o.update(o2) 225 eq_(list(o.keys()), ["a", "b", "c", "d", "e"]) 226 eq_(list(o.values()), [1, 2, 3, 4, 5]) 227 228 o.setdefault("c", "zzz") 229 o.setdefault("f", 6) 230 eq_(list(o.keys()), ["a", "b", "c", "d", "e", "f"]) 231 eq_(list(o.values()), [1, 2, 3, 4, 5, 6]) 232 233 def test_odict_constructor(self): 234 o = util.OrderedDict( 235 [("name", "jbe"), ("fullname", "jonathan"), ("password", "")] 236 ) 237 eq_(list(o.keys()), ["name", "fullname", "password"]) 238 239 def test_odict_copy(self): 240 o = util.OrderedDict() 241 o["zzz"] = 1 242 o["aaa"] = 2 243 eq_(list(o.keys()), ["zzz", "aaa"]) 244 245 o2 = o.copy() 246 eq_(list(o2.keys()), list(o.keys())) 247 248 o3 = copy.copy(o) 249 eq_(list(o3.keys()), list(o.keys())) 250 251 252class OrderedSetTest(fixtures.TestBase): 253 def test_mutators_against_iter(self): 254 # testing a set modified against an iterator 255 o = util.OrderedSet([3, 2, 4, 5]) 256 257 eq_(o.difference(iter([3, 4])), util.OrderedSet([2, 5])) 258 eq_(o.intersection(iter([3, 4, 6])), util.OrderedSet([3, 4])) 259 eq_(o.union(iter([3, 4, 6])), util.OrderedSet([2, 3, 4, 5, 6])) 260 261 262class FrozenDictTest(fixtures.TestBase): 263 def test_serialize(self): 264 d = util.immutabledict({1: 2, 3: 4}) 265 for loads, dumps in picklers(): 266 print(loads(dumps(d))) 267 268 269class MemoizedAttrTest(fixtures.TestBase): 270 def test_memoized_property(self): 271 val = [20] 272 273 class Foo(object): 274 @util.memoized_property 275 def bar(self): 276 v = val[0] 277 val[0] += 1 278 return v 279 280 ne_(Foo.bar, None) 281 f1 = Foo() 282 assert "bar" not in f1.__dict__ 283 eq_(f1.bar, 20) 284 eq_(f1.bar, 20) 285 eq_(val[0], 21) 286 eq_(f1.__dict__["bar"], 20) 287 288 def test_memoized_instancemethod(self): 289 val = [20] 290 291 class Foo(object): 292 @util.memoized_instancemethod 293 def bar(self): 294 v = val[0] 295 val[0] += 1 296 return v 297 298 assert inspect.ismethod(Foo().bar) 299 ne_(Foo.bar, None) 300 f1 = Foo() 301 assert "bar" not in f1.__dict__ 302 eq_(f1.bar(), 20) 303 eq_(f1.bar(), 20) 304 eq_(val[0], 21) 305 306 def test_memoized_slots(self): 307 canary = mock.Mock() 308 309 class Foob(util.MemoizedSlots): 310 __slots__ = ("foo_bar", "gogo") 311 312 def _memoized_method_gogo(self): 313 canary.method() 314 return "gogo" 315 316 def _memoized_attr_foo_bar(self): 317 canary.attr() 318 return "foobar" 319 320 f1 = Foob() 321 assert_raises(AttributeError, setattr, f1, "bar", "bat") 322 323 eq_(f1.foo_bar, "foobar") 324 325 eq_(f1.foo_bar, "foobar") 326 327 eq_(f1.gogo(), "gogo") 328 329 eq_(f1.gogo(), "gogo") 330 331 eq_(canary.mock_calls, [mock.call.attr(), mock.call.method()]) 332 333 334class WrapCallableTest(fixtures.TestBase): 335 def test_wrapping_update_wrapper_fn(self): 336 def my_fancy_default(): 337 """run the fancy default""" 338 return 10 339 340 c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default) 341 342 eq_(c.__name__, "my_fancy_default") 343 eq_(c.__doc__, "run the fancy default") 344 345 def test_wrapping_update_wrapper_fn_nodocstring(self): 346 def my_fancy_default(): 347 return 10 348 349 c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default) 350 eq_(c.__name__, "my_fancy_default") 351 eq_(c.__doc__, None) 352 353 def test_wrapping_update_wrapper_cls(self): 354 class MyFancyDefault(object): 355 """a fancy default""" 356 357 def __call__(self): 358 """run the fancy default""" 359 return 10 360 361 def_ = MyFancyDefault() 362 c = util.wrap_callable(lambda: def_(), def_) 363 364 eq_(c.__name__, "MyFancyDefault") 365 eq_(c.__doc__, "run the fancy default") 366 367 def test_wrapping_update_wrapper_cls_noclsdocstring(self): 368 class MyFancyDefault(object): 369 def __call__(self): 370 """run the fancy default""" 371 return 10 372 373 def_ = MyFancyDefault() 374 c = util.wrap_callable(lambda: def_(), def_) 375 eq_(c.__name__, "MyFancyDefault") 376 eq_(c.__doc__, "run the fancy default") 377 378 def test_wrapping_update_wrapper_cls_nomethdocstring(self): 379 class MyFancyDefault(object): 380 """a fancy default""" 381 382 def __call__(self): 383 return 10 384 385 def_ = MyFancyDefault() 386 c = util.wrap_callable(lambda: def_(), def_) 387 eq_(c.__name__, "MyFancyDefault") 388 eq_(c.__doc__, "a fancy default") 389 390 def test_wrapping_update_wrapper_cls_noclsdocstring_nomethdocstring(self): 391 class MyFancyDefault(object): 392 def __call__(self): 393 return 10 394 395 def_ = MyFancyDefault() 396 c = util.wrap_callable(lambda: def_(), def_) 397 eq_(c.__name__, "MyFancyDefault") 398 eq_(c.__doc__, None) 399 400 def test_wrapping_update_wrapper_functools_parial(self): 401 def my_default(x): 402 return x 403 404 import functools 405 406 my_functools_default = functools.partial(my_default, 5) 407 408 c = util.wrap_callable( 409 lambda: my_functools_default(), my_functools_default 410 ) 411 eq_(c.__name__, "partial") 412 eq_(c.__doc__, my_functools_default.__call__.__doc__) 413 eq_(c(), 5) 414 415 416class ToListTest(fixtures.TestBase): 417 def test_from_string(self): 418 eq_(util.to_list("xyz"), ["xyz"]) 419 420 def test_from_set(self): 421 spec = util.to_list(set([1, 2, 3])) 422 assert isinstance(spec, list) 423 eq_(sorted(spec), [1, 2, 3]) 424 425 def test_from_dict(self): 426 spec = util.to_list({1: "a", 2: "b", 3: "c"}) 427 assert isinstance(spec, list) 428 eq_(sorted(spec), [1, 2, 3]) 429 430 def test_from_tuple(self): 431 eq_(util.to_list((1, 2, 3)), [1, 2, 3]) 432 433 def test_from_bytes(self): 434 435 eq_(util.to_list(compat.b("abc")), [compat.b("abc")]) 436 437 eq_( 438 util.to_list([compat.b("abc"), compat.b("def")]), 439 [compat.b("abc"), compat.b("def")], 440 ) 441 442 443class ColumnCollectionTest(testing.AssertsCompiledSQL, fixtures.TestBase): 444 def test_in(self): 445 cc = sql.ColumnCollection() 446 cc.add(sql.column("col1")) 447 cc.add(sql.column("col2")) 448 cc.add(sql.column("col3")) 449 assert "col1" in cc 450 assert "col2" in cc 451 452 try: 453 cc["col1"] in cc 454 assert False 455 except exc.ArgumentError as e: 456 eq_(str(e), "__contains__ requires a string argument") 457 458 def test_compare(self): 459 cc1 = sql.ColumnCollection() 460 cc2 = sql.ColumnCollection() 461 cc3 = sql.ColumnCollection() 462 c1 = sql.column("col1") 463 c2 = c1.label("col2") 464 c3 = sql.column("col3") 465 cc1.add(c1) 466 cc2.add(c2) 467 cc3.add(c3) 468 assert (cc1 == cc2).compare(c1 == c2) 469 assert not (cc1 == cc3).compare(c2 == c3) 470 471 @testing.emits_warning("Column ") 472 def test_dupes_add(self): 473 cc = sql.ColumnCollection() 474 475 c1, c2a, c3, c2b = ( 476 column("c1"), 477 column("c2"), 478 column("c3"), 479 column("c2"), 480 ) 481 482 cc.add(c1) 483 cc.add(c2a) 484 cc.add(c3) 485 cc.add(c2b) 486 487 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 488 489 # for iter, c2a is replaced by c2b, ordering 490 # is maintained in that way. ideally, iter would be 491 # the same as the "_all_columns" collection. 492 eq_(list(cc), [c1, c2b, c3]) 493 494 assert cc.contains_column(c2a) 495 assert cc.contains_column(c2b) 496 497 ci = cc.as_immutable() 498 eq_(ci._all_columns, [c1, c2a, c3, c2b]) 499 eq_(list(ci), [c1, c2b, c3]) 500 501 def test_identical_dupe_add(self): 502 cc = sql.ColumnCollection() 503 504 c1, c2, c3 = (column("c1"), column("c2"), column("c3")) 505 506 cc.add(c1) 507 cc.add(c2) 508 cc.add(c3) 509 cc.add(c2) 510 511 eq_(cc._all_columns, [c1, c2, c3]) 512 513 self.assert_compile( 514 cc == [c1, c2, c3], "c1 = c1 AND c2 = c2 AND c3 = c3" 515 ) 516 517 # for iter, c2a is replaced by c2b, ordering 518 # is maintained in that way. ideally, iter would be 519 # the same as the "_all_columns" collection. 520 eq_(list(cc), [c1, c2, c3]) 521 522 assert cc.contains_column(c2) 523 524 ci = cc.as_immutable() 525 eq_(ci._all_columns, [c1, c2, c3]) 526 eq_(list(ci), [c1, c2, c3]) 527 528 self.assert_compile( 529 ci == [c1, c2, c3], "c1 = c1 AND c2 = c2 AND c3 = c3" 530 ) 531 532 def test_replace(self): 533 cc = sql.ColumnCollection() 534 535 c1, c2a, c3, c2b = ( 536 column("c1"), 537 column("c2"), 538 column("c3"), 539 column("c2"), 540 ) 541 542 cc.add(c1) 543 cc.add(c2a) 544 cc.add(c3) 545 546 cc.replace(c2b) 547 548 eq_(cc._all_columns, [c1, c2b, c3]) 549 eq_(list(cc), [c1, c2b, c3]) 550 551 assert not cc.contains_column(c2a) 552 assert cc.contains_column(c2b) 553 554 ci = cc.as_immutable() 555 eq_(ci._all_columns, [c1, c2b, c3]) 556 eq_(list(ci), [c1, c2b, c3]) 557 558 def test_replace_key_matches(self): 559 cc = sql.ColumnCollection() 560 561 c1, c2a, c3, c2b = ( 562 column("c1"), 563 column("c2"), 564 column("c3"), 565 column("X"), 566 ) 567 c2b.key = "c2" 568 569 cc.add(c1) 570 cc.add(c2a) 571 cc.add(c3) 572 573 cc.replace(c2b) 574 575 assert not cc.contains_column(c2a) 576 assert cc.contains_column(c2b) 577 578 eq_(cc._all_columns, [c1, c2b, c3]) 579 eq_(list(cc), [c1, c2b, c3]) 580 581 ci = cc.as_immutable() 582 eq_(ci._all_columns, [c1, c2b, c3]) 583 eq_(list(ci), [c1, c2b, c3]) 584 585 def test_replace_name_matches(self): 586 cc = sql.ColumnCollection() 587 588 c1, c2a, c3, c2b = ( 589 column("c1"), 590 column("c2"), 591 column("c3"), 592 column("c2"), 593 ) 594 c2b.key = "X" 595 596 cc.add(c1) 597 cc.add(c2a) 598 cc.add(c3) 599 600 cc.replace(c2b) 601 602 assert not cc.contains_column(c2a) 603 assert cc.contains_column(c2b) 604 605 eq_(cc._all_columns, [c1, c2b, c3]) 606 eq_(list(cc), [c1, c3, c2b]) 607 608 ci = cc.as_immutable() 609 eq_(ci._all_columns, [c1, c2b, c3]) 610 eq_(list(ci), [c1, c3, c2b]) 611 612 def test_replace_no_match(self): 613 cc = sql.ColumnCollection() 614 615 c1, c2, c3, c4 = column("c1"), column("c2"), column("c3"), column("c4") 616 c4.key = "X" 617 618 cc.add(c1) 619 cc.add(c2) 620 cc.add(c3) 621 622 cc.replace(c4) 623 624 assert cc.contains_column(c2) 625 assert cc.contains_column(c4) 626 627 eq_(cc._all_columns, [c1, c2, c3, c4]) 628 eq_(list(cc), [c1, c2, c3, c4]) 629 630 ci = cc.as_immutable() 631 eq_(ci._all_columns, [c1, c2, c3, c4]) 632 eq_(list(ci), [c1, c2, c3, c4]) 633 634 def test_dupes_extend(self): 635 cc = sql.ColumnCollection() 636 637 c1, c2a, c3, c2b = ( 638 column("c1"), 639 column("c2"), 640 column("c3"), 641 column("c2"), 642 ) 643 644 cc.add(c1) 645 cc.add(c2a) 646 647 cc.extend([c3, c2b]) 648 649 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 650 651 # for iter, c2a is replaced by c2b, ordering 652 # is maintained in that way. ideally, iter would be 653 # the same as the "_all_columns" collection. 654 eq_(list(cc), [c1, c2b, c3]) 655 656 assert cc.contains_column(c2a) 657 assert cc.contains_column(c2b) 658 659 ci = cc.as_immutable() 660 eq_(ci._all_columns, [c1, c2a, c3, c2b]) 661 eq_(list(ci), [c1, c2b, c3]) 662 663 def test_dupes_update(self): 664 cc = sql.ColumnCollection() 665 666 c1, c2a, c3, c2b = ( 667 column("c1"), 668 column("c2"), 669 column("c3"), 670 column("c2"), 671 ) 672 673 cc.add(c1) 674 cc.add(c2a) 675 676 cc.update([(c3.key, c3), (c2b.key, c2b)]) 677 678 eq_(cc._all_columns, [c1, c2a, c3, c2b]) 679 680 assert cc.contains_column(c2a) 681 assert cc.contains_column(c2b) 682 683 # for iter, c2a is replaced by c2b, ordering 684 # is maintained in that way. ideally, iter would be 685 # the same as the "_all_columns" collection. 686 eq_(list(cc), [c1, c2b, c3]) 687 688 def test_extend_existing(self): 689 cc = sql.ColumnCollection() 690 691 c1, c2, c3, c4, c5 = ( 692 column("c1"), 693 column("c2"), 694 column("c3"), 695 column("c4"), 696 column("c5"), 697 ) 698 699 cc.extend([c1, c2]) 700 eq_(cc._all_columns, [c1, c2]) 701 702 cc.extend([c3]) 703 eq_(cc._all_columns, [c1, c2, c3]) 704 cc.extend([c4, c2, c5]) 705 706 eq_(cc._all_columns, [c1, c2, c3, c4, c5]) 707 708 def test_update_existing(self): 709 cc = sql.ColumnCollection() 710 711 c1, c2, c3, c4, c5 = ( 712 column("c1"), 713 column("c2"), 714 column("c3"), 715 column("c4"), 716 column("c5"), 717 ) 718 719 cc.update([("c1", c1), ("c2", c2)]) 720 eq_(cc._all_columns, [c1, c2]) 721 722 cc.update([("c3", c3)]) 723 eq_(cc._all_columns, [c1, c2, c3]) 724 cc.update([("c4", c4), ("c2", c2), ("c5", c5)]) 725 726 eq_(cc._all_columns, [c1, c2, c3, c4, c5]) 727 728 729class LRUTest(fixtures.TestBase): 730 def test_lru(self): 731 class item(object): 732 def __init__(self, id_): 733 self.id = id_ 734 735 def __str__(self): 736 return "item id %d" % self.id 737 738 lru = util.LRUCache(10, threshold=0.2) 739 740 for id_ in range(1, 20): 741 lru[id_] = item(id_) 742 743 # first couple of items should be gone 744 assert 1 not in lru 745 assert 2 not in lru 746 747 # next batch over the threshold of 10 should be present 748 for id_ in range(11, 20): 749 assert id_ in lru 750 751 lru[12] 752 lru[15] 753 lru[23] = item(23) 754 lru[24] = item(24) 755 lru[25] = item(25) 756 lru[26] = item(26) 757 lru[27] = item(27) 758 759 assert 11 not in lru 760 assert 13 not in lru 761 762 for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15): 763 assert id_ in lru 764 765 i1 = lru[25] 766 i2 = item(25) 767 lru[25] = i2 768 assert 25 in lru 769 assert lru[25] is i2 770 771 772class ImmutableSubclass(str): 773 pass 774 775 776class FlattenIteratorTest(fixtures.TestBase): 777 def test_flatten(self): 778 assert list(util.flatten_iterator([[1, 2, 3], [4, 5, 6], 7, 8])) == [ 779 1, 780 2, 781 3, 782 4, 783 5, 784 6, 785 7, 786 8, 787 ] 788 789 def test_str_with_iter(self): 790 """ensure that a str object with an __iter__ method (like in 791 PyPy) is not interpreted as an iterable. 792 793 """ 794 795 class IterString(str): 796 def __iter__(self): 797 return iter(self + "") 798 799 iter_list = [IterString("asdf"), [IterString("x"), IterString("y")]] 800 801 assert list(util.flatten_iterator(iter_list)) == ["asdf", "x", "y"] 802 803 804class HashOverride(object): 805 def __init__(self, value=None): 806 self.value = value 807 808 def __hash__(self): 809 return hash(self.value) 810 811 812class EqOverride(object): 813 def __init__(self, value=None): 814 self.value = value 815 816 __hash__ = object.__hash__ 817 818 def __eq__(self, other): 819 if isinstance(other, EqOverride): 820 return self.value == other.value 821 else: 822 return False 823 824 def __ne__(self, other): 825 if isinstance(other, EqOverride): 826 return self.value != other.value 827 else: 828 return True 829 830 831class HashEqOverride(object): 832 def __init__(self, value=None): 833 self.value = value 834 835 def __hash__(self): 836 return hash(self.value) 837 838 def __eq__(self, other): 839 if isinstance(other, EqOverride): 840 return self.value == other.value 841 else: 842 return False 843 844 def __ne__(self, other): 845 if isinstance(other, EqOverride): 846 return self.value != other.value 847 else: 848 return True 849 850 851class IdentitySetTest(fixtures.TestBase): 852 def assert_eq(self, identityset, expected_iterable): 853 expected = sorted([id(o) for o in expected_iterable]) 854 found = sorted([id(o) for o in identityset]) 855 eq_(found, expected) 856 857 def test_init(self): 858 ids = util.IdentitySet([1, 2, 3, 2, 1]) 859 self.assert_eq(ids, [1, 2, 3]) 860 861 ids = util.IdentitySet(ids) 862 self.assert_eq(ids, [1, 2, 3]) 863 864 ids = util.IdentitySet() 865 self.assert_eq(ids, []) 866 867 ids = util.IdentitySet([]) 868 self.assert_eq(ids, []) 869 870 ids = util.IdentitySet(ids) 871 self.assert_eq(ids, []) 872 873 def test_add(self): 874 for type_ in (object, ImmutableSubclass): 875 data = [type_(), type_()] 876 ids = util.IdentitySet() 877 for i in list(range(2)) + list(range(2)): 878 ids.add(data[i]) 879 self.assert_eq(ids, data) 880 881 for type_ in (EqOverride, HashOverride, HashEqOverride): 882 data = [type_(1), type_(1), type_(2)] 883 ids = util.IdentitySet() 884 for i in list(range(3)) + list(range(3)): 885 ids.add(data[i]) 886 self.assert_eq(ids, data) 887 888 def test_dunder_sub2(self): 889 IdentitySet = util.IdentitySet 890 o1, o2, o3 = object(), object(), object() 891 ids1 = IdentitySet([o1]) 892 ids2 = IdentitySet([o1, o2, o3]) 893 eq_(ids2 - ids1, IdentitySet([o2, o3])) 894 895 ids2 -= ids1 896 eq_(ids2, IdentitySet([o2, o3])) 897 898 def test_dunder_eq(self): 899 _, _, twin1, twin2, unique1, unique2 = self._create_sets() 900 901 # basic set math 902 eq_(twin1 == twin2, True) 903 eq_(unique1 == unique2, False) 904 905 # not an IdentitySet 906 not_an_identity_set = object() 907 eq_(unique1 == not_an_identity_set, False) 908 909 def test_dunder_ne(self): 910 _, _, twin1, twin2, unique1, unique2 = self._create_sets() 911 912 # basic set math 913 eq_(twin1 != twin2, False) 914 eq_(unique1 != unique2, True) 915 916 # not an IdentitySet 917 not_an_identity_set = object() 918 eq_(unique1 != not_an_identity_set, True) 919 920 def test_dunder_le(self): 921 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 922 923 # basic set math 924 eq_(sub_ <= super_, True) 925 eq_(super_ <= sub_, False) 926 927 # the same sets 928 eq_(twin1 <= twin2, True) 929 eq_(twin2 <= twin1, True) 930 931 # totally different sets 932 eq_(unique1 <= unique2, False) 933 eq_(unique2 <= unique1, False) 934 935 # not an IdentitySet 936 def should_raise(): 937 not_an_identity_set = object() 938 return unique1 <= not_an_identity_set 939 940 self._assert_unorderable_types(should_raise) 941 942 def test_dunder_lt(self): 943 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 944 945 # basic set math 946 eq_(sub_ < super_, True) 947 eq_(super_ < sub_, False) 948 949 # the same sets 950 eq_(twin1 < twin2, False) 951 eq_(twin2 < twin1, False) 952 953 # totally different sets 954 eq_(unique1 < unique2, False) 955 eq_(unique2 < unique1, False) 956 957 # not an IdentitySet 958 def should_raise(): 959 not_an_identity_set = object() 960 return unique1 < not_an_identity_set 961 962 self._assert_unorderable_types(should_raise) 963 964 def test_dunder_ge(self): 965 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 966 967 # basic set math 968 eq_(sub_ >= super_, False) 969 eq_(super_ >= sub_, True) 970 971 # the same sets 972 eq_(twin1 >= twin2, True) 973 eq_(twin2 >= twin1, True) 974 975 # totally different sets 976 eq_(unique1 >= unique2, False) 977 eq_(unique2 >= unique1, False) 978 979 # not an IdentitySet 980 def should_raise(): 981 not_an_identity_set = object() 982 return unique1 >= not_an_identity_set 983 984 self._assert_unorderable_types(should_raise) 985 986 def test_dunder_gt(self): 987 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 988 989 # basic set math 990 eq_(sub_ > super_, False) 991 eq_(super_ > sub_, True) 992 993 # the same sets 994 eq_(twin1 > twin2, False) 995 eq_(twin2 > twin1, False) 996 997 # totally different sets 998 eq_(unique1 > unique2, False) 999 eq_(unique2 > unique1, False) 1000 1001 # not an IdentitySet 1002 def should_raise(): 1003 not_an_identity_set = object() 1004 return unique1 > not_an_identity_set 1005 1006 self._assert_unorderable_types(should_raise) 1007 1008 def test_issubset(self): 1009 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1010 1011 # basic set math 1012 eq_(sub_.issubset(super_), True) 1013 eq_(super_.issubset(sub_), False) 1014 1015 # the same sets 1016 eq_(twin1.issubset(twin2), True) 1017 eq_(twin2.issubset(twin1), True) 1018 1019 # totally different sets 1020 eq_(unique1.issubset(unique2), False) 1021 eq_(unique2.issubset(unique1), False) 1022 1023 # not an IdentitySet 1024 not_an_identity_set = object() 1025 assert_raises(TypeError, unique1.issubset, not_an_identity_set) 1026 1027 def test_issuperset(self): 1028 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1029 1030 # basic set math 1031 eq_(sub_.issuperset(super_), False) 1032 eq_(super_.issuperset(sub_), True) 1033 1034 # the same sets 1035 eq_(twin1.issuperset(twin2), True) 1036 eq_(twin2.issuperset(twin1), True) 1037 1038 # totally different sets 1039 eq_(unique1.issuperset(unique2), False) 1040 eq_(unique2.issuperset(unique1), False) 1041 1042 # not an IdentitySet 1043 not_an_identity_set = object() 1044 assert_raises(TypeError, unique1.issuperset, not_an_identity_set) 1045 1046 def test_union(self): 1047 super_, sub_, twin1, twin2, _, _ = self._create_sets() 1048 1049 # basic set math 1050 eq_(sub_.union(super_), super_) 1051 eq_(super_.union(sub_), super_) 1052 1053 # the same sets 1054 eq_(twin1.union(twin2), twin1) 1055 eq_(twin2.union(twin1), twin1) 1056 1057 # empty sets 1058 empty = util.IdentitySet([]) 1059 eq_(empty.union(empty), empty) 1060 1061 # totally different sets 1062 unique1 = util.IdentitySet([1]) 1063 unique2 = util.IdentitySet([2]) 1064 eq_(unique1.union(unique2), util.IdentitySet([1, 2])) 1065 1066 # not an IdentitySet 1067 not_an_identity_set = object() 1068 assert_raises(TypeError, unique1.union, not_an_identity_set) 1069 1070 def test_dunder_or(self): 1071 super_, sub_, twin1, twin2, _, _ = self._create_sets() 1072 1073 # basic set math 1074 eq_(sub_ | super_, super_) 1075 eq_(super_ | sub_, super_) 1076 1077 # the same sets 1078 eq_(twin1 | twin2, twin1) 1079 eq_(twin2 | twin1, twin1) 1080 1081 # empty sets 1082 empty = util.IdentitySet([]) 1083 eq_(empty | empty, empty) 1084 1085 # totally different sets 1086 unique1 = util.IdentitySet([1]) 1087 unique2 = util.IdentitySet([2]) 1088 eq_(unique1 | unique2, util.IdentitySet([1, 2])) 1089 1090 # not an IdentitySet 1091 def should_raise(): 1092 not_an_identity_set = object() 1093 return unique1 | not_an_identity_set 1094 1095 assert_raises(TypeError, should_raise) 1096 1097 def test_update(self): 1098 pass # TODO 1099 1100 def test_dunder_ior(self): 1101 super_, sub_, _, _, _, _ = self._create_sets() 1102 1103 # basic set math 1104 sub_ |= super_ 1105 eq_(sub_, super_) 1106 super_ |= sub_ 1107 eq_(super_, super_) 1108 1109 # totally different sets 1110 unique1 = util.IdentitySet([1]) 1111 unique2 = util.IdentitySet([2]) 1112 unique1 |= unique2 1113 eq_(unique1, util.IdentitySet([1, 2])) 1114 eq_(unique2, util.IdentitySet([2])) 1115 1116 # not an IdentitySet 1117 def should_raise(): 1118 unique = util.IdentitySet([1]) 1119 not_an_identity_set = object() 1120 unique |= not_an_identity_set 1121 1122 assert_raises(TypeError, should_raise) 1123 1124 def test_difference(self): 1125 _, _, twin1, twin2, _, _ = self._create_sets() 1126 1127 # basic set math 1128 set1 = util.IdentitySet([1, 2, 3]) 1129 set2 = util.IdentitySet([2, 3, 4]) 1130 eq_(set1.difference(set2), util.IdentitySet([1])) 1131 eq_(set2.difference(set1), util.IdentitySet([4])) 1132 1133 # empty sets 1134 empty = util.IdentitySet([]) 1135 eq_(empty.difference(empty), empty) 1136 1137 # the same sets 1138 eq_(twin1.difference(twin2), empty) 1139 eq_(twin2.difference(twin1), empty) 1140 1141 # totally different sets 1142 unique1 = util.IdentitySet([1]) 1143 unique2 = util.IdentitySet([2]) 1144 eq_(unique1.difference(unique2), util.IdentitySet([1])) 1145 eq_(unique2.difference(unique1), util.IdentitySet([2])) 1146 1147 # not an IdentitySet 1148 not_an_identity_set = object() 1149 assert_raises(TypeError, unique1.difference, not_an_identity_set) 1150 1151 def test_dunder_sub(self): 1152 _, _, twin1, twin2, _, _ = self._create_sets() 1153 1154 # basic set math 1155 set1 = util.IdentitySet([1, 2, 3]) 1156 set2 = util.IdentitySet([2, 3, 4]) 1157 eq_(set1 - set2, util.IdentitySet([1])) 1158 eq_(set2 - set1, util.IdentitySet([4])) 1159 1160 # empty sets 1161 empty = util.IdentitySet([]) 1162 eq_(empty - empty, empty) 1163 1164 # the same sets 1165 eq_(twin1 - twin2, empty) 1166 eq_(twin2 - twin1, empty) 1167 1168 # totally different sets 1169 unique1 = util.IdentitySet([1]) 1170 unique2 = util.IdentitySet([2]) 1171 eq_(unique1 - unique2, util.IdentitySet([1])) 1172 eq_(unique2 - unique1, util.IdentitySet([2])) 1173 1174 # not an IdentitySet 1175 def should_raise(): 1176 not_an_identity_set = object() 1177 unique1 - not_an_identity_set 1178 1179 assert_raises(TypeError, should_raise) 1180 1181 def test_difference_update(self): 1182 pass # TODO 1183 1184 def test_dunder_isub(self): 1185 pass # TODO 1186 1187 def test_intersection(self): 1188 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1189 1190 # basic set math 1191 eq_(sub_.intersection(super_), sub_) 1192 eq_(super_.intersection(sub_), sub_) 1193 1194 # the same sets 1195 eq_(twin1.intersection(twin2), twin1) 1196 eq_(twin2.intersection(twin1), twin1) 1197 1198 # empty sets 1199 empty = util.IdentitySet([]) 1200 eq_(empty.intersection(empty), empty) 1201 1202 # totally different sets 1203 eq_(unique1.intersection(unique2), empty) 1204 1205 # not an IdentitySet 1206 not_an_identity_set = object() 1207 assert_raises(TypeError, unique1.intersection, not_an_identity_set) 1208 1209 def test_dunder_and(self): 1210 super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets() 1211 1212 # basic set math 1213 eq_(sub_ & super_, sub_) 1214 eq_(super_ & sub_, sub_) 1215 1216 # the same sets 1217 eq_(twin1 & twin2, twin1) 1218 eq_(twin2 & twin1, twin1) 1219 1220 # empty sets 1221 empty = util.IdentitySet([]) 1222 eq_(empty & empty, empty) 1223 1224 # totally different sets 1225 eq_(unique1 & unique2, empty) 1226 1227 # not an IdentitySet 1228 def should_raise(): 1229 not_an_identity_set = object() 1230 return unique1 & not_an_identity_set 1231 1232 assert_raises(TypeError, should_raise) 1233 1234 def test_intersection_update(self): 1235 pass # TODO 1236 1237 def test_dunder_iand(self): 1238 pass # TODO 1239 1240 def test_symmetric_difference(self): 1241 _, _, twin1, twin2, _, _ = self._create_sets() 1242 1243 # basic set math 1244 set1 = util.IdentitySet([1, 2, 3]) 1245 set2 = util.IdentitySet([2, 3, 4]) 1246 eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4])) 1247 eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4])) 1248 1249 # empty sets 1250 empty = util.IdentitySet([]) 1251 eq_(empty.symmetric_difference(empty), empty) 1252 1253 # the same sets 1254 eq_(twin1.symmetric_difference(twin2), empty) 1255 eq_(twin2.symmetric_difference(twin1), empty) 1256 1257 # totally different sets 1258 unique1 = util.IdentitySet([1]) 1259 unique2 = util.IdentitySet([2]) 1260 eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2])) 1261 eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2])) 1262 1263 # not an IdentitySet 1264 not_an_identity_set = object() 1265 assert_raises( 1266 TypeError, unique1.symmetric_difference, not_an_identity_set 1267 ) 1268 1269 def test_dunder_xor(self): 1270 _, _, twin1, twin2, _, _ = self._create_sets() 1271 1272 # basic set math 1273 set1 = util.IdentitySet([1, 2, 3]) 1274 set2 = util.IdentitySet([2, 3, 4]) 1275 eq_(set1 ^ set2, util.IdentitySet([1, 4])) 1276 eq_(set2 ^ set1, util.IdentitySet([1, 4])) 1277 1278 # empty sets 1279 empty = util.IdentitySet([]) 1280 eq_(empty ^ empty, empty) 1281 1282 # the same sets 1283 eq_(twin1 ^ twin2, empty) 1284 eq_(twin2 ^ twin1, empty) 1285 1286 # totally different sets 1287 unique1 = util.IdentitySet([1]) 1288 unique2 = util.IdentitySet([2]) 1289 eq_(unique1 ^ unique2, util.IdentitySet([1, 2])) 1290 eq_(unique2 ^ unique1, util.IdentitySet([1, 2])) 1291 1292 # not an IdentitySet 1293 def should_raise(): 1294 not_an_identity_set = object() 1295 return unique1 ^ not_an_identity_set 1296 1297 assert_raises(TypeError, should_raise) 1298 1299 def test_symmetric_difference_update(self): 1300 pass # TODO 1301 1302 def _create_sets(self): 1303 o1, o2, o3, o4, o5 = object(), object(), object(), object(), object() 1304 super_ = util.IdentitySet([o1, o2, o3]) 1305 sub_ = util.IdentitySet([o2]) 1306 twin1 = util.IdentitySet([o3]) 1307 twin2 = util.IdentitySet([o3]) 1308 unique1 = util.IdentitySet([o4]) 1309 unique2 = util.IdentitySet([o5]) 1310 return super_, sub_, twin1, twin2, unique1, unique2 1311 1312 def _assert_unorderable_types(self, callable_): 1313 if util.py36: 1314 assert_raises_message( 1315 TypeError, "not supported between instances of", callable_ 1316 ) 1317 elif util.py3k: 1318 assert_raises_message(TypeError, "unorderable types", callable_) 1319 else: 1320 assert_raises_message( 1321 TypeError, "cannot compare sets using cmp()", callable_ 1322 ) 1323 1324 def test_basic_sanity(self): 1325 IdentitySet = util.IdentitySet 1326 1327 o1, o2, o3 = object(), object(), object() 1328 ids = IdentitySet([o1]) 1329 ids.discard(o1) 1330 ids.discard(o1) 1331 ids.add(o1) 1332 ids.remove(o1) 1333 assert_raises(KeyError, ids.remove, o1) 1334 1335 eq_(ids.copy(), ids) 1336 1337 # explicit __eq__ and __ne__ tests 1338 assert ids != None # noqa 1339 assert not (ids == None) # noqa 1340 1341 ne_(ids, IdentitySet([o1, o2, o3])) 1342 ids.clear() 1343 assert o1 not in ids 1344 ids.add(o2) 1345 assert o2 in ids 1346 eq_(ids.pop(), o2) 1347 ids.add(o1) 1348 eq_(len(ids), 1) 1349 1350 isuper = IdentitySet([o1, o2]) 1351 assert ids < isuper 1352 assert ids.issubset(isuper) 1353 assert isuper.issuperset(ids) 1354 assert isuper > ids 1355 1356 eq_(ids.union(isuper), isuper) 1357 eq_(ids | isuper, isuper) 1358 eq_(isuper - ids, IdentitySet([o2])) 1359 eq_(isuper.difference(ids), IdentitySet([o2])) 1360 eq_(ids.intersection(isuper), IdentitySet([o1])) 1361 eq_(ids & isuper, IdentitySet([o1])) 1362 eq_(ids.symmetric_difference(isuper), IdentitySet([o2])) 1363 eq_(ids ^ isuper, IdentitySet([o2])) 1364 1365 ids.update(isuper) 1366 ids |= isuper 1367 ids.difference_update(isuper) 1368 ids -= isuper 1369 ids.intersection_update(isuper) 1370 ids &= isuper 1371 ids.symmetric_difference_update(isuper) 1372 ids ^= isuper 1373 1374 ids.update("foobar") 1375 try: 1376 ids |= "foobar" 1377 assert False 1378 except TypeError: 1379 assert True 1380 1381 try: 1382 s = set([o1, o2]) 1383 s |= ids 1384 assert False 1385 except TypeError: 1386 assert True 1387 1388 assert_raises(TypeError, util.cmp, ids) 1389 assert_raises(TypeError, hash, ids) 1390 1391 1392class OrderedIdentitySetTest(fixtures.TestBase): 1393 def assert_eq(self, identityset, expected_iterable): 1394 expected = [id(o) for o in expected_iterable] 1395 found = [id(o) for o in identityset] 1396 eq_(found, expected) 1397 1398 def test_add(self): 1399 elem = object 1400 s = util.OrderedIdentitySet() 1401 s.add(elem()) 1402 s.add(elem()) 1403 1404 def test_intersection(self): 1405 elem = object 1406 eq_ = self.assert_eq 1407 1408 a, b, c, d, e, f, g = ( 1409 elem(), 1410 elem(), 1411 elem(), 1412 elem(), 1413 elem(), 1414 elem(), 1415 elem(), 1416 ) 1417 1418 s1 = util.OrderedIdentitySet([a, b, c]) 1419 s2 = util.OrderedIdentitySet([d, e, f]) 1420 s3 = util.OrderedIdentitySet([a, d, f, g]) 1421 eq_(s1.intersection(s2), []) 1422 eq_(s1.intersection(s3), [a]) 1423 eq_(s1.union(s2).intersection(s3), [a, d, f]) 1424 1425 1426class DictlikeIteritemsTest(fixtures.TestBase): 1427 baseline = set([("a", 1), ("b", 2), ("c", 3)]) 1428 1429 def _ok(self, instance): 1430 iterator = util.dictlike_iteritems(instance) 1431 eq_(set(iterator), self.baseline) 1432 1433 def _notok(self, instance): 1434 assert_raises(TypeError, util.dictlike_iteritems, instance) 1435 1436 def test_dict(self): 1437 d = dict(a=1, b=2, c=3) 1438 self._ok(d) 1439 1440 def test_subdict(self): 1441 class subdict(dict): 1442 pass 1443 1444 d = subdict(a=1, b=2, c=3) 1445 self._ok(d) 1446 1447 if util.py2k: 1448 1449 def test_UserDict(self): 1450 import UserDict 1451 1452 d = UserDict.UserDict(a=1, b=2, c=3) 1453 self._ok(d) 1454 1455 def test_object(self): 1456 self._notok(object()) 1457 1458 if util.py2k: 1459 1460 def test_duck_1(self): 1461 class duck1(object): 1462 def iteritems(duck): 1463 return iter(self.baseline) 1464 1465 self._ok(duck1()) 1466 1467 def test_duck_2(self): 1468 class duck2(object): 1469 def items(duck): 1470 return list(self.baseline) 1471 1472 self._ok(duck2()) 1473 1474 if util.py2k: 1475 1476 def test_duck_3(self): 1477 class duck3(object): 1478 def iterkeys(duck): 1479 return iter(["a", "b", "c"]) 1480 1481 def __getitem__(duck, key): 1482 return dict(a=1, b=2, c=3).get(key) 1483 1484 self._ok(duck3()) 1485 1486 def test_duck_4(self): 1487 class duck4(object): 1488 def iterkeys(duck): 1489 return iter(["a", "b", "c"]) 1490 1491 self._notok(duck4()) 1492 1493 def test_duck_5(self): 1494 class duck5(object): 1495 def keys(duck): 1496 return ["a", "b", "c"] 1497 1498 def get(duck, key): 1499 return dict(a=1, b=2, c=3).get(key) 1500 1501 self._ok(duck5()) 1502 1503 def test_duck_6(self): 1504 class duck6(object): 1505 def keys(duck): 1506 return ["a", "b", "c"] 1507 1508 self._notok(duck6()) 1509 1510 1511class DuckTypeCollectionTest(fixtures.TestBase): 1512 def test_sets(self): 1513 class SetLike(object): 1514 def add(self): 1515 pass 1516 1517 class ForcedSet(list): 1518 __emulates__ = set 1519 1520 for type_ in (set, SetLike, ForcedSet): 1521 eq_(util.duck_type_collection(type_), set) 1522 instance = type_() 1523 eq_(util.duck_type_collection(instance), set) 1524 1525 for type_ in (frozenset,): 1526 is_(util.duck_type_collection(type_), None) 1527 instance = type_() 1528 is_(util.duck_type_collection(instance), None) 1529 1530 1531class PublicFactoryTest(fixtures.TestBase): 1532 def _fixture(self): 1533 class Thingy(object): 1534 def __init__(self, value): 1535 "make a thingy" 1536 self.value = value 1537 1538 @classmethod 1539 def foobar(cls, x, y): 1540 "do the foobar" 1541 return Thingy(x + y) 1542 1543 return Thingy 1544 1545 def test_classmethod(self): 1546 Thingy = self._fixture() 1547 foob = langhelpers.public_factory(Thingy.foobar, ".sql.elements.foob") 1548 eq_(foob(3, 4).value, 7) 1549 eq_(foob(x=3, y=4).value, 7) 1550 eq_(foob.__doc__, "do the foobar") 1551 eq_(foob.__module__, "sqlalchemy.sql.elements") 1552 assert Thingy.foobar.__doc__.startswith("This function is mirrored;") 1553 1554 def test_constructor(self): 1555 Thingy = self._fixture() 1556 foob = langhelpers.public_factory(Thingy, ".sql.elements.foob") 1557 eq_(foob(7).value, 7) 1558 eq_(foob(value=7).value, 7) 1559 eq_(foob.__doc__, "make a thingy") 1560 eq_(foob.__module__, "sqlalchemy.sql.elements") 1561 assert Thingy.__init__.__doc__.startswith( 1562 "Construct a new :class:`.Thingy` object." 1563 ) 1564 1565 1566class ArgInspectionTest(fixtures.TestBase): 1567 def test_get_cls_kwargs(self): 1568 class A(object): 1569 def __init__(self, a): 1570 pass 1571 1572 class A1(A): 1573 def __init__(self, a1): 1574 pass 1575 1576 class A11(A1): 1577 def __init__(self, a11, **kw): 1578 pass 1579 1580 class B(object): 1581 def __init__(self, b, **kw): 1582 pass 1583 1584 class B1(B): 1585 def __init__(self, b1, **kw): 1586 pass 1587 1588 class B2(B): 1589 def __init__(self, b2): 1590 pass 1591 1592 class AB(A, B): 1593 def __init__(self, ab): 1594 pass 1595 1596 class BA(B, A): 1597 def __init__(self, ba, **kwargs): 1598 pass 1599 1600 class BA1(BA): 1601 pass 1602 1603 class CAB(A, B): 1604 pass 1605 1606 class CBA(B, A): 1607 pass 1608 1609 class CB1A1(B1, A1): 1610 pass 1611 1612 class CAB1(A, B1): 1613 pass 1614 1615 class CB1A(B1, A): 1616 pass 1617 1618 class CB2A(B2, A): 1619 pass 1620 1621 class D(object): 1622 pass 1623 1624 class BA2(B, A): 1625 pass 1626 1627 class A11B1(A11, B1): 1628 pass 1629 1630 def test(cls, *expected): 1631 eq_(set(util.get_cls_kwargs(cls)), set(expected)) 1632 1633 test(A, "a") 1634 test(A1, "a1") 1635 test(A11, "a11", "a1") 1636 test(B, "b") 1637 test(B1, "b1", "b") 1638 test(AB, "ab") 1639 test(BA, "ba", "b", "a") 1640 test(BA1, "ba", "b", "a") 1641 test(CAB, "a") 1642 test(CBA, "b", "a") 1643 test(CAB1, "a") 1644 test(CB1A, "b1", "b", "a") 1645 test(CB2A, "b2") 1646 test(CB1A1, "a1", "b1", "b") 1647 test(D) 1648 test(BA2, "a", "b") 1649 test(A11B1, "a1", "a11", "b", "b1") 1650 1651 def test_get_func_kwargs(self): 1652 def f1(): 1653 pass 1654 1655 def f2(foo): 1656 pass 1657 1658 def f3(*foo): 1659 pass 1660 1661 def f4(**foo): 1662 pass 1663 1664 def test(fn, *expected): 1665 eq_(set(util.get_func_kwargs(fn)), set(expected)) 1666 1667 test(f1) 1668 test(f2, "foo") 1669 test(f3) 1670 test(f4) 1671 1672 def test_callable_argspec_fn(self): 1673 def foo(x, y, **kw): 1674 pass 1675 1676 eq_(get_callable_argspec(foo), (["x", "y"], None, "kw", None)) 1677 1678 def test_callable_argspec_fn_no_self(self): 1679 def foo(x, y, **kw): 1680 pass 1681 1682 eq_( 1683 get_callable_argspec(foo, no_self=True), 1684 (["x", "y"], None, "kw", None), 1685 ) 1686 1687 def test_callable_argspec_fn_no_self_but_self(self): 1688 def foo(self, x, y, **kw): 1689 pass 1690 1691 eq_( 1692 get_callable_argspec(foo, no_self=True), 1693 (["self", "x", "y"], None, "kw", None), 1694 ) 1695 1696 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1697 def test_callable_argspec_py_builtin(self): 1698 import datetime 1699 1700 assert_raises(TypeError, get_callable_argspec, datetime.datetime.now) 1701 1702 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1703 def test_callable_argspec_obj_init(self): 1704 assert_raises(TypeError, get_callable_argspec, object) 1705 1706 def test_callable_argspec_method(self): 1707 class Foo(object): 1708 def foo(self, x, y, **kw): 1709 pass 1710 1711 eq_( 1712 get_callable_argspec(Foo.foo), 1713 (["self", "x", "y"], None, "kw", None), 1714 ) 1715 1716 def test_callable_argspec_instance_method_no_self(self): 1717 class Foo(object): 1718 def foo(self, x, y, **kw): 1719 pass 1720 1721 eq_( 1722 get_callable_argspec(Foo().foo, no_self=True), 1723 (["x", "y"], None, "kw", None), 1724 ) 1725 1726 def test_callable_argspec_unbound_method_no_self(self): 1727 class Foo(object): 1728 def foo(self, x, y, **kw): 1729 pass 1730 1731 eq_( 1732 get_callable_argspec(Foo.foo, no_self=True), 1733 (["self", "x", "y"], None, "kw", None), 1734 ) 1735 1736 def test_callable_argspec_init(self): 1737 class Foo(object): 1738 def __init__(self, x, y): 1739 pass 1740 1741 eq_(get_callable_argspec(Foo), (["self", "x", "y"], None, None, None)) 1742 1743 def test_callable_argspec_init_no_self(self): 1744 class Foo(object): 1745 def __init__(self, x, y): 1746 pass 1747 1748 eq_( 1749 get_callable_argspec(Foo, no_self=True), 1750 (["x", "y"], None, None, None), 1751 ) 1752 1753 def test_callable_argspec_call(self): 1754 class Foo(object): 1755 def __call__(self, x, y): 1756 pass 1757 1758 eq_( 1759 get_callable_argspec(Foo()), (["self", "x", "y"], None, None, None) 1760 ) 1761 1762 def test_callable_argspec_call_no_self(self): 1763 class Foo(object): 1764 def __call__(self, x, y): 1765 pass 1766 1767 eq_( 1768 get_callable_argspec(Foo(), no_self=True), 1769 (["x", "y"], None, None, None), 1770 ) 1771 1772 @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw") 1773 def test_callable_argspec_partial(self): 1774 from functools import partial 1775 1776 def foo(x, y, z, **kw): 1777 pass 1778 1779 bar = partial(foo, 5) 1780 1781 assert_raises(TypeError, get_callable_argspec, bar) 1782 1783 1784class SymbolTest(fixtures.TestBase): 1785 def test_basic(self): 1786 sym1 = util.symbol("foo") 1787 assert sym1.name == "foo" 1788 sym2 = util.symbol("foo") 1789 1790 assert sym1 is sym2 1791 assert sym1 == sym2 1792 1793 sym3 = util.symbol("bar") 1794 assert sym1 is not sym3 1795 assert sym1 != sym3 1796 1797 def test_pickle(self): 1798 sym1 = util.symbol("foo") 1799 sym2 = util.symbol("foo") 1800 1801 assert sym1 is sym2 1802 1803 # default 1804 s = util.pickle.dumps(sym1) 1805 sym3 = util.pickle.loads(s) 1806 1807 for protocol in 0, 1, 2: 1808 print(protocol) 1809 serial = util.pickle.dumps(sym1) 1810 rt = util.pickle.loads(serial) 1811 assert rt is sym1 1812 assert rt is sym2 1813 1814 def test_bitflags(self): 1815 sym1 = util.symbol("sym1", canonical=1) 1816 sym2 = util.symbol("sym2", canonical=2) 1817 1818 assert sym1 & sym1 1819 assert not sym1 & sym2 1820 assert not sym1 & sym1 & sym2 1821 1822 def test_composites(self): 1823 sym1 = util.symbol("sym1", canonical=1) 1824 sym2 = util.symbol("sym2", canonical=2) 1825 sym3 = util.symbol("sym3", canonical=4) 1826 sym4 = util.symbol("sym4", canonical=8) 1827 1828 assert sym1 & (sym2 | sym1 | sym4) 1829 assert not sym1 & (sym2 | sym3) 1830 1831 assert not (sym1 | sym2) & (sym3 | sym4) 1832 assert (sym1 | sym2) & (sym2 | sym4) 1833 1834 1835class TestFormatArgspec(fixtures.TestBase): 1836 def test_specs(self): 1837 def test(fn, wanted, grouped=None): 1838 if grouped is None: 1839 parsed = util.format_argspec_plus(fn) 1840 else: 1841 parsed = util.format_argspec_plus(fn, grouped=grouped) 1842 eq_(parsed, wanted) 1843 1844 test( 1845 lambda: None, 1846 { 1847 "args": "()", 1848 "self_arg": None, 1849 "apply_kw": "()", 1850 "apply_pos": "()", 1851 }, 1852 ) 1853 1854 test( 1855 lambda: None, 1856 {"args": "", "self_arg": None, "apply_kw": "", "apply_pos": ""}, 1857 grouped=False, 1858 ) 1859 1860 test( 1861 lambda self: None, 1862 { 1863 "args": "(self)", 1864 "self_arg": "self", 1865 "apply_kw": "(self)", 1866 "apply_pos": "(self)", 1867 }, 1868 ) 1869 1870 test( 1871 lambda self: None, 1872 { 1873 "args": "self", 1874 "self_arg": "self", 1875 "apply_kw": "self", 1876 "apply_pos": "self", 1877 }, 1878 grouped=False, 1879 ) 1880 1881 test( 1882 lambda *a: None, 1883 { 1884 "args": "(*a)", 1885 "self_arg": "a[0]", 1886 "apply_kw": "(*a)", 1887 "apply_pos": "(*a)", 1888 }, 1889 ) 1890 1891 test( 1892 lambda **kw: None, 1893 { 1894 "args": "(**kw)", 1895 "self_arg": None, 1896 "apply_kw": "(**kw)", 1897 "apply_pos": "(**kw)", 1898 }, 1899 ) 1900 1901 test( 1902 lambda *a, **kw: None, 1903 { 1904 "args": "(*a, **kw)", 1905 "self_arg": "a[0]", 1906 "apply_kw": "(*a, **kw)", 1907 "apply_pos": "(*a, **kw)", 1908 }, 1909 ) 1910 1911 test( 1912 lambda a, *b: None, 1913 { 1914 "args": "(a, *b)", 1915 "self_arg": "a", 1916 "apply_kw": "(a, *b)", 1917 "apply_pos": "(a, *b)", 1918 }, 1919 ) 1920 1921 test( 1922 lambda a, **b: None, 1923 { 1924 "args": "(a, **b)", 1925 "self_arg": "a", 1926 "apply_kw": "(a, **b)", 1927 "apply_pos": "(a, **b)", 1928 }, 1929 ) 1930 1931 test( 1932 lambda a, *b, **c: None, 1933 { 1934 "args": "(a, *b, **c)", 1935 "self_arg": "a", 1936 "apply_kw": "(a, *b, **c)", 1937 "apply_pos": "(a, *b, **c)", 1938 }, 1939 ) 1940 1941 test( 1942 lambda a, b=1, **c: None, 1943 { 1944 "args": "(a, b=1, **c)", 1945 "self_arg": "a", 1946 "apply_kw": "(a, b=b, **c)", 1947 "apply_pos": "(a, b, **c)", 1948 }, 1949 ) 1950 1951 test( 1952 lambda a=1, b=2: None, 1953 { 1954 "args": "(a=1, b=2)", 1955 "self_arg": "a", 1956 "apply_kw": "(a=a, b=b)", 1957 "apply_pos": "(a, b)", 1958 }, 1959 ) 1960 1961 test( 1962 lambda a=1, b=2: None, 1963 { 1964 "args": "a=1, b=2", 1965 "self_arg": "a", 1966 "apply_kw": "a=a, b=b", 1967 "apply_pos": "a, b", 1968 }, 1969 grouped=False, 1970 ) 1971 1972 @testing.fails_if( 1973 lambda: util.pypy, 1974 "pypy doesn't report Obj.__init__ as object.__init__", 1975 ) 1976 def test_init_grouped(self): 1977 object_spec = { 1978 "args": "(self)", 1979 "self_arg": "self", 1980 "apply_pos": "(self)", 1981 "apply_kw": "(self)", 1982 } 1983 wrapper_spec = { 1984 "args": "(self, *args, **kwargs)", 1985 "self_arg": "self", 1986 "apply_pos": "(self, *args, **kwargs)", 1987 "apply_kw": "(self, *args, **kwargs)", 1988 } 1989 custom_spec = { 1990 "args": "(slef, a=123)", 1991 "self_arg": "slef", # yes, slef 1992 "apply_pos": "(slef, a)", 1993 "apply_kw": "(slef, a=a)", 1994 } 1995 1996 self._test_init(None, object_spec, wrapper_spec, custom_spec) 1997 self._test_init(True, object_spec, wrapper_spec, custom_spec) 1998 1999 @testing.fails_if( 2000 lambda: util.pypy, 2001 "pypy doesn't report Obj.__init__ as object.__init__", 2002 ) 2003 def test_init_bare(self): 2004 object_spec = { 2005 "args": "self", 2006 "self_arg": "self", 2007 "apply_pos": "self", 2008 "apply_kw": "self", 2009 } 2010 wrapper_spec = { 2011 "args": "self, *args, **kwargs", 2012 "self_arg": "self", 2013 "apply_pos": "self, *args, **kwargs", 2014 "apply_kw": "self, *args, **kwargs", 2015 } 2016 custom_spec = { 2017 "args": "slef, a=123", 2018 "self_arg": "slef", # yes, slef 2019 "apply_pos": "slef, a", 2020 "apply_kw": "slef, a=a", 2021 } 2022 2023 self._test_init(False, object_spec, wrapper_spec, custom_spec) 2024 2025 def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec): 2026 def test(fn, wanted): 2027 if grouped is None: 2028 parsed = util.format_argspec_init(fn) 2029 else: 2030 parsed = util.format_argspec_init(fn, grouped=grouped) 2031 eq_(parsed, wanted) 2032 2033 class Obj(object): 2034 pass 2035 2036 test(Obj.__init__, object_spec) 2037 2038 class Obj(object): 2039 def __init__(self): 2040 pass 2041 2042 test(Obj.__init__, object_spec) 2043 2044 class Obj(object): 2045 def __init__(slef, a=123): 2046 pass 2047 2048 test(Obj.__init__, custom_spec) 2049 2050 class Obj(list): 2051 pass 2052 2053 test(Obj.__init__, wrapper_spec) 2054 2055 class Obj(list): 2056 def __init__(self, *args, **kwargs): 2057 pass 2058 2059 test(Obj.__init__, wrapper_spec) 2060 2061 class Obj(list): 2062 def __init__(self): 2063 pass 2064 2065 test(Obj.__init__, object_spec) 2066 2067 class Obj(list): 2068 def __init__(slef, a=123): 2069 pass 2070 2071 test(Obj.__init__, custom_spec) 2072 2073 2074class GenericReprTest(fixtures.TestBase): 2075 def test_all_positional(self): 2076 class Foo(object): 2077 def __init__(self, a, b, c): 2078 self.a = a 2079 self.b = b 2080 self.c = c 2081 2082 eq_(util.generic_repr(Foo(1, 2, 3)), "Foo(1, 2, 3)") 2083 2084 def test_positional_plus_kw(self): 2085 class Foo(object): 2086 def __init__(self, a, b, c=5, d=4): 2087 self.a = a 2088 self.b = b 2089 self.c = c 2090 self.d = d 2091 2092 eq_(util.generic_repr(Foo(1, 2, 3, 6)), "Foo(1, 2, c=3, d=6)") 2093 2094 def test_kw_defaults(self): 2095 class Foo(object): 2096 def __init__(self, a=1, b=2, c=3, d=4): 2097 self.a = a 2098 self.b = b 2099 self.c = c 2100 self.d = d 2101 2102 eq_(util.generic_repr(Foo(1, 5, 3, 7)), "Foo(b=5, d=7)") 2103 2104 def test_multi_kw(self): 2105 class Foo(object): 2106 def __init__(self, a, b, c=3, d=4): 2107 self.a = a 2108 self.b = b 2109 self.c = c 2110 self.d = d 2111 2112 class Bar(Foo): 2113 def __init__(self, e, f, g=5, **kw): 2114 self.e = e 2115 self.f = f 2116 self.g = g 2117 super(Bar, self).__init__(**kw) 2118 2119 eq_( 2120 util.generic_repr( 2121 Bar("e", "f", g=7, a=6, b=5, d=9), to_inspect=[Bar, Foo] 2122 ), 2123 "Bar('e', 'f', g=7, a=6, b=5, d=9)", 2124 ) 2125 2126 eq_( 2127 util.generic_repr(Bar("e", "f", a=6, b=5), to_inspect=[Bar, Foo]), 2128 "Bar('e', 'f', a=6, b=5)", 2129 ) 2130 2131 def test_multi_kw_repeated(self): 2132 class Foo(object): 2133 def __init__(self, a=1, b=2): 2134 self.a = a 2135 self.b = b 2136 2137 class Bar(Foo): 2138 def __init__(self, b=3, c=4, **kw): 2139 self.c = c 2140 super(Bar, self).__init__(b=b, **kw) 2141 2142 eq_( 2143 util.generic_repr(Bar(a="a", b="b", c="c"), to_inspect=[Bar, Foo]), 2144 "Bar(b='b', c='c', a='a')", 2145 ) 2146 2147 def test_discard_vargs(self): 2148 class Foo(object): 2149 def __init__(self, a, b, *args): 2150 self.a = a 2151 self.b = b 2152 self.c, self.d = args[0:2] 2153 2154 eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2)") 2155 2156 def test_discard_vargs_kwargs(self): 2157 class Foo(object): 2158 def __init__(self, a, b, *args, **kw): 2159 self.a = a 2160 self.b = b 2161 self.c, self.d = args[0:2] 2162 2163 eq_(util.generic_repr(Foo(1, 2, 3, 4, x=7, y=4)), "Foo(1, 2)") 2164 2165 def test_significant_vargs(self): 2166 class Foo(object): 2167 def __init__(self, a, b, *args): 2168 self.a = a 2169 self.b = b 2170 self.args = args 2171 2172 eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2, 3, 4)") 2173 2174 def test_no_args(self): 2175 class Foo(object): 2176 def __init__(self): 2177 pass 2178 2179 eq_(util.generic_repr(Foo()), "Foo()") 2180 2181 def test_no_init(self): 2182 class Foo(object): 2183 pass 2184 2185 eq_(util.generic_repr(Foo()), "Foo()") 2186 2187 2188class AsInterfaceTest(fixtures.TestBase): 2189 class Something(object): 2190 def _ignoreme(self): 2191 pass 2192 2193 def foo(self): 2194 pass 2195 2196 def bar(self): 2197 pass 2198 2199 class Partial(object): 2200 def bar(self): 2201 pass 2202 2203 class Object(object): 2204 pass 2205 2206 def test_instance(self): 2207 obj = object() 2208 assert_raises(TypeError, util.as_interface, obj, cls=self.Something) 2209 2210 assert_raises(TypeError, util.as_interface, obj, methods=("foo")) 2211 2212 assert_raises( 2213 TypeError, 2214 util.as_interface, 2215 obj, 2216 cls=self.Something, 2217 required=("foo"), 2218 ) 2219 2220 obj = self.Something() 2221 eq_(obj, util.as_interface(obj, cls=self.Something)) 2222 eq_(obj, util.as_interface(obj, methods=("foo",))) 2223 eq_( 2224 obj, 2225 util.as_interface( 2226 obj, cls=self.Something, required=("outofband",) 2227 ), 2228 ) 2229 partial = self.Partial() 2230 2231 slotted = self.Object() 2232 slotted.bar = lambda self: 123 2233 2234 for obj in partial, slotted: 2235 eq_(obj, util.as_interface(obj, cls=self.Something)) 2236 assert_raises(TypeError, util.as_interface, obj, methods=("foo")) 2237 eq_(obj, util.as_interface(obj, methods=("bar",))) 2238 eq_( 2239 obj, 2240 util.as_interface(obj, cls=self.Something, required=("bar",)), 2241 ) 2242 assert_raises( 2243 TypeError, 2244 util.as_interface, 2245 obj, 2246 cls=self.Something, 2247 required=("foo",), 2248 ) 2249 2250 assert_raises( 2251 TypeError, 2252 util.as_interface, 2253 obj, 2254 cls=self.Something, 2255 required=self.Something, 2256 ) 2257 2258 def test_dict(self): 2259 obj = {} 2260 assert_raises(TypeError, util.as_interface, obj, cls=self.Something) 2261 assert_raises(TypeError, util.as_interface, obj, methods="foo") 2262 assert_raises( 2263 TypeError, 2264 util.as_interface, 2265 obj, 2266 cls=self.Something, 2267 required="foo", 2268 ) 2269 2270 def assertAdapted(obj, *methods): 2271 assert isinstance(obj, type) 2272 found = set([m for m in dir(obj) if not m.startswith("_")]) 2273 for method in methods: 2274 assert method in found 2275 found.remove(method) 2276 assert not found 2277 2278 def fn(self): 2279 return 123 2280 2281 obj = {"foo": fn, "bar": fn} 2282 res = util.as_interface(obj, cls=self.Something) 2283 assertAdapted(res, "foo", "bar") 2284 res = util.as_interface( 2285 obj, cls=self.Something, required=self.Something 2286 ) 2287 assertAdapted(res, "foo", "bar") 2288 res = util.as_interface(obj, cls=self.Something, required=("foo",)) 2289 assertAdapted(res, "foo", "bar") 2290 res = util.as_interface(obj, methods=("foo", "bar")) 2291 assertAdapted(res, "foo", "bar") 2292 res = util.as_interface(obj, methods=("foo", "bar", "baz")) 2293 assertAdapted(res, "foo", "bar") 2294 res = util.as_interface(obj, methods=("foo", "bar"), required=("foo",)) 2295 assertAdapted(res, "foo", "bar") 2296 assert_raises(TypeError, util.as_interface, obj, methods=("foo",)) 2297 assert_raises( 2298 TypeError, 2299 util.as_interface, 2300 obj, 2301 methods=("foo", "bar", "baz"), 2302 required=("baz",), 2303 ) 2304 obj = {"foo": 123} 2305 assert_raises(TypeError, util.as_interface, obj, cls=self.Something) 2306 2307 2308class TestClassHierarchy(fixtures.TestBase): 2309 def test_object(self): 2310 eq_(set(util.class_hierarchy(object)), set((object,))) 2311 2312 def test_single(self): 2313 class A(object): 2314 pass 2315 2316 class B(object): 2317 pass 2318 2319 eq_(set(util.class_hierarchy(A)), set((A, object))) 2320 eq_(set(util.class_hierarchy(B)), set((B, object))) 2321 2322 class C(A, B): 2323 pass 2324 2325 eq_(set(util.class_hierarchy(A)), set((A, B, C, object))) 2326 eq_(set(util.class_hierarchy(B)), set((A, B, C, object))) 2327 2328 if util.py2k: 2329 2330 def test_oldstyle_mixin(self): 2331 class A(object): 2332 pass 2333 2334 class Mixin: 2335 pass 2336 2337 class B(A, Mixin): 2338 pass 2339 2340 eq_(set(util.class_hierarchy(B)), set((A, B, object))) 2341 eq_(set(util.class_hierarchy(Mixin)), set()) 2342 eq_(set(util.class_hierarchy(A)), set((A, B, object))) 2343 2344 2345class ReraiseTest(fixtures.TestBase): 2346 @testing.requires.python3 2347 def test_raise_from_cause_same_cause(self): 2348 class MyException(Exception): 2349 pass 2350 2351 def go(): 2352 try: 2353 raise MyException("exc one") 2354 except Exception as err: 2355 util.raise_from_cause(err) 2356 2357 try: 2358 go() 2359 assert False 2360 except MyException as err: 2361 is_(err.__cause__, None) 2362 2363 def test_reraise_disallow_same_cause(self): 2364 class MyException(Exception): 2365 pass 2366 2367 def go(): 2368 try: 2369 raise MyException("exc one") 2370 except Exception as err: 2371 type_, value, tb = sys.exc_info() 2372 util.reraise(type_, err, tb, value) 2373 2374 assert_raises_message(AssertionError, "Same cause emitted", go) 2375 2376 def test_raise_from_cause(self): 2377 class MyException(Exception): 2378 pass 2379 2380 class MyOtherException(Exception): 2381 pass 2382 2383 me = MyException("exc on") 2384 2385 def go(): 2386 try: 2387 raise me 2388 except Exception: 2389 util.raise_from_cause(MyOtherException("exc two")) 2390 2391 try: 2392 go() 2393 assert False 2394 except MyOtherException as moe: 2395 if testing.requires.python3.enabled: 2396 is_(moe.__cause__, me) 2397 2398 @testing.requires.python2 2399 def test_safe_reraise_py2k_warning(self): 2400 class MyException(Exception): 2401 pass 2402 2403 class MyOtherException(Exception): 2404 pass 2405 2406 m1 = MyException("exc one") 2407 m2 = MyOtherException("exc two") 2408 2409 def go2(): 2410 raise m2 2411 2412 def go(): 2413 try: 2414 raise m1 2415 except Exception: 2416 with util.safe_reraise(): 2417 go2() 2418 2419 with expect_warnings( 2420 "An exception has occurred during handling of a previous " 2421 "exception. The previous exception " 2422 "is:.*MyException.*exc one" 2423 ): 2424 try: 2425 go() 2426 assert False 2427 except MyOtherException: 2428 pass 2429 2430 2431class TestClassProperty(fixtures.TestBase): 2432 def test_simple(self): 2433 class A(object): 2434 something = {"foo": 1} 2435 2436 class B(A): 2437 @classproperty 2438 def something(cls): 2439 d = dict(super(B, cls).something) 2440 d.update({"bazz": 2}) 2441 return d 2442 2443 eq_(B.something, {"foo": 1, "bazz": 2}) 2444 2445 2446class TestProperties(fixtures.TestBase): 2447 def test_pickle(self): 2448 data = {"hello": "bla"} 2449 props = util.Properties(data) 2450 2451 for loader, dumper in picklers(): 2452 s = dumper(props) 2453 p = loader(s) 2454 2455 eq_(props._data, p._data) 2456 eq_(props.keys(), p.keys()) 2457 2458 def test_pickle_immuatbleprops(self): 2459 data = {"hello": "bla"} 2460 props = util.Properties(data).as_immutable() 2461 2462 for loader, dumper in picklers(): 2463 s = dumper(props) 2464 p = loader(s) 2465 2466 eq_(props._data, p._data) 2467 eq_(props.keys(), p.keys()) 2468 2469 def test_pickle_orderedprops(self): 2470 data = {"hello": "bla"} 2471 props = util.OrderedProperties() 2472 props.update(data) 2473 2474 for loader, dumper in picklers(): 2475 s = dumper(props) 2476 p = loader(s) 2477 2478 eq_(props._data, p._data) 2479 eq_(props.keys(), p.keys()) 2480 2481 2482class QuotedTokenParserTest(fixtures.TestBase): 2483 def _test(self, string, expected): 2484 eq_(langhelpers.quoted_token_parser(string), expected) 2485 2486 def test_single(self): 2487 self._test("name", ["name"]) 2488 2489 def test_dotted(self): 2490 self._test("schema.name", ["schema", "name"]) 2491 2492 def test_dotted_quoted_left(self): 2493 self._test('"Schema".name', ["Schema", "name"]) 2494 2495 def test_dotted_quoted_left_w_quote_left_edge(self): 2496 self._test('"""Schema".name', ['"Schema', "name"]) 2497 2498 def test_dotted_quoted_left_w_quote_right_edge(self): 2499 self._test('"Schema""".name', ['Schema"', "name"]) 2500 2501 def test_dotted_quoted_left_w_quote_middle(self): 2502 self._test('"Sch""ema".name', ['Sch"ema', "name"]) 2503 2504 def test_dotted_quoted_right(self): 2505 self._test('schema."SomeName"', ["schema", "SomeName"]) 2506 2507 def test_dotted_quoted_right_w_quote_left_edge(self): 2508 self._test('schema."""name"', ["schema", '"name']) 2509 2510 def test_dotted_quoted_right_w_quote_right_edge(self): 2511 self._test('schema."name"""', ["schema", 'name"']) 2512 2513 def test_dotted_quoted_right_w_quote_middle(self): 2514 self._test('schema."na""me"', ["schema", 'na"me']) 2515 2516 def test_quoted_single_w_quote_left_edge(self): 2517 self._test('"""name"', ['"name']) 2518 2519 def test_quoted_single_w_quote_right_edge(self): 2520 self._test('"name"""', ['name"']) 2521 2522 def test_quoted_single_w_quote_middle(self): 2523 self._test('"na""me"', ['na"me']) 2524 2525 def test_dotted_quoted_left_w_dot_left_edge(self): 2526 self._test('".Schema".name', [".Schema", "name"]) 2527 2528 def test_dotted_quoted_left_w_dot_right_edge(self): 2529 self._test('"Schema.".name', ["Schema.", "name"]) 2530 2531 def test_dotted_quoted_left_w_dot_middle(self): 2532 self._test('"Sch.ema".name', ["Sch.ema", "name"]) 2533 2534 def test_dotted_quoted_right_w_dot_left_edge(self): 2535 self._test('schema.".name"', ["schema", ".name"]) 2536 2537 def test_dotted_quoted_right_w_dot_right_edge(self): 2538 self._test('schema."name."', ["schema", "name."]) 2539 2540 def test_dotted_quoted_right_w_dot_middle(self): 2541 self._test('schema."na.me"', ["schema", "na.me"]) 2542 2543 def test_quoted_single_w_dot_left_edge(self): 2544 self._test('".name"', [".name"]) 2545 2546 def test_quoted_single_w_dot_right_edge(self): 2547 self._test('"name."', ["name."]) 2548 2549 def test_quoted_single_w_dot_middle(self): 2550 self._test('"na.me"', ["na.me"]) 2551 2552 2553class BackslashReplaceTest(fixtures.TestBase): 2554 def test_ascii_to_utf8(self): 2555 eq_( 2556 compat.decode_backslashreplace(util.b("hello world"), "utf-8"), 2557 util.u("hello world"), 2558 ) 2559 2560 def test_utf8_to_utf8(self): 2561 eq_( 2562 compat.decode_backslashreplace( 2563 util.u("some message méil").encode("utf-8"), "utf-8" 2564 ), 2565 util.u("some message méil"), 2566 ) 2567 2568 def test_latin1_to_utf8(self): 2569 eq_( 2570 compat.decode_backslashreplace( 2571 util.u("some message méil").encode("latin-1"), "utf-8" 2572 ), 2573 util.u("some message m\\xe9il"), 2574 ) 2575 2576 eq_( 2577 compat.decode_backslashreplace( 2578 util.u("some message méil").encode("latin-1"), "latin-1" 2579 ), 2580 util.u("some message méil"), 2581 ) 2582 2583 def test_cp1251_to_utf8(self): 2584 message = util.u("some message П").encode("cp1251") 2585 eq_(message, b"some message \xcf") 2586 eq_( 2587 compat.decode_backslashreplace(message, "utf-8"), 2588 util.u("some message \\xcf"), 2589 ) 2590 2591 eq_( 2592 compat.decode_backslashreplace(message, "cp1251"), 2593 util.u("some message П"), 2594 ) 2595