1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'FILTER_DIR', 18 'NonCallableMock', 19 'NonCallableMagicMock', 20 'mock_open', 21 'PropertyMock', 22 'seal', 23) 24 25 26__version__ = '1.0' 27 28import asyncio 29import contextlib 30import io 31import inspect 32import pprint 33import sys 34import builtins 35from types import CodeType, ModuleType, MethodType 36from unittest.util import safe_repr 37from functools import wraps, partial 38 39 40_builtins = {name for name in dir(builtins) if not name.startswith('_')} 41 42FILTER_DIR = True 43 44# Workaround for issue #12370 45# Without this, the __class__ properties wouldn't be set correctly 46_safe_super = super 47 48def _is_async_obj(obj): 49 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 50 return False 51 if hasattr(obj, '__func__'): 52 obj = getattr(obj, '__func__') 53 return asyncio.iscoroutinefunction(obj) or inspect.isawaitable(obj) 54 55 56def _is_async_func(func): 57 if getattr(func, '__code__', None): 58 return asyncio.iscoroutinefunction(func) 59 else: 60 return False 61 62 63def _is_instance_mock(obj): 64 # can't use isinstance on Mock objects because they override __class__ 65 # The base class for all mocks is NonCallableMock 66 return issubclass(type(obj), NonCallableMock) 67 68 69def _is_exception(obj): 70 return ( 71 isinstance(obj, BaseException) or 72 isinstance(obj, type) and issubclass(obj, BaseException) 73 ) 74 75 76def _extract_mock(obj): 77 # Autospecced functions will return a FunctionType with "mock" attribute 78 # which is the actual mock object that needs to be used. 79 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 80 return obj.mock 81 else: 82 return obj 83 84 85def _get_signature_object(func, as_instance, eat_self): 86 """ 87 Given an arbitrary, possibly callable object, try to create a suitable 88 signature object. 89 Return a (reduced func, signature) tuple, or None. 90 """ 91 if isinstance(func, type) and not as_instance: 92 # If it's a type and should be modelled as a type, use __init__. 93 func = func.__init__ 94 # Skip the `self` argument in __init__ 95 eat_self = True 96 elif not isinstance(func, FunctionTypes): 97 # If we really want to model an instance of the passed type, 98 # __call__ should be looked up, not __init__. 99 try: 100 func = func.__call__ 101 except AttributeError: 102 return None 103 if eat_self: 104 sig_func = partial(func, None) 105 else: 106 sig_func = func 107 try: 108 return func, inspect.signature(sig_func) 109 except ValueError: 110 # Certain callable types are not supported by inspect.signature() 111 return None 112 113 114def _check_signature(func, mock, skipfirst, instance=False): 115 sig = _get_signature_object(func, instance, skipfirst) 116 if sig is None: 117 return 118 func, sig = sig 119 def checksig(self, /, *args, **kwargs): 120 sig.bind(*args, **kwargs) 121 _copy_func_details(func, checksig) 122 type(mock)._mock_check_sig = checksig 123 type(mock).__signature__ = sig 124 125 126def _copy_func_details(func, funcopy): 127 # we explicitly don't copy func.__dict__ into this copy as it would 128 # expose original attributes that should be mocked 129 for attribute in ( 130 '__name__', '__doc__', '__text_signature__', 131 '__module__', '__defaults__', '__kwdefaults__', 132 ): 133 try: 134 setattr(funcopy, attribute, getattr(func, attribute)) 135 except AttributeError: 136 pass 137 138 139def _callable(obj): 140 if isinstance(obj, type): 141 return True 142 if isinstance(obj, (staticmethod, classmethod, MethodType)): 143 return _callable(obj.__func__) 144 if getattr(obj, '__call__', None) is not None: 145 return True 146 return False 147 148 149def _is_list(obj): 150 # checks for list or tuples 151 # XXXX badly named! 152 return type(obj) in (list, tuple) 153 154 155def _instance_callable(obj): 156 """Given an object, return True if the object is callable. 157 For classes, return True if instances would be callable.""" 158 if not isinstance(obj, type): 159 # already an instance 160 return getattr(obj, '__call__', None) is not None 161 162 # *could* be broken by a class overriding __mro__ or __dict__ via 163 # a metaclass 164 for base in (obj,) + obj.__mro__: 165 if base.__dict__.get('__call__') is not None: 166 return True 167 return False 168 169 170def _set_signature(mock, original, instance=False): 171 # creates a function with signature (*args, **kwargs) that delegates to a 172 # mock. It still does signature checking by calling a lambda with the same 173 # signature as the original. 174 175 skipfirst = isinstance(original, type) 176 result = _get_signature_object(original, instance, skipfirst) 177 if result is None: 178 return mock 179 func, sig = result 180 def checksig(*args, **kwargs): 181 sig.bind(*args, **kwargs) 182 _copy_func_details(func, checksig) 183 184 name = original.__name__ 185 if not name.isidentifier(): 186 name = 'funcopy' 187 context = {'_checksig_': checksig, 'mock': mock} 188 src = """def %s(*args, **kwargs): 189 _checksig_(*args, **kwargs) 190 return mock(*args, **kwargs)""" % name 191 exec (src, context) 192 funcopy = context[name] 193 _setup_func(funcopy, mock, sig) 194 return funcopy 195 196 197def _setup_func(funcopy, mock, sig): 198 funcopy.mock = mock 199 200 def assert_called_with(*args, **kwargs): 201 return mock.assert_called_with(*args, **kwargs) 202 def assert_called(*args, **kwargs): 203 return mock.assert_called(*args, **kwargs) 204 def assert_not_called(*args, **kwargs): 205 return mock.assert_not_called(*args, **kwargs) 206 def assert_called_once(*args, **kwargs): 207 return mock.assert_called_once(*args, **kwargs) 208 def assert_called_once_with(*args, **kwargs): 209 return mock.assert_called_once_with(*args, **kwargs) 210 def assert_has_calls(*args, **kwargs): 211 return mock.assert_has_calls(*args, **kwargs) 212 def assert_any_call(*args, **kwargs): 213 return mock.assert_any_call(*args, **kwargs) 214 def reset_mock(): 215 funcopy.method_calls = _CallList() 216 funcopy.mock_calls = _CallList() 217 mock.reset_mock() 218 ret = funcopy.return_value 219 if _is_instance_mock(ret) and not ret is mock: 220 ret.reset_mock() 221 222 funcopy.called = False 223 funcopy.call_count = 0 224 funcopy.call_args = None 225 funcopy.call_args_list = _CallList() 226 funcopy.method_calls = _CallList() 227 funcopy.mock_calls = _CallList() 228 229 funcopy.return_value = mock.return_value 230 funcopy.side_effect = mock.side_effect 231 funcopy._mock_children = mock._mock_children 232 233 funcopy.assert_called_with = assert_called_with 234 funcopy.assert_called_once_with = assert_called_once_with 235 funcopy.assert_has_calls = assert_has_calls 236 funcopy.assert_any_call = assert_any_call 237 funcopy.reset_mock = reset_mock 238 funcopy.assert_called = assert_called 239 funcopy.assert_not_called = assert_not_called 240 funcopy.assert_called_once = assert_called_once 241 funcopy.__signature__ = sig 242 243 mock._mock_delegate = funcopy 244 245 246def _setup_async_mock(mock): 247 mock._is_coroutine = asyncio.coroutines._is_coroutine 248 mock.await_count = 0 249 mock.await_args = None 250 mock.await_args_list = _CallList() 251 252 # Mock is not configured yet so the attributes are set 253 # to a function and then the corresponding mock helper function 254 # is called when the helper is accessed similar to _setup_func. 255 def wrapper(attr, /, *args, **kwargs): 256 return getattr(mock.mock, attr)(*args, **kwargs) 257 258 for attribute in ('assert_awaited', 259 'assert_awaited_once', 260 'assert_awaited_with', 261 'assert_awaited_once_with', 262 'assert_any_await', 263 'assert_has_awaits', 264 'assert_not_awaited'): 265 266 # setattr(mock, attribute, wrapper) causes late binding 267 # hence attribute will always be the last value in the loop 268 # Use partial(wrapper, attribute) to ensure the attribute is bound 269 # correctly. 270 setattr(mock, attribute, partial(wrapper, attribute)) 271 272 273def _is_magic(name): 274 return '__%s__' % name[2:-2] == name 275 276 277class _SentinelObject(object): 278 "A unique, named, sentinel object." 279 def __init__(self, name): 280 self.name = name 281 282 def __repr__(self): 283 return 'sentinel.%s' % self.name 284 285 def __reduce__(self): 286 return 'sentinel.%s' % self.name 287 288 289class _Sentinel(object): 290 """Access attributes to return a named object, usable as a sentinel.""" 291 def __init__(self): 292 self._sentinels = {} 293 294 def __getattr__(self, name): 295 if name == '__bases__': 296 # Without this help(unittest.mock) raises an exception 297 raise AttributeError 298 return self._sentinels.setdefault(name, _SentinelObject(name)) 299 300 def __reduce__(self): 301 return 'sentinel' 302 303 304sentinel = _Sentinel() 305 306DEFAULT = sentinel.DEFAULT 307_missing = sentinel.MISSING 308_deleted = sentinel.DELETED 309 310 311_allowed_names = { 312 'return_value', '_mock_return_value', 'side_effect', 313 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 314 '_mock_name', '_mock_new_name' 315} 316 317 318def _delegating_property(name): 319 _allowed_names.add(name) 320 _the_name = '_mock_' + name 321 def _get(self, name=name, _the_name=_the_name): 322 sig = self._mock_delegate 323 if sig is None: 324 return getattr(self, _the_name) 325 return getattr(sig, name) 326 def _set(self, value, name=name, _the_name=_the_name): 327 sig = self._mock_delegate 328 if sig is None: 329 self.__dict__[_the_name] = value 330 else: 331 setattr(sig, name, value) 332 333 return property(_get, _set) 334 335 336 337class _CallList(list): 338 339 def __contains__(self, value): 340 if not isinstance(value, list): 341 return list.__contains__(self, value) 342 len_value = len(value) 343 len_self = len(self) 344 if len_value > len_self: 345 return False 346 347 for i in range(0, len_self - len_value + 1): 348 sub_list = self[i:i+len_value] 349 if sub_list == value: 350 return True 351 return False 352 353 def __repr__(self): 354 return pprint.pformat(list(self)) 355 356 357def _check_and_set_parent(parent, value, name, new_name): 358 value = _extract_mock(value) 359 360 if not _is_instance_mock(value): 361 return False 362 if ((value._mock_name or value._mock_new_name) or 363 (value._mock_parent is not None) or 364 (value._mock_new_parent is not None)): 365 return False 366 367 _parent = parent 368 while _parent is not None: 369 # setting a mock (value) as a child or return value of itself 370 # should not modify the mock 371 if _parent is value: 372 return False 373 _parent = _parent._mock_new_parent 374 375 if new_name: 376 value._mock_new_parent = parent 377 value._mock_new_name = new_name 378 if name: 379 value._mock_parent = parent 380 value._mock_name = name 381 return True 382 383# Internal class to identify if we wrapped an iterator object or not. 384class _MockIter(object): 385 def __init__(self, obj): 386 self.obj = iter(obj) 387 def __next__(self): 388 return next(self.obj) 389 390class Base(object): 391 _mock_return_value = DEFAULT 392 _mock_side_effect = None 393 def __init__(self, /, *args, **kwargs): 394 pass 395 396 397 398class NonCallableMock(Base): 399 """A non-callable version of `Mock`""" 400 401 def __new__(cls, /, *args, **kw): 402 # every instance has its own class 403 # so we can create magic methods on the 404 # class without stomping on other mocks 405 bases = (cls,) 406 if not issubclass(cls, AsyncMock): 407 # Check if spec is an async object or function 408 sig = inspect.signature(NonCallableMock.__init__) 409 bound_args = sig.bind_partial(cls, *args, **kw).arguments 410 spec_arg = [ 411 arg for arg in bound_args.keys() 412 if arg.startswith('spec') 413 ] 414 if spec_arg: 415 # what if spec_set is different than spec? 416 if _is_async_obj(bound_args[spec_arg[0]]): 417 bases = (AsyncMockMixin, cls,) 418 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 419 instance = _safe_super(NonCallableMock, cls).__new__(new) 420 return instance 421 422 423 def __init__( 424 self, spec=None, wraps=None, name=None, spec_set=None, 425 parent=None, _spec_state=None, _new_name='', _new_parent=None, 426 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 427 ): 428 if _new_parent is None: 429 _new_parent = parent 430 431 __dict__ = self.__dict__ 432 __dict__['_mock_parent'] = parent 433 __dict__['_mock_name'] = name 434 __dict__['_mock_new_name'] = _new_name 435 __dict__['_mock_new_parent'] = _new_parent 436 __dict__['_mock_sealed'] = False 437 438 if spec_set is not None: 439 spec = spec_set 440 spec_set = True 441 if _eat_self is None: 442 _eat_self = parent is not None 443 444 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 445 446 __dict__['_mock_children'] = {} 447 __dict__['_mock_wraps'] = wraps 448 __dict__['_mock_delegate'] = None 449 450 __dict__['_mock_called'] = False 451 __dict__['_mock_call_args'] = None 452 __dict__['_mock_call_count'] = 0 453 __dict__['_mock_call_args_list'] = _CallList() 454 __dict__['_mock_mock_calls'] = _CallList() 455 456 __dict__['method_calls'] = _CallList() 457 __dict__['_mock_unsafe'] = unsafe 458 459 if kwargs: 460 self.configure_mock(**kwargs) 461 462 _safe_super(NonCallableMock, self).__init__( 463 spec, wraps, name, spec_set, parent, 464 _spec_state 465 ) 466 467 468 def attach_mock(self, mock, attribute): 469 """ 470 Attach a mock as an attribute of this one, replacing its name and 471 parent. Calls to the attached mock will be recorded in the 472 `method_calls` and `mock_calls` attributes of this one.""" 473 inner_mock = _extract_mock(mock) 474 475 inner_mock._mock_parent = None 476 inner_mock._mock_new_parent = None 477 inner_mock._mock_name = '' 478 inner_mock._mock_new_name = None 479 480 setattr(self, attribute, mock) 481 482 483 def mock_add_spec(self, spec, spec_set=False): 484 """Add a spec to a mock. `spec` can either be an object or a 485 list of strings. Only attributes on the `spec` can be fetched as 486 attributes from the mock. 487 488 If `spec_set` is True then only attributes on the spec can be set.""" 489 self._mock_add_spec(spec, spec_set) 490 491 492 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 493 _eat_self=False): 494 _spec_class = None 495 _spec_signature = None 496 _spec_asyncs = [] 497 498 for attr in dir(spec): 499 if asyncio.iscoroutinefunction(getattr(spec, attr, None)): 500 _spec_asyncs.append(attr) 501 502 if spec is not None and not _is_list(spec): 503 if isinstance(spec, type): 504 _spec_class = spec 505 else: 506 _spec_class = type(spec) 507 res = _get_signature_object(spec, 508 _spec_as_instance, _eat_self) 509 _spec_signature = res and res[1] 510 511 spec = dir(spec) 512 513 __dict__ = self.__dict__ 514 __dict__['_spec_class'] = _spec_class 515 __dict__['_spec_set'] = spec_set 516 __dict__['_spec_signature'] = _spec_signature 517 __dict__['_mock_methods'] = spec 518 __dict__['_spec_asyncs'] = _spec_asyncs 519 520 def __get_return_value(self): 521 ret = self._mock_return_value 522 if self._mock_delegate is not None: 523 ret = self._mock_delegate.return_value 524 525 if ret is DEFAULT: 526 ret = self._get_child_mock( 527 _new_parent=self, _new_name='()' 528 ) 529 self.return_value = ret 530 return ret 531 532 533 def __set_return_value(self, value): 534 if self._mock_delegate is not None: 535 self._mock_delegate.return_value = value 536 else: 537 self._mock_return_value = value 538 _check_and_set_parent(self, value, None, '()') 539 540 __return_value_doc = "The value to be returned when the mock is called." 541 return_value = property(__get_return_value, __set_return_value, 542 __return_value_doc) 543 544 545 @property 546 def __class__(self): 547 if self._spec_class is None: 548 return type(self) 549 return self._spec_class 550 551 called = _delegating_property('called') 552 call_count = _delegating_property('call_count') 553 call_args = _delegating_property('call_args') 554 call_args_list = _delegating_property('call_args_list') 555 mock_calls = _delegating_property('mock_calls') 556 557 558 def __get_side_effect(self): 559 delegated = self._mock_delegate 560 if delegated is None: 561 return self._mock_side_effect 562 sf = delegated.side_effect 563 if (sf is not None and not callable(sf) 564 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 565 sf = _MockIter(sf) 566 delegated.side_effect = sf 567 return sf 568 569 def __set_side_effect(self, value): 570 value = _try_iter(value) 571 delegated = self._mock_delegate 572 if delegated is None: 573 self._mock_side_effect = value 574 else: 575 delegated.side_effect = value 576 577 side_effect = property(__get_side_effect, __set_side_effect) 578 579 580 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 581 "Restore the mock object to its initial state." 582 if visited is None: 583 visited = [] 584 if id(self) in visited: 585 return 586 visited.append(id(self)) 587 588 self.called = False 589 self.call_args = None 590 self.call_count = 0 591 self.mock_calls = _CallList() 592 self.call_args_list = _CallList() 593 self.method_calls = _CallList() 594 595 if return_value: 596 self._mock_return_value = DEFAULT 597 if side_effect: 598 self._mock_side_effect = None 599 600 for child in self._mock_children.values(): 601 if isinstance(child, _SpecState) or child is _deleted: 602 continue 603 child.reset_mock(visited) 604 605 ret = self._mock_return_value 606 if _is_instance_mock(ret) and ret is not self: 607 ret.reset_mock(visited) 608 609 610 def configure_mock(self, /, **kwargs): 611 """Set attributes on the mock through keyword arguments. 612 613 Attributes plus return values and side effects can be set on child 614 mocks using standard dot notation and unpacking a dictionary in the 615 method call: 616 617 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 618 >>> mock.configure_mock(**attrs)""" 619 for arg, val in sorted(kwargs.items(), 620 # we sort on the number of dots so that 621 # attributes are set before we set attributes on 622 # attributes 623 key=lambda entry: entry[0].count('.')): 624 args = arg.split('.') 625 final = args.pop() 626 obj = self 627 for entry in args: 628 obj = getattr(obj, entry) 629 setattr(obj, final, val) 630 631 632 def __getattr__(self, name): 633 if name in {'_mock_methods', '_mock_unsafe'}: 634 raise AttributeError(name) 635 elif self._mock_methods is not None: 636 if name not in self._mock_methods or name in _all_magics: 637 raise AttributeError("Mock object has no attribute %r" % name) 638 elif _is_magic(name): 639 raise AttributeError(name) 640 if not self._mock_unsafe: 641 if name.startswith(('assert', 'assret')): 642 raise AttributeError("Attributes cannot start with 'assert' " 643 "or 'assret'") 644 645 result = self._mock_children.get(name) 646 if result is _deleted: 647 raise AttributeError(name) 648 elif result is None: 649 wraps = None 650 if self._mock_wraps is not None: 651 # XXXX should we get the attribute without triggering code 652 # execution? 653 wraps = getattr(self._mock_wraps, name) 654 655 result = self._get_child_mock( 656 parent=self, name=name, wraps=wraps, _new_name=name, 657 _new_parent=self 658 ) 659 self._mock_children[name] = result 660 661 elif isinstance(result, _SpecState): 662 result = create_autospec( 663 result.spec, result.spec_set, result.instance, 664 result.parent, result.name 665 ) 666 self._mock_children[name] = result 667 668 return result 669 670 671 def _extract_mock_name(self): 672 _name_list = [self._mock_new_name] 673 _parent = self._mock_new_parent 674 last = self 675 676 dot = '.' 677 if _name_list == ['()']: 678 dot = '' 679 680 while _parent is not None: 681 last = _parent 682 683 _name_list.append(_parent._mock_new_name + dot) 684 dot = '.' 685 if _parent._mock_new_name == '()': 686 dot = '' 687 688 _parent = _parent._mock_new_parent 689 690 _name_list = list(reversed(_name_list)) 691 _first = last._mock_name or 'mock' 692 if len(_name_list) > 1: 693 if _name_list[1] not in ('()', '().'): 694 _first += '.' 695 _name_list[0] = _first 696 return ''.join(_name_list) 697 698 def __repr__(self): 699 name = self._extract_mock_name() 700 701 name_string = '' 702 if name not in ('mock', 'mock.'): 703 name_string = ' name=%r' % name 704 705 spec_string = '' 706 if self._spec_class is not None: 707 spec_string = ' spec=%r' 708 if self._spec_set: 709 spec_string = ' spec_set=%r' 710 spec_string = spec_string % self._spec_class.__name__ 711 return "<%s%s%s id='%s'>" % ( 712 type(self).__name__, 713 name_string, 714 spec_string, 715 id(self) 716 ) 717 718 719 def __dir__(self): 720 """Filter the output of `dir(mock)` to only useful members.""" 721 if not FILTER_DIR: 722 return object.__dir__(self) 723 724 extras = self._mock_methods or [] 725 from_type = dir(type(self)) 726 from_dict = list(self.__dict__) 727 from_child_mocks = [ 728 m_name for m_name, m_value in self._mock_children.items() 729 if m_value is not _deleted] 730 731 from_type = [e for e in from_type if not e.startswith('_')] 732 from_dict = [e for e in from_dict if not e.startswith('_') or 733 _is_magic(e)] 734 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 735 736 737 def __setattr__(self, name, value): 738 if name in _allowed_names: 739 # property setters go through here 740 return object.__setattr__(self, name, value) 741 elif (self._spec_set and self._mock_methods is not None and 742 name not in self._mock_methods and 743 name not in self.__dict__): 744 raise AttributeError("Mock object has no attribute '%s'" % name) 745 elif name in _unsupported_magics: 746 msg = 'Attempting to set unsupported magic method %r.' % name 747 raise AttributeError(msg) 748 elif name in _all_magics: 749 if self._mock_methods is not None and name not in self._mock_methods: 750 raise AttributeError("Mock object has no attribute '%s'" % name) 751 752 if not _is_instance_mock(value): 753 setattr(type(self), name, _get_method(name, value)) 754 original = value 755 value = lambda *args, **kw: original(self, *args, **kw) 756 else: 757 # only set _new_name and not name so that mock_calls is tracked 758 # but not method calls 759 _check_and_set_parent(self, value, None, name) 760 setattr(type(self), name, value) 761 self._mock_children[name] = value 762 elif name == '__class__': 763 self._spec_class = value 764 return 765 else: 766 if _check_and_set_parent(self, value, name, name): 767 self._mock_children[name] = value 768 769 if self._mock_sealed and not hasattr(self, name): 770 mock_name = f'{self._extract_mock_name()}.{name}' 771 raise AttributeError(f'Cannot set {mock_name}') 772 773 return object.__setattr__(self, name, value) 774 775 776 def __delattr__(self, name): 777 if name in _all_magics and name in type(self).__dict__: 778 delattr(type(self), name) 779 if name not in self.__dict__: 780 # for magic methods that are still MagicProxy objects and 781 # not set on the instance itself 782 return 783 784 obj = self._mock_children.get(name, _missing) 785 if name in self.__dict__: 786 _safe_super(NonCallableMock, self).__delattr__(name) 787 elif obj is _deleted: 788 raise AttributeError(name) 789 if obj is not _missing: 790 del self._mock_children[name] 791 self._mock_children[name] = _deleted 792 793 794 def _format_mock_call_signature(self, args, kwargs): 795 name = self._mock_name or 'mock' 796 return _format_call_signature(name, args, kwargs) 797 798 799 def _format_mock_failure_message(self, args, kwargs, action='call'): 800 message = 'expected %s not found.\nExpected: %s\nActual: %s' 801 expected_string = self._format_mock_call_signature(args, kwargs) 802 call_args = self.call_args 803 actual_string = self._format_mock_call_signature(*call_args) 804 return message % (action, expected_string, actual_string) 805 806 807 def _get_call_signature_from_name(self, name): 808 """ 809 * If call objects are asserted against a method/function like obj.meth1 810 then there could be no name for the call object to lookup. Hence just 811 return the spec_signature of the method/function being asserted against. 812 * If the name is not empty then remove () and split by '.' to get 813 list of names to iterate through the children until a potential 814 match is found. A child mock is created only during attribute access 815 so if we get a _SpecState then no attributes of the spec were accessed 816 and can be safely exited. 817 """ 818 if not name: 819 return self._spec_signature 820 821 sig = None 822 names = name.replace('()', '').split('.') 823 children = self._mock_children 824 825 for name in names: 826 child = children.get(name) 827 if child is None or isinstance(child, _SpecState): 828 break 829 else: 830 # If an autospecced object is attached using attach_mock the 831 # child would be a function with mock object as attribute from 832 # which signature has to be derived. 833 child = _extract_mock(child) 834 children = child._mock_children 835 sig = child._spec_signature 836 837 return sig 838 839 840 def _call_matcher(self, _call): 841 """ 842 Given a call (or simply an (args, kwargs) tuple), return a 843 comparison key suitable for matching with other calls. 844 This is a best effort method which relies on the spec's signature, 845 if available, or falls back on the arguments themselves. 846 """ 847 848 if isinstance(_call, tuple) and len(_call) > 2: 849 sig = self._get_call_signature_from_name(_call[0]) 850 else: 851 sig = self._spec_signature 852 853 if sig is not None: 854 if len(_call) == 2: 855 name = '' 856 args, kwargs = _call 857 else: 858 name, args, kwargs = _call 859 try: 860 return name, sig.bind(*args, **kwargs) 861 except TypeError as e: 862 return e.with_traceback(None) 863 else: 864 return _call 865 866 def assert_not_called(self): 867 """assert that the mock was never called. 868 """ 869 if self.call_count != 0: 870 msg = ("Expected '%s' to not have been called. Called %s times.%s" 871 % (self._mock_name or 'mock', 872 self.call_count, 873 self._calls_repr())) 874 raise AssertionError(msg) 875 876 def assert_called(self): 877 """assert that the mock was called at least once 878 """ 879 if self.call_count == 0: 880 msg = ("Expected '%s' to have been called." % 881 (self._mock_name or 'mock')) 882 raise AssertionError(msg) 883 884 def assert_called_once(self): 885 """assert that the mock was called only once. 886 """ 887 if not self.call_count == 1: 888 msg = ("Expected '%s' to have been called once. Called %s times.%s" 889 % (self._mock_name or 'mock', 890 self.call_count, 891 self._calls_repr())) 892 raise AssertionError(msg) 893 894 def assert_called_with(self, /, *args, **kwargs): 895 """assert that the last call was made with the specified arguments. 896 897 Raises an AssertionError if the args and keyword args passed in are 898 different to the last call to the mock.""" 899 if self.call_args is None: 900 expected = self._format_mock_call_signature(args, kwargs) 901 actual = 'not called.' 902 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 903 % (expected, actual)) 904 raise AssertionError(error_message) 905 906 def _error_message(): 907 msg = self._format_mock_failure_message(args, kwargs) 908 return msg 909 expected = self._call_matcher((args, kwargs)) 910 actual = self._call_matcher(self.call_args) 911 if expected != actual: 912 cause = expected if isinstance(expected, Exception) else None 913 raise AssertionError(_error_message()) from cause 914 915 916 def assert_called_once_with(self, /, *args, **kwargs): 917 """assert that the mock was called exactly once and that that call was 918 with the specified arguments.""" 919 if not self.call_count == 1: 920 msg = ("Expected '%s' to be called once. Called %s times.%s" 921 % (self._mock_name or 'mock', 922 self.call_count, 923 self._calls_repr())) 924 raise AssertionError(msg) 925 return self.assert_called_with(*args, **kwargs) 926 927 928 def assert_has_calls(self, calls, any_order=False): 929 """assert the mock has been called with the specified calls. 930 The `mock_calls` list is checked for the calls. 931 932 If `any_order` is False (the default) then the calls must be 933 sequential. There can be extra calls before or after the 934 specified calls. 935 936 If `any_order` is True then the calls can be in any order, but 937 they must all appear in `mock_calls`.""" 938 expected = [self._call_matcher(c) for c in calls] 939 cause = next((e for e in expected if isinstance(e, Exception)), None) 940 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 941 if not any_order: 942 if expected not in all_calls: 943 if cause is None: 944 problem = 'Calls not found.' 945 else: 946 problem = ('Error processing expected calls.\n' 947 'Errors: {}').format( 948 [e if isinstance(e, Exception) else None 949 for e in expected]) 950 raise AssertionError( 951 f'{problem}\n' 952 f'Expected: {_CallList(calls)}' 953 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 954 ) from cause 955 return 956 957 all_calls = list(all_calls) 958 959 not_found = [] 960 for kall in expected: 961 try: 962 all_calls.remove(kall) 963 except ValueError: 964 not_found.append(kall) 965 if not_found: 966 raise AssertionError( 967 '%r does not contain all of %r in its call list, ' 968 'found %r instead' % (self._mock_name or 'mock', 969 tuple(not_found), all_calls) 970 ) from cause 971 972 973 def assert_any_call(self, /, *args, **kwargs): 974 """assert the mock has been called with the specified arguments. 975 976 The assert passes if the mock has *ever* been called, unlike 977 `assert_called_with` and `assert_called_once_with` that only pass if 978 the call is the most recent one.""" 979 expected = self._call_matcher((args, kwargs)) 980 actual = [self._call_matcher(c) for c in self.call_args_list] 981 if expected not in actual: 982 cause = expected if isinstance(expected, Exception) else None 983 expected_string = self._format_mock_call_signature(args, kwargs) 984 raise AssertionError( 985 '%s call not found' % expected_string 986 ) from cause 987 988 989 def _get_child_mock(self, /, **kw): 990 """Create the child mocks for attributes and return value. 991 By default child mocks will be the same type as the parent. 992 Subclasses of Mock may want to override this to customize the way 993 child mocks are made. 994 995 For non-callable mocks the callable variant will be used (rather than 996 any custom subclass).""" 997 _new_name = kw.get("_new_name") 998 if _new_name in self.__dict__['_spec_asyncs']: 999 return AsyncMock(**kw) 1000 1001 _type = type(self) 1002 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 1003 # Any asynchronous magic becomes an AsyncMock 1004 klass = AsyncMock 1005 elif issubclass(_type, AsyncMockMixin): 1006 if (_new_name in _all_sync_magics or 1007 self._mock_methods and _new_name in self._mock_methods): 1008 # Any synchronous method on AsyncMock becomes a MagicMock 1009 klass = MagicMock 1010 else: 1011 klass = AsyncMock 1012 elif not issubclass(_type, CallableMixin): 1013 if issubclass(_type, NonCallableMagicMock): 1014 klass = MagicMock 1015 elif issubclass(_type, NonCallableMock): 1016 klass = Mock 1017 else: 1018 klass = _type.__mro__[1] 1019 1020 if self._mock_sealed: 1021 attribute = "." + kw["name"] if "name" in kw else "()" 1022 mock_name = self._extract_mock_name() + attribute 1023 raise AttributeError(mock_name) 1024 1025 return klass(**kw) 1026 1027 1028 def _calls_repr(self, prefix="Calls"): 1029 """Renders self.mock_calls as a string. 1030 1031 Example: "\nCalls: [call(1), call(2)]." 1032 1033 If self.mock_calls is empty, an empty string is returned. The 1034 output will be truncated if very long. 1035 """ 1036 if not self.mock_calls: 1037 return "" 1038 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 1039 1040 1041 1042def _try_iter(obj): 1043 if obj is None: 1044 return obj 1045 if _is_exception(obj): 1046 return obj 1047 if _callable(obj): 1048 return obj 1049 try: 1050 return iter(obj) 1051 except TypeError: 1052 # XXXX backwards compatibility 1053 # but this will blow up on first call - so maybe we should fail early? 1054 return obj 1055 1056 1057class CallableMixin(Base): 1058 1059 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1060 wraps=None, name=None, spec_set=None, parent=None, 1061 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1062 self.__dict__['_mock_return_value'] = return_value 1063 _safe_super(CallableMixin, self).__init__( 1064 spec, wraps, name, spec_set, parent, 1065 _spec_state, _new_name, _new_parent, **kwargs 1066 ) 1067 1068 self.side_effect = side_effect 1069 1070 1071 def _mock_check_sig(self, /, *args, **kwargs): 1072 # stub method that can be replaced with one with a specific signature 1073 pass 1074 1075 1076 def __call__(self, /, *args, **kwargs): 1077 # can't use self in-case a function / method we are mocking uses self 1078 # in the signature 1079 self._mock_check_sig(*args, **kwargs) 1080 self._increment_mock_call(*args, **kwargs) 1081 return self._mock_call(*args, **kwargs) 1082 1083 1084 def _mock_call(self, /, *args, **kwargs): 1085 return self._execute_mock_call(*args, **kwargs) 1086 1087 def _increment_mock_call(self, /, *args, **kwargs): 1088 self.called = True 1089 self.call_count += 1 1090 1091 # handle call_args 1092 # needs to be set here so assertions on call arguments pass before 1093 # execution in the case of awaited calls 1094 _call = _Call((args, kwargs), two=True) 1095 self.call_args = _call 1096 self.call_args_list.append(_call) 1097 1098 # initial stuff for method_calls: 1099 do_method_calls = self._mock_parent is not None 1100 method_call_name = self._mock_name 1101 1102 # initial stuff for mock_calls: 1103 mock_call_name = self._mock_new_name 1104 is_a_call = mock_call_name == '()' 1105 self.mock_calls.append(_Call(('', args, kwargs))) 1106 1107 # follow up the chain of mocks: 1108 _new_parent = self._mock_new_parent 1109 while _new_parent is not None: 1110 1111 # handle method_calls: 1112 if do_method_calls: 1113 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1114 do_method_calls = _new_parent._mock_parent is not None 1115 if do_method_calls: 1116 method_call_name = _new_parent._mock_name + '.' + method_call_name 1117 1118 # handle mock_calls: 1119 this_mock_call = _Call((mock_call_name, args, kwargs)) 1120 _new_parent.mock_calls.append(this_mock_call) 1121 1122 if _new_parent._mock_new_name: 1123 if is_a_call: 1124 dot = '' 1125 else: 1126 dot = '.' 1127 is_a_call = _new_parent._mock_new_name == '()' 1128 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1129 1130 # follow the parental chain: 1131 _new_parent = _new_parent._mock_new_parent 1132 1133 def _execute_mock_call(self, /, *args, **kwargs): 1134 # separate from _increment_mock_call so that awaited functions are 1135 # executed separately from their call, also AsyncMock overrides this method 1136 1137 effect = self.side_effect 1138 if effect is not None: 1139 if _is_exception(effect): 1140 raise effect 1141 elif not _callable(effect): 1142 result = next(effect) 1143 if _is_exception(result): 1144 raise result 1145 else: 1146 result = effect(*args, **kwargs) 1147 1148 if result is not DEFAULT: 1149 return result 1150 1151 if self._mock_return_value is not DEFAULT: 1152 return self.return_value 1153 1154 if self._mock_wraps is not None: 1155 return self._mock_wraps(*args, **kwargs) 1156 1157 return self.return_value 1158 1159 1160 1161class Mock(CallableMixin, NonCallableMock): 1162 """ 1163 Create a new `Mock` object. `Mock` takes several optional arguments 1164 that specify the behaviour of the Mock object: 1165 1166 * `spec`: This can be either a list of strings or an existing object (a 1167 class or instance) that acts as the specification for the mock object. If 1168 you pass in an object then a list of strings is formed by calling dir on 1169 the object (excluding unsupported magic attributes and methods). Accessing 1170 any attribute not in this list will raise an `AttributeError`. 1171 1172 If `spec` is an object (rather than a list of strings) then 1173 `mock.__class__` returns the class of the spec object. This allows mocks 1174 to pass `isinstance` tests. 1175 1176 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1177 or get an attribute on the mock that isn't on the object passed as 1178 `spec_set` will raise an `AttributeError`. 1179 1180 * `side_effect`: A function to be called whenever the Mock is called. See 1181 the `side_effect` attribute. Useful for raising exceptions or 1182 dynamically changing return values. The function is called with the same 1183 arguments as the mock, and unless it returns `DEFAULT`, the return 1184 value of this function is used as the return value. 1185 1186 If `side_effect` is an iterable then each call to the mock will return 1187 the next value from the iterable. If any of the members of the iterable 1188 are exceptions they will be raised instead of returned. 1189 1190 * `return_value`: The value returned when the mock is called. By default 1191 this is a new Mock (created on first access). See the 1192 `return_value` attribute. 1193 1194 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1195 calling the Mock will pass the call through to the wrapped object 1196 (returning the real result). Attribute access on the mock will return a 1197 Mock object that wraps the corresponding attribute of the wrapped object 1198 (so attempting to access an attribute that doesn't exist will raise an 1199 `AttributeError`). 1200 1201 If the mock has an explicit `return_value` set then calls are not passed 1202 to the wrapped object and the `return_value` is returned instead. 1203 1204 * `name`: If the mock has a name then it will be used in the repr of the 1205 mock. This can be useful for debugging. The name is propagated to child 1206 mocks. 1207 1208 Mocks can also be called with arbitrary keyword arguments. These will be 1209 used to set attributes on the mock after it is created. 1210 """ 1211 1212 1213def _dot_lookup(thing, comp, import_path): 1214 try: 1215 return getattr(thing, comp) 1216 except AttributeError: 1217 __import__(import_path) 1218 return getattr(thing, comp) 1219 1220 1221def _importer(target): 1222 components = target.split('.') 1223 import_path = components.pop(0) 1224 thing = __import__(import_path) 1225 1226 for comp in components: 1227 import_path += ".%s" % comp 1228 thing = _dot_lookup(thing, comp, import_path) 1229 return thing 1230 1231 1232class _patch(object): 1233 1234 attribute_name = None 1235 _active_patches = [] 1236 1237 def __init__( 1238 self, getter, attribute, new, spec, create, 1239 spec_set, autospec, new_callable, kwargs 1240 ): 1241 if new_callable is not None: 1242 if new is not DEFAULT: 1243 raise ValueError( 1244 "Cannot use 'new' and 'new_callable' together" 1245 ) 1246 if autospec is not None: 1247 raise ValueError( 1248 "Cannot use 'autospec' and 'new_callable' together" 1249 ) 1250 1251 self.getter = getter 1252 self.attribute = attribute 1253 self.new = new 1254 self.new_callable = new_callable 1255 self.spec = spec 1256 self.create = create 1257 self.has_local = False 1258 self.spec_set = spec_set 1259 self.autospec = autospec 1260 self.kwargs = kwargs 1261 self.additional_patchers = [] 1262 1263 1264 def copy(self): 1265 patcher = _patch( 1266 self.getter, self.attribute, self.new, self.spec, 1267 self.create, self.spec_set, 1268 self.autospec, self.new_callable, self.kwargs 1269 ) 1270 patcher.attribute_name = self.attribute_name 1271 patcher.additional_patchers = [ 1272 p.copy() for p in self.additional_patchers 1273 ] 1274 return patcher 1275 1276 1277 def __call__(self, func): 1278 if isinstance(func, type): 1279 return self.decorate_class(func) 1280 if inspect.iscoroutinefunction(func): 1281 return self.decorate_async_callable(func) 1282 return self.decorate_callable(func) 1283 1284 1285 def decorate_class(self, klass): 1286 for attr in dir(klass): 1287 if not attr.startswith(patch.TEST_PREFIX): 1288 continue 1289 1290 attr_value = getattr(klass, attr) 1291 if not hasattr(attr_value, "__call__"): 1292 continue 1293 1294 patcher = self.copy() 1295 setattr(klass, attr, patcher(attr_value)) 1296 return klass 1297 1298 1299 @contextlib.contextmanager 1300 def decoration_helper(self, patched, args, keywargs): 1301 extra_args = [] 1302 with contextlib.ExitStack() as exit_stack: 1303 for patching in patched.patchings: 1304 arg = exit_stack.enter_context(patching) 1305 if patching.attribute_name is not None: 1306 keywargs.update(arg) 1307 elif patching.new is DEFAULT: 1308 extra_args.append(arg) 1309 1310 args += tuple(extra_args) 1311 yield (args, keywargs) 1312 1313 1314 def decorate_callable(self, func): 1315 # NB. Keep the method in sync with decorate_async_callable() 1316 if hasattr(func, 'patchings'): 1317 func.patchings.append(self) 1318 return func 1319 1320 @wraps(func) 1321 def patched(*args, **keywargs): 1322 with self.decoration_helper(patched, 1323 args, 1324 keywargs) as (newargs, newkeywargs): 1325 return func(*newargs, **newkeywargs) 1326 1327 patched.patchings = [self] 1328 return patched 1329 1330 1331 def decorate_async_callable(self, func): 1332 # NB. Keep the method in sync with decorate_callable() 1333 if hasattr(func, 'patchings'): 1334 func.patchings.append(self) 1335 return func 1336 1337 @wraps(func) 1338 async def patched(*args, **keywargs): 1339 with self.decoration_helper(patched, 1340 args, 1341 keywargs) as (newargs, newkeywargs): 1342 return await func(*newargs, **newkeywargs) 1343 1344 patched.patchings = [self] 1345 return patched 1346 1347 1348 def get_original(self): 1349 target = self.getter() 1350 name = self.attribute 1351 1352 original = DEFAULT 1353 local = False 1354 1355 try: 1356 original = target.__dict__[name] 1357 except (AttributeError, KeyError): 1358 original = getattr(target, name, DEFAULT) 1359 else: 1360 local = True 1361 1362 if name in _builtins and isinstance(target, ModuleType): 1363 self.create = True 1364 1365 if not self.create and original is DEFAULT: 1366 raise AttributeError( 1367 "%s does not have the attribute %r" % (target, name) 1368 ) 1369 return original, local 1370 1371 1372 def __enter__(self): 1373 """Perform the patch.""" 1374 new, spec, spec_set = self.new, self.spec, self.spec_set 1375 autospec, kwargs = self.autospec, self.kwargs 1376 new_callable = self.new_callable 1377 self.target = self.getter() 1378 1379 # normalise False to None 1380 if spec is False: 1381 spec = None 1382 if spec_set is False: 1383 spec_set = None 1384 if autospec is False: 1385 autospec = None 1386 1387 if spec is not None and autospec is not None: 1388 raise TypeError("Can't specify spec and autospec") 1389 if ((spec is not None or autospec is not None) and 1390 spec_set not in (True, None)): 1391 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1392 1393 original, local = self.get_original() 1394 1395 if new is DEFAULT and autospec is None: 1396 inherit = False 1397 if spec is True: 1398 # set spec to the object we are replacing 1399 spec = original 1400 if spec_set is True: 1401 spec_set = original 1402 spec = None 1403 elif spec is not None: 1404 if spec_set is True: 1405 spec_set = spec 1406 spec = None 1407 elif spec_set is True: 1408 spec_set = original 1409 1410 if spec is not None or spec_set is not None: 1411 if original is DEFAULT: 1412 raise TypeError("Can't use 'spec' with create=True") 1413 if isinstance(original, type): 1414 # If we're patching out a class and there is a spec 1415 inherit = True 1416 if spec is None and _is_async_obj(original): 1417 Klass = AsyncMock 1418 else: 1419 Klass = MagicMock 1420 _kwargs = {} 1421 if new_callable is not None: 1422 Klass = new_callable 1423 elif spec is not None or spec_set is not None: 1424 this_spec = spec 1425 if spec_set is not None: 1426 this_spec = spec_set 1427 if _is_list(this_spec): 1428 not_callable = '__call__' not in this_spec 1429 else: 1430 not_callable = not callable(this_spec) 1431 if _is_async_obj(this_spec): 1432 Klass = AsyncMock 1433 elif not_callable: 1434 Klass = NonCallableMagicMock 1435 1436 if spec is not None: 1437 _kwargs['spec'] = spec 1438 if spec_set is not None: 1439 _kwargs['spec_set'] = spec_set 1440 1441 # add a name to mocks 1442 if (isinstance(Klass, type) and 1443 issubclass(Klass, NonCallableMock) and self.attribute): 1444 _kwargs['name'] = self.attribute 1445 1446 _kwargs.update(kwargs) 1447 new = Klass(**_kwargs) 1448 1449 if inherit and _is_instance_mock(new): 1450 # we can only tell if the instance should be callable if the 1451 # spec is not a list 1452 this_spec = spec 1453 if spec_set is not None: 1454 this_spec = spec_set 1455 if (not _is_list(this_spec) and not 1456 _instance_callable(this_spec)): 1457 Klass = NonCallableMagicMock 1458 1459 _kwargs.pop('name') 1460 new.return_value = Klass(_new_parent=new, _new_name='()', 1461 **_kwargs) 1462 elif autospec is not None: 1463 # spec is ignored, new *must* be default, spec_set is treated 1464 # as a boolean. Should we check spec is not None and that spec_set 1465 # is a bool? 1466 if new is not DEFAULT: 1467 raise TypeError( 1468 "autospec creates the mock for you. Can't specify " 1469 "autospec and new." 1470 ) 1471 if original is DEFAULT: 1472 raise TypeError("Can't use 'autospec' with create=True") 1473 spec_set = bool(spec_set) 1474 if autospec is True: 1475 autospec = original 1476 1477 new = create_autospec(autospec, spec_set=spec_set, 1478 _name=self.attribute, **kwargs) 1479 elif kwargs: 1480 # can't set keyword args when we aren't creating the mock 1481 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1482 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1483 1484 new_attr = new 1485 1486 self.temp_original = original 1487 self.is_local = local 1488 self._exit_stack = contextlib.ExitStack() 1489 try: 1490 setattr(self.target, self.attribute, new_attr) 1491 if self.attribute_name is not None: 1492 extra_args = {} 1493 if self.new is DEFAULT: 1494 extra_args[self.attribute_name] = new 1495 for patching in self.additional_patchers: 1496 arg = self._exit_stack.enter_context(patching) 1497 if patching.new is DEFAULT: 1498 extra_args.update(arg) 1499 return extra_args 1500 1501 return new 1502 except: 1503 if not self.__exit__(*sys.exc_info()): 1504 raise 1505 1506 def __exit__(self, *exc_info): 1507 """Undo the patch.""" 1508 if self.is_local and self.temp_original is not DEFAULT: 1509 setattr(self.target, self.attribute, self.temp_original) 1510 else: 1511 delattr(self.target, self.attribute) 1512 if not self.create and (not hasattr(self.target, self.attribute) or 1513 self.attribute in ('__doc__', '__module__', 1514 '__defaults__', '__annotations__', 1515 '__kwdefaults__')): 1516 # needed for proxy objects like django settings 1517 setattr(self.target, self.attribute, self.temp_original) 1518 1519 del self.temp_original 1520 del self.is_local 1521 del self.target 1522 exit_stack = self._exit_stack 1523 del self._exit_stack 1524 return exit_stack.__exit__(*exc_info) 1525 1526 1527 def start(self): 1528 """Activate a patch, returning any created mock.""" 1529 result = self.__enter__() 1530 self._active_patches.append(self) 1531 return result 1532 1533 1534 def stop(self): 1535 """Stop an active patch.""" 1536 try: 1537 self._active_patches.remove(self) 1538 except ValueError: 1539 # If the patch hasn't been started this will fail 1540 return None 1541 1542 return self.__exit__(None, None, None) 1543 1544 1545 1546def _get_target(target): 1547 try: 1548 target, attribute = target.rsplit('.', 1) 1549 except (TypeError, ValueError): 1550 raise TypeError("Need a valid target to patch. You supplied: %r" % 1551 (target,)) 1552 getter = lambda: _importer(target) 1553 return getter, attribute 1554 1555 1556def _patch_object( 1557 target, attribute, new=DEFAULT, spec=None, 1558 create=False, spec_set=None, autospec=None, 1559 new_callable=None, **kwargs 1560 ): 1561 """ 1562 patch the named member (`attribute`) on an object (`target`) with a mock 1563 object. 1564 1565 `patch.object` can be used as a decorator, class decorator or a context 1566 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1567 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1568 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1569 the mock object it creates. 1570 1571 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1572 for choosing which methods to wrap. 1573 """ 1574 if type(target) is str: 1575 raise TypeError( 1576 f"{target!r} must be the actual object to be patched, not a str" 1577 ) 1578 getter = lambda: target 1579 return _patch( 1580 getter, attribute, new, spec, create, 1581 spec_set, autospec, new_callable, kwargs 1582 ) 1583 1584 1585def _patch_multiple(target, spec=None, create=False, spec_set=None, 1586 autospec=None, new_callable=None, **kwargs): 1587 """Perform multiple patches in a single call. It takes the object to be 1588 patched (either as an object or a string to fetch the object by importing) 1589 and keyword arguments for the patches:: 1590 1591 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1592 ... 1593 1594 Use `DEFAULT` as the value if you want `patch.multiple` to create 1595 mocks for you. In this case the created mocks are passed into a decorated 1596 function by keyword, and a dictionary is returned when `patch.multiple` is 1597 used as a context manager. 1598 1599 `patch.multiple` can be used as a decorator, class decorator or a context 1600 manager. The arguments `spec`, `spec_set`, `create`, 1601 `autospec` and `new_callable` have the same meaning as for `patch`. These 1602 arguments will be applied to *all* patches done by `patch.multiple`. 1603 1604 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1605 for choosing which methods to wrap. 1606 """ 1607 if type(target) is str: 1608 getter = lambda: _importer(target) 1609 else: 1610 getter = lambda: target 1611 1612 if not kwargs: 1613 raise ValueError( 1614 'Must supply at least one keyword argument with patch.multiple' 1615 ) 1616 # need to wrap in a list for python 3, where items is a view 1617 items = list(kwargs.items()) 1618 attribute, new = items[0] 1619 patcher = _patch( 1620 getter, attribute, new, spec, create, spec_set, 1621 autospec, new_callable, {} 1622 ) 1623 patcher.attribute_name = attribute 1624 for attribute, new in items[1:]: 1625 this_patcher = _patch( 1626 getter, attribute, new, spec, create, spec_set, 1627 autospec, new_callable, {} 1628 ) 1629 this_patcher.attribute_name = attribute 1630 patcher.additional_patchers.append(this_patcher) 1631 return patcher 1632 1633 1634def patch( 1635 target, new=DEFAULT, spec=None, create=False, 1636 spec_set=None, autospec=None, new_callable=None, **kwargs 1637 ): 1638 """ 1639 `patch` acts as a function decorator, class decorator or a context 1640 manager. Inside the body of the function or with statement, the `target` 1641 is patched with a `new` object. When the function/with statement exits 1642 the patch is undone. 1643 1644 If `new` is omitted, then the target is replaced with an 1645 `AsyncMock if the patched object is an async function or a 1646 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1647 omitted, the created mock is passed in as an extra argument to the 1648 decorated function. If `patch` is used as a context manager the created 1649 mock is returned by the context manager. 1650 1651 `target` should be a string in the form `'package.module.ClassName'`. The 1652 `target` is imported and the specified object replaced with the `new` 1653 object, so the `target` must be importable from the environment you are 1654 calling `patch` from. The target is imported when the decorated function 1655 is executed, not at decoration time. 1656 1657 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1658 if patch is creating one for you. 1659 1660 In addition you can pass `spec=True` or `spec_set=True`, which causes 1661 patch to pass in the object being mocked as the spec/spec_set object. 1662 1663 `new_callable` allows you to specify a different class, or callable object, 1664 that will be called to create the `new` object. By default `AsyncMock` is 1665 used for async functions and `MagicMock` for the rest. 1666 1667 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1668 then the mock will be created with a spec from the object being replaced. 1669 All attributes of the mock will also have the spec of the corresponding 1670 attribute of the object being replaced. Methods and functions being 1671 mocked will have their arguments checked and will raise a `TypeError` if 1672 they are called with the wrong signature. For mocks replacing a class, 1673 their return value (the 'instance') will have the same spec as the class. 1674 1675 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1676 arbitrary object as the spec instead of the one being replaced. 1677 1678 By default `patch` will fail to replace attributes that don't exist. If 1679 you pass in `create=True`, and the attribute doesn't exist, patch will 1680 create the attribute for you when the patched function is called, and 1681 delete it again afterwards. This is useful for writing tests against 1682 attributes that your production code creates at runtime. It is off by 1683 default because it can be dangerous. With it switched on you can write 1684 passing tests against APIs that don't actually exist! 1685 1686 Patch can be used as a `TestCase` class decorator. It works by 1687 decorating each test method in the class. This reduces the boilerplate 1688 code when your test methods share a common patchings set. `patch` finds 1689 tests by looking for method names that start with `patch.TEST_PREFIX`. 1690 By default this is `test`, which matches the way `unittest` finds tests. 1691 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1692 1693 Patch can be used as a context manager, with the with statement. Here the 1694 patching applies to the indented block after the with statement. If you 1695 use "as" then the patched object will be bound to the name after the 1696 "as"; very useful if `patch` is creating a mock object for you. 1697 1698 `patch` takes arbitrary keyword arguments. These will be passed to 1699 the `Mock` (or `new_callable`) on construction. 1700 1701 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1702 available for alternate use-cases. 1703 """ 1704 getter, attribute = _get_target(target) 1705 return _patch( 1706 getter, attribute, new, spec, create, 1707 spec_set, autospec, new_callable, kwargs 1708 ) 1709 1710 1711class _patch_dict(object): 1712 """ 1713 Patch a dictionary, or dictionary like object, and restore the dictionary 1714 to its original state after the test. 1715 1716 `in_dict` can be a dictionary or a mapping like container. If it is a 1717 mapping then it must at least support getting, setting and deleting items 1718 plus iterating over keys. 1719 1720 `in_dict` can also be a string specifying the name of the dictionary, which 1721 will then be fetched by importing it. 1722 1723 `values` can be a dictionary of values to set in the dictionary. `values` 1724 can also be an iterable of `(key, value)` pairs. 1725 1726 If `clear` is True then the dictionary will be cleared before the new 1727 values are set. 1728 1729 `patch.dict` can also be called with arbitrary keyword arguments to set 1730 values in the dictionary:: 1731 1732 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1733 ... 1734 1735 `patch.dict` can be used as a context manager, decorator or class 1736 decorator. When used as a class decorator `patch.dict` honours 1737 `patch.TEST_PREFIX` for choosing which methods to wrap. 1738 """ 1739 1740 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1741 self.in_dict = in_dict 1742 # support any argument supported by dict(...) constructor 1743 self.values = dict(values) 1744 self.values.update(kwargs) 1745 self.clear = clear 1746 self._original = None 1747 1748 1749 def __call__(self, f): 1750 if isinstance(f, type): 1751 return self.decorate_class(f) 1752 @wraps(f) 1753 def _inner(*args, **kw): 1754 self._patch_dict() 1755 try: 1756 return f(*args, **kw) 1757 finally: 1758 self._unpatch_dict() 1759 1760 return _inner 1761 1762 1763 def decorate_class(self, klass): 1764 for attr in dir(klass): 1765 attr_value = getattr(klass, attr) 1766 if (attr.startswith(patch.TEST_PREFIX) and 1767 hasattr(attr_value, "__call__")): 1768 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1769 decorated = decorator(attr_value) 1770 setattr(klass, attr, decorated) 1771 return klass 1772 1773 1774 def __enter__(self): 1775 """Patch the dict.""" 1776 self._patch_dict() 1777 return self.in_dict 1778 1779 1780 def _patch_dict(self): 1781 values = self.values 1782 if isinstance(self.in_dict, str): 1783 self.in_dict = _importer(self.in_dict) 1784 in_dict = self.in_dict 1785 clear = self.clear 1786 1787 try: 1788 original = in_dict.copy() 1789 except AttributeError: 1790 # dict like object with no copy method 1791 # must support iteration over keys 1792 original = {} 1793 for key in in_dict: 1794 original[key] = in_dict[key] 1795 self._original = original 1796 1797 if clear: 1798 _clear_dict(in_dict) 1799 1800 try: 1801 in_dict.update(values) 1802 except AttributeError: 1803 # dict like object with no update method 1804 for key in values: 1805 in_dict[key] = values[key] 1806 1807 1808 def _unpatch_dict(self): 1809 in_dict = self.in_dict 1810 original = self._original 1811 1812 _clear_dict(in_dict) 1813 1814 try: 1815 in_dict.update(original) 1816 except AttributeError: 1817 for key in original: 1818 in_dict[key] = original[key] 1819 1820 1821 def __exit__(self, *args): 1822 """Unpatch the dict.""" 1823 self._unpatch_dict() 1824 return False 1825 1826 start = __enter__ 1827 stop = __exit__ 1828 1829 1830def _clear_dict(in_dict): 1831 try: 1832 in_dict.clear() 1833 except AttributeError: 1834 keys = list(in_dict) 1835 for key in keys: 1836 del in_dict[key] 1837 1838 1839def _patch_stopall(): 1840 """Stop all active patches. LIFO to unroll nested patches.""" 1841 for patch in reversed(_patch._active_patches): 1842 patch.stop() 1843 1844 1845patch.object = _patch_object 1846patch.dict = _patch_dict 1847patch.multiple = _patch_multiple 1848patch.stopall = _patch_stopall 1849patch.TEST_PREFIX = 'test' 1850 1851magic_methods = ( 1852 "lt le gt ge eq ne " 1853 "getitem setitem delitem " 1854 "len contains iter " 1855 "hash str sizeof " 1856 "enter exit " 1857 # we added divmod and rdivmod here instead of numerics 1858 # because there is no idivmod 1859 "divmod rdivmod neg pos abs invert " 1860 "complex int float index " 1861 "round trunc floor ceil " 1862 "bool next " 1863 "fspath " 1864 "aiter " 1865) 1866 1867numerics = ( 1868 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv" 1869) 1870inplace = ' '.join('i%s' % n for n in numerics.split()) 1871right = ' '.join('r%s' % n for n in numerics.split()) 1872 1873# not including __prepare__, __instancecheck__, __subclasscheck__ 1874# (as they are metaclass methods) 1875# __del__ is not supported at all as it causes problems if it exists 1876 1877_non_defaults = { 1878 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 1879 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 1880 '__getstate__', '__setstate__', '__getformat__', '__setformat__', 1881 '__repr__', '__dir__', '__subclasses__', '__format__', 1882 '__getnewargs_ex__', 1883} 1884 1885 1886def _get_method(name, func): 1887 "Turns a callable object (like a mock) into a real function" 1888 def method(self, /, *args, **kw): 1889 return func(self, *args, **kw) 1890 method.__name__ = name 1891 return method 1892 1893 1894_magics = { 1895 '__%s__' % method for method in 1896 ' '.join([magic_methods, numerics, inplace, right]).split() 1897} 1898 1899# Magic methods used for async `with` statements 1900_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 1901# Magic methods that are only used with async calls but are synchronous functions themselves 1902_sync_async_magics = {"__aiter__"} 1903_async_magics = _async_method_magics | _sync_async_magics 1904 1905_all_sync_magics = _magics | _non_defaults 1906_all_magics = _all_sync_magics | _async_magics 1907 1908_unsupported_magics = { 1909 '__getattr__', '__setattr__', 1910 '__init__', '__new__', '__prepare__', 1911 '__instancecheck__', '__subclasscheck__', 1912 '__del__' 1913} 1914 1915_calculate_return_value = { 1916 '__hash__': lambda self: object.__hash__(self), 1917 '__str__': lambda self: object.__str__(self), 1918 '__sizeof__': lambda self: object.__sizeof__(self), 1919 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 1920} 1921 1922_return_values = { 1923 '__lt__': NotImplemented, 1924 '__gt__': NotImplemented, 1925 '__le__': NotImplemented, 1926 '__ge__': NotImplemented, 1927 '__int__': 1, 1928 '__contains__': False, 1929 '__len__': 0, 1930 '__exit__': False, 1931 '__complex__': 1j, 1932 '__float__': 1.0, 1933 '__bool__': True, 1934 '__index__': 1, 1935 '__aexit__': False, 1936} 1937 1938 1939def _get_eq(self): 1940 def __eq__(other): 1941 ret_val = self.__eq__._mock_return_value 1942 if ret_val is not DEFAULT: 1943 return ret_val 1944 if self is other: 1945 return True 1946 return NotImplemented 1947 return __eq__ 1948 1949def _get_ne(self): 1950 def __ne__(other): 1951 if self.__ne__._mock_return_value is not DEFAULT: 1952 return DEFAULT 1953 if self is other: 1954 return False 1955 return NotImplemented 1956 return __ne__ 1957 1958def _get_iter(self): 1959 def __iter__(): 1960 ret_val = self.__iter__._mock_return_value 1961 if ret_val is DEFAULT: 1962 return iter([]) 1963 # if ret_val was already an iterator, then calling iter on it should 1964 # return the iterator unchanged 1965 return iter(ret_val) 1966 return __iter__ 1967 1968def _get_async_iter(self): 1969 def __aiter__(): 1970 ret_val = self.__aiter__._mock_return_value 1971 if ret_val is DEFAULT: 1972 return _AsyncIterator(iter([])) 1973 return _AsyncIterator(iter(ret_val)) 1974 return __aiter__ 1975 1976_side_effect_methods = { 1977 '__eq__': _get_eq, 1978 '__ne__': _get_ne, 1979 '__iter__': _get_iter, 1980 '__aiter__': _get_async_iter 1981} 1982 1983 1984 1985def _set_return_value(mock, method, name): 1986 fixed = _return_values.get(name, DEFAULT) 1987 if fixed is not DEFAULT: 1988 method.return_value = fixed 1989 return 1990 1991 return_calculator = _calculate_return_value.get(name) 1992 if return_calculator is not None: 1993 return_value = return_calculator(mock) 1994 method.return_value = return_value 1995 return 1996 1997 side_effector = _side_effect_methods.get(name) 1998 if side_effector is not None: 1999 method.side_effect = side_effector(mock) 2000 2001 2002 2003class MagicMixin(Base): 2004 def __init__(self, /, *args, **kw): 2005 self._mock_set_magics() # make magic work for kwargs in init 2006 _safe_super(MagicMixin, self).__init__(*args, **kw) 2007 self._mock_set_magics() # fix magic broken by upper level init 2008 2009 2010 def _mock_set_magics(self): 2011 orig_magics = _magics | _async_method_magics 2012 these_magics = orig_magics 2013 2014 if getattr(self, "_mock_methods", None) is not None: 2015 these_magics = orig_magics.intersection(self._mock_methods) 2016 2017 remove_magics = set() 2018 remove_magics = orig_magics - these_magics 2019 2020 for entry in remove_magics: 2021 if entry in type(self).__dict__: 2022 # remove unneeded magic methods 2023 delattr(self, entry) 2024 2025 # don't overwrite existing attributes if called a second time 2026 these_magics = these_magics - set(type(self).__dict__) 2027 2028 _type = type(self) 2029 for entry in these_magics: 2030 setattr(_type, entry, MagicProxy(entry, self)) 2031 2032 2033 2034class NonCallableMagicMock(MagicMixin, NonCallableMock): 2035 """A version of `MagicMock` that isn't callable.""" 2036 def mock_add_spec(self, spec, spec_set=False): 2037 """Add a spec to a mock. `spec` can either be an object or a 2038 list of strings. Only attributes on the `spec` can be fetched as 2039 attributes from the mock. 2040 2041 If `spec_set` is True then only attributes on the spec can be set.""" 2042 self._mock_add_spec(spec, spec_set) 2043 self._mock_set_magics() 2044 2045 2046class AsyncMagicMixin(MagicMixin): 2047 def __init__(self, /, *args, **kw): 2048 self._mock_set_magics() # make magic work for kwargs in init 2049 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw) 2050 self._mock_set_magics() # fix magic broken by upper level init 2051 2052class MagicMock(MagicMixin, Mock): 2053 """ 2054 MagicMock is a subclass of Mock with default implementations 2055 of most of the magic methods. You can use MagicMock without having to 2056 configure the magic methods yourself. 2057 2058 If you use the `spec` or `spec_set` arguments then *only* magic 2059 methods that exist in the spec will be created. 2060 2061 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2062 """ 2063 def mock_add_spec(self, spec, spec_set=False): 2064 """Add a spec to a mock. `spec` can either be an object or a 2065 list of strings. Only attributes on the `spec` can be fetched as 2066 attributes from the mock. 2067 2068 If `spec_set` is True then only attributes on the spec can be set.""" 2069 self._mock_add_spec(spec, spec_set) 2070 self._mock_set_magics() 2071 2072 2073 2074class MagicProxy(Base): 2075 def __init__(self, name, parent): 2076 self.name = name 2077 self.parent = parent 2078 2079 def create_mock(self): 2080 entry = self.name 2081 parent = self.parent 2082 m = parent._get_child_mock(name=entry, _new_name=entry, 2083 _new_parent=parent) 2084 setattr(parent, entry, m) 2085 _set_return_value(parent, m, entry) 2086 return m 2087 2088 def __get__(self, obj, _type=None): 2089 return self.create_mock() 2090 2091 2092class AsyncMockMixin(Base): 2093 await_count = _delegating_property('await_count') 2094 await_args = _delegating_property('await_args') 2095 await_args_list = _delegating_property('await_args_list') 2096 2097 def __init__(self, /, *args, **kwargs): 2098 super().__init__(*args, **kwargs) 2099 # asyncio.iscoroutinefunction() checks _is_coroutine property to say if an 2100 # object is a coroutine. Without this check it looks to see if it is a 2101 # function/method, which in this case it is not (since it is an 2102 # AsyncMock). 2103 # It is set through __dict__ because when spec_set is True, this 2104 # attribute is likely undefined. 2105 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2106 self.__dict__['_mock_await_count'] = 0 2107 self.__dict__['_mock_await_args'] = None 2108 self.__dict__['_mock_await_args_list'] = _CallList() 2109 code_mock = NonCallableMock(spec_set=CodeType) 2110 code_mock.co_flags = inspect.CO_COROUTINE 2111 self.__dict__['__code__'] = code_mock 2112 2113 async def _execute_mock_call(self, /, *args, **kwargs): 2114 # This is nearly just like super(), except for sepcial handling 2115 # of coroutines 2116 2117 _call = _Call((args, kwargs), two=True) 2118 self.await_count += 1 2119 self.await_args = _call 2120 self.await_args_list.append(_call) 2121 2122 effect = self.side_effect 2123 if effect is not None: 2124 if _is_exception(effect): 2125 raise effect 2126 elif not _callable(effect): 2127 try: 2128 result = next(effect) 2129 except StopIteration: 2130 # It is impossible to propogate a StopIteration 2131 # through coroutines because of PEP 479 2132 raise StopAsyncIteration 2133 if _is_exception(result): 2134 raise result 2135 elif asyncio.iscoroutinefunction(effect): 2136 result = await effect(*args, **kwargs) 2137 else: 2138 result = effect(*args, **kwargs) 2139 2140 if result is not DEFAULT: 2141 return result 2142 2143 if self._mock_return_value is not DEFAULT: 2144 return self.return_value 2145 2146 if self._mock_wraps is not None: 2147 if asyncio.iscoroutinefunction(self._mock_wraps): 2148 return await self._mock_wraps(*args, **kwargs) 2149 return self._mock_wraps(*args, **kwargs) 2150 2151 return self.return_value 2152 2153 def assert_awaited(self): 2154 """ 2155 Assert that the mock was awaited at least once. 2156 """ 2157 if self.await_count == 0: 2158 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2159 raise AssertionError(msg) 2160 2161 def assert_awaited_once(self): 2162 """ 2163 Assert that the mock was awaited exactly once. 2164 """ 2165 if not self.await_count == 1: 2166 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2167 f" Awaited {self.await_count} times.") 2168 raise AssertionError(msg) 2169 2170 def assert_awaited_with(self, /, *args, **kwargs): 2171 """ 2172 Assert that the last await was with the specified arguments. 2173 """ 2174 if self.await_args is None: 2175 expected = self._format_mock_call_signature(args, kwargs) 2176 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2177 2178 def _error_message(): 2179 msg = self._format_mock_failure_message(args, kwargs, action='await') 2180 return msg 2181 2182 expected = self._call_matcher((args, kwargs)) 2183 actual = self._call_matcher(self.await_args) 2184 if expected != actual: 2185 cause = expected if isinstance(expected, Exception) else None 2186 raise AssertionError(_error_message()) from cause 2187 2188 def assert_awaited_once_with(self, /, *args, **kwargs): 2189 """ 2190 Assert that the mock was awaited exactly once and with the specified 2191 arguments. 2192 """ 2193 if not self.await_count == 1: 2194 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2195 f" Awaited {self.await_count} times.") 2196 raise AssertionError(msg) 2197 return self.assert_awaited_with(*args, **kwargs) 2198 2199 def assert_any_await(self, /, *args, **kwargs): 2200 """ 2201 Assert the mock has ever been awaited with the specified arguments. 2202 """ 2203 expected = self._call_matcher((args, kwargs)) 2204 actual = [self._call_matcher(c) for c in self.await_args_list] 2205 if expected not in actual: 2206 cause = expected if isinstance(expected, Exception) else None 2207 expected_string = self._format_mock_call_signature(args, kwargs) 2208 raise AssertionError( 2209 '%s await not found' % expected_string 2210 ) from cause 2211 2212 def assert_has_awaits(self, calls, any_order=False): 2213 """ 2214 Assert the mock has been awaited with the specified calls. 2215 The :attr:`await_args_list` list is checked for the awaits. 2216 2217 If `any_order` is False (the default) then the awaits must be 2218 sequential. There can be extra calls before or after the 2219 specified awaits. 2220 2221 If `any_order` is True then the awaits can be in any order, but 2222 they must all appear in :attr:`await_args_list`. 2223 """ 2224 expected = [self._call_matcher(c) for c in calls] 2225 cause = next((e for e in expected if isinstance(e, Exception)), None) 2226 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2227 if not any_order: 2228 if expected not in all_awaits: 2229 if cause is None: 2230 problem = 'Awaits not found.' 2231 else: 2232 problem = ('Error processing expected awaits.\n' 2233 'Errors: {}').format( 2234 [e if isinstance(e, Exception) else None 2235 for e in expected]) 2236 raise AssertionError( 2237 f'{problem}\n' 2238 f'Expected: {_CallList(calls)}\n' 2239 f'Actual: {self.await_args_list}' 2240 ) from cause 2241 return 2242 2243 all_awaits = list(all_awaits) 2244 2245 not_found = [] 2246 for kall in expected: 2247 try: 2248 all_awaits.remove(kall) 2249 except ValueError: 2250 not_found.append(kall) 2251 if not_found: 2252 raise AssertionError( 2253 '%r not all found in await list' % (tuple(not_found),) 2254 ) from cause 2255 2256 def assert_not_awaited(self): 2257 """ 2258 Assert that the mock was never awaited. 2259 """ 2260 if self.await_count != 0: 2261 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2262 f" Awaited {self.await_count} times.") 2263 raise AssertionError(msg) 2264 2265 def reset_mock(self, /, *args, **kwargs): 2266 """ 2267 See :func:`.Mock.reset_mock()` 2268 """ 2269 super().reset_mock(*args, **kwargs) 2270 self.await_count = 0 2271 self.await_args = None 2272 self.await_args_list = _CallList() 2273 2274 2275class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2276 """ 2277 Enhance :class:`Mock` with features allowing to mock 2278 an async function. 2279 2280 The :class:`AsyncMock` object will behave so the object is 2281 recognized as an async function, and the result of a call is an awaitable: 2282 2283 >>> mock = AsyncMock() 2284 >>> asyncio.iscoroutinefunction(mock) 2285 True 2286 >>> inspect.isawaitable(mock()) 2287 True 2288 2289 2290 The result of ``mock()`` is an async function which will have the outcome 2291 of ``side_effect`` or ``return_value``: 2292 2293 - if ``side_effect`` is a function, the async function will return the 2294 result of that function, 2295 - if ``side_effect`` is an exception, the async function will raise the 2296 exception, 2297 - if ``side_effect`` is an iterable, the async function will return the 2298 next value of the iterable, however, if the sequence of result is 2299 exhausted, ``StopIteration`` is raised immediately, 2300 - if ``side_effect`` is not defined, the async function will return the 2301 value defined by ``return_value``, hence, by default, the async function 2302 returns a new :class:`AsyncMock` object. 2303 2304 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2305 the mock async function obtained when the mock object is called will be this 2306 async function itself (and not an async function returning an async 2307 function). 2308 2309 The test author can also specify a wrapped object with ``wraps``. In this 2310 case, the :class:`Mock` object behavior is the same as with an 2311 :class:`.Mock` object: the wrapped object may have methods 2312 defined as async function functions. 2313 2314 Based on Martin Richard's asynctest project. 2315 """ 2316 2317 2318class _ANY(object): 2319 "A helper object that compares equal to everything." 2320 2321 def __eq__(self, other): 2322 return True 2323 2324 def __ne__(self, other): 2325 return False 2326 2327 def __repr__(self): 2328 return '<ANY>' 2329 2330ANY = _ANY() 2331 2332 2333 2334def _format_call_signature(name, args, kwargs): 2335 message = '%s(%%s)' % name 2336 formatted_args = '' 2337 args_string = ', '.join([repr(arg) for arg in args]) 2338 kwargs_string = ', '.join([ 2339 '%s=%r' % (key, value) for key, value in kwargs.items() 2340 ]) 2341 if args_string: 2342 formatted_args = args_string 2343 if kwargs_string: 2344 if formatted_args: 2345 formatted_args += ', ' 2346 formatted_args += kwargs_string 2347 2348 return message % formatted_args 2349 2350 2351 2352class _Call(tuple): 2353 """ 2354 A tuple for holding the results of a call to a mock, either in the form 2355 `(args, kwargs)` or `(name, args, kwargs)`. 2356 2357 If args or kwargs are empty then a call tuple will compare equal to 2358 a tuple without those values. This makes comparisons less verbose:: 2359 2360 _Call(('name', (), {})) == ('name',) 2361 _Call(('name', (1,), {})) == ('name', (1,)) 2362 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2363 2364 The `_Call` object provides a useful shortcut for comparing with call:: 2365 2366 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2367 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2368 2369 If the _Call has no name then it will match any name. 2370 """ 2371 def __new__(cls, value=(), name='', parent=None, two=False, 2372 from_kall=True): 2373 args = () 2374 kwargs = {} 2375 _len = len(value) 2376 if _len == 3: 2377 name, args, kwargs = value 2378 elif _len == 2: 2379 first, second = value 2380 if isinstance(first, str): 2381 name = first 2382 if isinstance(second, tuple): 2383 args = second 2384 else: 2385 kwargs = second 2386 else: 2387 args, kwargs = first, second 2388 elif _len == 1: 2389 value, = value 2390 if isinstance(value, str): 2391 name = value 2392 elif isinstance(value, tuple): 2393 args = value 2394 else: 2395 kwargs = value 2396 2397 if two: 2398 return tuple.__new__(cls, (args, kwargs)) 2399 2400 return tuple.__new__(cls, (name, args, kwargs)) 2401 2402 2403 def __init__(self, value=(), name=None, parent=None, two=False, 2404 from_kall=True): 2405 self._mock_name = name 2406 self._mock_parent = parent 2407 self._mock_from_kall = from_kall 2408 2409 2410 def __eq__(self, other): 2411 if other is ANY: 2412 return True 2413 try: 2414 len_other = len(other) 2415 except TypeError: 2416 return False 2417 2418 self_name = '' 2419 if len(self) == 2: 2420 self_args, self_kwargs = self 2421 else: 2422 self_name, self_args, self_kwargs = self 2423 2424 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2425 and self._mock_parent != other._mock_parent): 2426 return False 2427 2428 other_name = '' 2429 if len_other == 0: 2430 other_args, other_kwargs = (), {} 2431 elif len_other == 3: 2432 other_name, other_args, other_kwargs = other 2433 elif len_other == 1: 2434 value, = other 2435 if isinstance(value, tuple): 2436 other_args = value 2437 other_kwargs = {} 2438 elif isinstance(value, str): 2439 other_name = value 2440 other_args, other_kwargs = (), {} 2441 else: 2442 other_args = () 2443 other_kwargs = value 2444 elif len_other == 2: 2445 # could be (name, args) or (name, kwargs) or (args, kwargs) 2446 first, second = other 2447 if isinstance(first, str): 2448 other_name = first 2449 if isinstance(second, tuple): 2450 other_args, other_kwargs = second, {} 2451 else: 2452 other_args, other_kwargs = (), second 2453 else: 2454 other_args, other_kwargs = first, second 2455 else: 2456 return False 2457 2458 if self_name and other_name != self_name: 2459 return False 2460 2461 # this order is important for ANY to work! 2462 return (other_args, other_kwargs) == (self_args, self_kwargs) 2463 2464 2465 __ne__ = object.__ne__ 2466 2467 2468 def __call__(self, /, *args, **kwargs): 2469 if self._mock_name is None: 2470 return _Call(('', args, kwargs), name='()') 2471 2472 name = self._mock_name + '()' 2473 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2474 2475 2476 def __getattr__(self, attr): 2477 if self._mock_name is None: 2478 return _Call(name=attr, from_kall=False) 2479 name = '%s.%s' % (self._mock_name, attr) 2480 return _Call(name=name, parent=self, from_kall=False) 2481 2482 2483 def __getattribute__(self, attr): 2484 if attr in tuple.__dict__: 2485 raise AttributeError 2486 return tuple.__getattribute__(self, attr) 2487 2488 2489 def count(self, /, *args, **kwargs): 2490 return self.__getattr__('count')(*args, **kwargs) 2491 2492 def index(self, /, *args, **kwargs): 2493 return self.__getattr__('index')(*args, **kwargs) 2494 2495 def _get_call_arguments(self): 2496 if len(self) == 2: 2497 args, kwargs = self 2498 else: 2499 name, args, kwargs = self 2500 2501 return args, kwargs 2502 2503 @property 2504 def args(self): 2505 return self._get_call_arguments()[0] 2506 2507 @property 2508 def kwargs(self): 2509 return self._get_call_arguments()[1] 2510 2511 def __repr__(self): 2512 if not self._mock_from_kall: 2513 name = self._mock_name or 'call' 2514 if name.startswith('()'): 2515 name = 'call%s' % name 2516 return name 2517 2518 if len(self) == 2: 2519 name = 'call' 2520 args, kwargs = self 2521 else: 2522 name, args, kwargs = self 2523 if not name: 2524 name = 'call' 2525 elif not name.startswith('()'): 2526 name = 'call.%s' % name 2527 else: 2528 name = 'call%s' % name 2529 return _format_call_signature(name, args, kwargs) 2530 2531 2532 def call_list(self): 2533 """For a call object that represents multiple calls, `call_list` 2534 returns a list of all the intermediate calls as well as the 2535 final call.""" 2536 vals = [] 2537 thing = self 2538 while thing is not None: 2539 if thing._mock_from_kall: 2540 vals.append(thing) 2541 thing = thing._mock_parent 2542 return _CallList(reversed(vals)) 2543 2544 2545call = _Call(from_kall=False) 2546 2547 2548def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2549 _name=None, **kwargs): 2550 """Create a mock object using another object as a spec. Attributes on the 2551 mock will use the corresponding attribute on the `spec` object as their 2552 spec. 2553 2554 Functions or methods being mocked will have their arguments checked 2555 to check that they are called with the correct signature. 2556 2557 If `spec_set` is True then attempting to set attributes that don't exist 2558 on the spec object will raise an `AttributeError`. 2559 2560 If a class is used as a spec then the return value of the mock (the 2561 instance of the class) will have the same spec. You can use a class as the 2562 spec for an instance object by passing `instance=True`. The returned mock 2563 will only be callable if instances of the mock are callable. 2564 2565 `create_autospec` also takes arbitrary keyword arguments that are passed to 2566 the constructor of the created mock.""" 2567 if _is_list(spec): 2568 # can't pass a list instance to the mock constructor as it will be 2569 # interpreted as a list of strings 2570 spec = type(spec) 2571 2572 is_type = isinstance(spec, type) 2573 is_async_func = _is_async_func(spec) 2574 _kwargs = {'spec': spec} 2575 if spec_set: 2576 _kwargs = {'spec_set': spec} 2577 elif spec is None: 2578 # None we mock with a normal mock without a spec 2579 _kwargs = {} 2580 if _kwargs and instance: 2581 _kwargs['_spec_as_instance'] = True 2582 2583 _kwargs.update(kwargs) 2584 2585 Klass = MagicMock 2586 if inspect.isdatadescriptor(spec): 2587 # descriptors don't have a spec 2588 # because we don't know what type they return 2589 _kwargs = {} 2590 elif is_async_func: 2591 if instance: 2592 raise RuntimeError("Instance can not be True when create_autospec " 2593 "is mocking an async function") 2594 Klass = AsyncMock 2595 elif not _callable(spec): 2596 Klass = NonCallableMagicMock 2597 elif is_type and instance and not _instance_callable(spec): 2598 Klass = NonCallableMagicMock 2599 2600 _name = _kwargs.pop('name', _name) 2601 2602 _new_name = _name 2603 if _parent is None: 2604 # for a top level object no _new_name should be set 2605 _new_name = '' 2606 2607 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2608 name=_name, **_kwargs) 2609 2610 if isinstance(spec, FunctionTypes): 2611 # should only happen at the top level because we don't 2612 # recurse for functions 2613 mock = _set_signature(mock, spec) 2614 if is_async_func: 2615 _setup_async_mock(mock) 2616 else: 2617 _check_signature(spec, mock, is_type, instance) 2618 2619 if _parent is not None and not instance: 2620 _parent._mock_children[_name] = mock 2621 2622 if is_type and not instance and 'return_value' not in kwargs: 2623 mock.return_value = create_autospec(spec, spec_set, instance=True, 2624 _name='()', _parent=mock) 2625 2626 for entry in dir(spec): 2627 if _is_magic(entry): 2628 # MagicMock already does the useful magic methods for us 2629 continue 2630 2631 # XXXX do we need a better way of getting attributes without 2632 # triggering code execution (?) Probably not - we need the actual 2633 # object to mock it so we would rather trigger a property than mock 2634 # the property descriptor. Likewise we want to mock out dynamically 2635 # provided attributes. 2636 # XXXX what about attributes that raise exceptions other than 2637 # AttributeError on being fetched? 2638 # we could be resilient against it, or catch and propagate the 2639 # exception when the attribute is fetched from the mock 2640 try: 2641 original = getattr(spec, entry) 2642 except AttributeError: 2643 continue 2644 2645 kwargs = {'spec': original} 2646 if spec_set: 2647 kwargs = {'spec_set': original} 2648 2649 if not isinstance(original, FunctionTypes): 2650 new = _SpecState(original, spec_set, mock, entry, instance) 2651 mock._mock_children[entry] = new 2652 else: 2653 parent = mock 2654 if isinstance(spec, FunctionTypes): 2655 parent = mock.mock 2656 2657 skipfirst = _must_skip(spec, entry, is_type) 2658 kwargs['_eat_self'] = skipfirst 2659 if asyncio.iscoroutinefunction(original): 2660 child_klass = AsyncMock 2661 else: 2662 child_klass = MagicMock 2663 new = child_klass(parent=parent, name=entry, _new_name=entry, 2664 _new_parent=parent, 2665 **kwargs) 2666 mock._mock_children[entry] = new 2667 _check_signature(original, new, skipfirst=skipfirst) 2668 2669 # so functions created with _set_signature become instance attributes, 2670 # *plus* their underlying mock exists in _mock_children of the parent 2671 # mock. Adding to _mock_children may be unnecessary where we are also 2672 # setting as an instance attribute? 2673 if isinstance(new, FunctionTypes): 2674 setattr(mock, entry, new) 2675 2676 return mock 2677 2678 2679def _must_skip(spec, entry, is_type): 2680 """ 2681 Return whether we should skip the first argument on spec's `entry` 2682 attribute. 2683 """ 2684 if not isinstance(spec, type): 2685 if entry in getattr(spec, '__dict__', {}): 2686 # instance attribute - shouldn't skip 2687 return False 2688 spec = spec.__class__ 2689 2690 for klass in spec.__mro__: 2691 result = klass.__dict__.get(entry, DEFAULT) 2692 if result is DEFAULT: 2693 continue 2694 if isinstance(result, (staticmethod, classmethod)): 2695 return False 2696 elif isinstance(result, FunctionTypes): 2697 # Normal method => skip if looked up on type 2698 # (if looked up on instance, self is already skipped) 2699 return is_type 2700 else: 2701 return False 2702 2703 # function is a dynamically provided attribute 2704 return is_type 2705 2706 2707class _SpecState(object): 2708 2709 def __init__(self, spec, spec_set=False, parent=None, 2710 name=None, ids=None, instance=False): 2711 self.spec = spec 2712 self.ids = ids 2713 self.spec_set = spec_set 2714 self.parent = parent 2715 self.instance = instance 2716 self.name = name 2717 2718 2719FunctionTypes = ( 2720 # python function 2721 type(create_autospec), 2722 # instance method 2723 type(ANY.__eq__), 2724) 2725 2726 2727file_spec = None 2728 2729 2730def _to_stream(read_data): 2731 if isinstance(read_data, bytes): 2732 return io.BytesIO(read_data) 2733 else: 2734 return io.StringIO(read_data) 2735 2736 2737def mock_open(mock=None, read_data=''): 2738 """ 2739 A helper function to create a mock to replace the use of `open`. It works 2740 for `open` called directly or used as a context manager. 2741 2742 The `mock` argument is the mock object to configure. If `None` (the 2743 default) then a `MagicMock` will be created for you, with the API limited 2744 to methods or attributes available on standard file handles. 2745 2746 `read_data` is a string for the `read`, `readline` and `readlines` of the 2747 file handle to return. This is an empty string by default. 2748 """ 2749 _read_data = _to_stream(read_data) 2750 _state = [_read_data, None] 2751 2752 def _readlines_side_effect(*args, **kwargs): 2753 if handle.readlines.return_value is not None: 2754 return handle.readlines.return_value 2755 return _state[0].readlines(*args, **kwargs) 2756 2757 def _read_side_effect(*args, **kwargs): 2758 if handle.read.return_value is not None: 2759 return handle.read.return_value 2760 return _state[0].read(*args, **kwargs) 2761 2762 def _readline_side_effect(*args, **kwargs): 2763 yield from _iter_side_effect() 2764 while True: 2765 yield _state[0].readline(*args, **kwargs) 2766 2767 def _iter_side_effect(): 2768 if handle.readline.return_value is not None: 2769 while True: 2770 yield handle.readline.return_value 2771 for line in _state[0]: 2772 yield line 2773 2774 def _next_side_effect(): 2775 if handle.readline.return_value is not None: 2776 return handle.readline.return_value 2777 return next(_state[0]) 2778 2779 global file_spec 2780 if file_spec is None: 2781 import _io 2782 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2783 2784 if mock is None: 2785 mock = MagicMock(name='open', spec=open) 2786 2787 handle = MagicMock(spec=file_spec) 2788 handle.__enter__.return_value = handle 2789 2790 handle.write.return_value = None 2791 handle.read.return_value = None 2792 handle.readline.return_value = None 2793 handle.readlines.return_value = None 2794 2795 handle.read.side_effect = _read_side_effect 2796 _state[1] = _readline_side_effect() 2797 handle.readline.side_effect = _state[1] 2798 handle.readlines.side_effect = _readlines_side_effect 2799 handle.__iter__.side_effect = _iter_side_effect 2800 handle.__next__.side_effect = _next_side_effect 2801 2802 def reset_data(*args, **kwargs): 2803 _state[0] = _to_stream(read_data) 2804 if handle.readline.side_effect == _state[1]: 2805 # Only reset the side effect if the user hasn't overridden it. 2806 _state[1] = _readline_side_effect() 2807 handle.readline.side_effect = _state[1] 2808 return DEFAULT 2809 2810 mock.side_effect = reset_data 2811 mock.return_value = handle 2812 return mock 2813 2814 2815class PropertyMock(Mock): 2816 """ 2817 A mock intended to be used as a property, or other descriptor, on a class. 2818 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 2819 a return value when it is fetched. 2820 2821 Fetching a `PropertyMock` instance from an object calls the mock, with 2822 no args. Setting it calls the mock with the value being set. 2823 """ 2824 def _get_child_mock(self, /, **kwargs): 2825 return MagicMock(**kwargs) 2826 2827 def __get__(self, obj, obj_type=None): 2828 return self() 2829 def __set__(self, obj, val): 2830 self(val) 2831 2832 2833def seal(mock): 2834 """Disable the automatic generation of child mocks. 2835 2836 Given an input Mock, seals it to ensure no further mocks will be generated 2837 when accessing an attribute that was not already defined. 2838 2839 The operation recursively seals the mock passed in, meaning that 2840 the mock itself, any mocks generated by accessing one of its attributes, 2841 and all assigned mocks without a name or spec will be sealed. 2842 """ 2843 mock._mock_sealed = True 2844 for attr in dir(mock): 2845 try: 2846 m = getattr(mock, attr) 2847 except AttributeError: 2848 continue 2849 if not isinstance(m, NonCallableMock): 2850 continue 2851 if m._mock_new_parent is mock: 2852 seal(m) 2853 2854 2855class _AsyncIterator: 2856 """ 2857 Wraps an iterator in an asynchronous iterator. 2858 """ 2859 def __init__(self, iterator): 2860 self.iterator = iterator 2861 code_mock = NonCallableMock(spec_set=CodeType) 2862 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 2863 self.__dict__['__code__'] = code_mock 2864 2865 def __aiter__(self): 2866 return self 2867 2868 async def __anext__(self): 2869 try: 2870 return next(self.iterator) 2871 except StopIteration: 2872 pass 2873 raise StopAsyncIteration 2874