1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
6
# Public API of this module: the names made available by a star-import.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)
24
25
26import asyncio
27import contextlib
28import io
29import inspect
30import pprint
31import sys
32import builtins
33from asyncio import iscoroutinefunction
34from types import CodeType, ModuleType, MethodType
35from unittest.util import safe_repr
36from functools import wraps, partial
37
38
class InvalidSpecError(Exception):
    """Signals that the value supplied as a mock spec is not acceptable."""
41
42
# Names of every public builtin, i.e. everything in the ``builtins`` module
# that does not start with an underscore.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# Module-level switch consulted by NonCallableMock.__dir__: when true,
# dir() on a mock is filtered; when false, object.__dir__ is used as-is.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
50
def _is_async_obj(obj):
    """Return whether *obj* is a coroutine function or an awaitable.

    Mock instances only count when they are AsyncMock instances.  When
    *obj* exposes ``__func__`` the check is applied to that underlying
    function instead.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    target = getattr(obj, '__func__') if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)
57
58
59def _is_async_func(func):
60    if getattr(func, '__code__', None):
61        return iscoroutinefunction(func)
62    else:
63        return False
64
65
def _is_instance_mock(obj):
    """Return True when *obj* is an instance of any mock class."""
    # Mocks override __class__, so isinstance() cannot be trusted here;
    # inspect the real type instead.  NonCallableMock is the common base
    # class of all mocks.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)
70
71
72def _is_exception(obj):
73    return (
74        isinstance(obj, BaseException) or
75        isinstance(obj, type) and issubclass(obj, BaseException)
76    )
77
78
def _extract_mock(obj):
    """Return the mock behind *obj*.

    An autospecced function is a plain function object carrying the real
    mock in its ``mock`` attribute; that mock is returned.  Any other
    object is returned unchanged.
    """
    is_autospecced_function = (
        isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    )
    return obj.mock if is_autospecced_function else obj
86
87
88def _get_signature_object(func, as_instance, eat_self):
89    """
90    Given an arbitrary, possibly callable object, try to create a suitable
91    signature object.
92    Return a (reduced func, signature) tuple, or None.
93    """
94    if isinstance(func, type) and not as_instance:
95        # If it's a type and should be modelled as a type, use __init__.
96        func = func.__init__
97        # Skip the `self` argument in __init__
98        eat_self = True
99    elif not isinstance(func, FunctionTypes):
100        # If we really want to model an instance of the passed type,
101        # __call__ should be looked up, not __init__.
102        try:
103            func = func.__call__
104        except AttributeError:
105            return None
106    if eat_self:
107        sig_func = partial(func, None)
108    else:
109        sig_func = func
110    try:
111        return func, inspect.signature(sig_func)
112    except ValueError:
113        # Certain callable types are not supported by inspect.signature()
114        return None
115
116
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature check for *func* on the type of *mock*.

    Does nothing when no signature can be obtained for *func*.  Otherwise
    the type of *mock* receives a ``_mock_check_sig`` method that binds
    its arguments against the signature, plus a ``__signature__``.
    """
    result = _get_signature_object(func, instance, skipfirst)
    if result is None:
        return
    func, sig = result

    def checksig(self, /, *args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig
127
128
129def _copy_func_details(func, funcopy):
130    # we explicitly don't copy func.__dict__ into this copy as it would
131    # expose original attributes that should be mocked
132    for attribute in (
133        '__name__', '__doc__', '__text_signature__',
134        '__module__', '__defaults__', '__kwdefaults__',
135    ):
136        try:
137            setattr(funcopy, attribute, getattr(func, attribute))
138        except AttributeError:
139            pass
140
141
142def _callable(obj):
143    if isinstance(obj, type):
144        return True
145    if isinstance(obj, (staticmethod, classmethod, MethodType)):
146        return _callable(obj.__func__)
147    if getattr(obj, '__call__', None) is not None:
148        return True
149    return False
150
151
152def _is_list(obj):
153    # checks for list or tuples
154    # XXXX badly named!
155    return type(obj) in (list, tuple)
156
157
158def _instance_callable(obj):
159    """Given an object, return True if the object is callable.
160    For classes, return True if instances would be callable."""
161    if not isinstance(obj, type):
162        # already an instance
163        return getattr(obj, '__call__', None) is not None
164
165    # *could* be broken by a class overriding __mro__ or __dict__ via
166    # a metaclass
167    for base in (obj,) + obj.__mro__:
168        if base.__dict__.get('__call__') is not None:
169            return True
170    return False
171
172
def _set_signature(mock, original, instance=False):
    """Wrap *mock* in a real function that checks calls against *original*.

    The generated function accepts ``(*args, **kwargs)``, first binds
    them against the signature derived from *original* (raising
    TypeError on a mismatch) and then delegates to *mock*.

    Returns *mock* unchanged when no signature can be derived from
    *original*; otherwise returns the generated function, already wired
    to *mock* by ``_setup_func``.
    """
    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result
    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    context = {'_checksig_': checksig, 'mock': mock}
    name = original.__name__
    # The generated ``def`` is executed with `context` as its globals, so
    # a function named like one of the helper entries ('mock' or
    # '_checksig_') would overwrite that entry and end up calling itself
    # recursively.  Use the neutral fallback name in that case, exactly
    # as for names that are not valid identifiers.
    if not name.isidentifier() or name in context:
        name = 'funcopy'
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec (src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
198
199
def _setup_func(funcopy, mock, sig):
    """Make the function *funcopy* look like, and delegate to, *mock*.

    Attaches the mock itself (as ``funcopy.mock``), call-recording
    attributes, assertion helpers that forward to *mock*, and *sig* as
    ``__signature__``.  Finally registers *funcopy* as the delegate of
    *mock*.
    """
    funcopy.mock = mock

    # Thin forwarders: each assertion helper on the function simply calls
    # the method of the same name on the underlying mock.
    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)
    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)
    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)
    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)
    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)
    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)
    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)
    def reset_mock():
        # Clear the call lists kept on the function, reset the mock, and
        # reset the return value too when it is a different mock.
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        ret = funcopy.return_value
        if _is_instance_mock(ret) and not ret is mock:
            ret.reset_mock()

    # Initial call-recording state stored on the function itself.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Copy the mock's current configuration; this is done before the
    # delegate is installed below.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    funcopy.assert_called_with = assert_called_with
    funcopy.assert_called_once_with = assert_called_once_with
    funcopy.assert_has_calls = assert_has_calls
    funcopy.assert_any_call = assert_any_call
    funcopy.reset_mock = reset_mock
    funcopy.assert_called = assert_called
    funcopy.assert_not_called = assert_not_called
    funcopy.assert_called_once = assert_called_once
    funcopy.__signature__ = sig

    # From here on the mock's delegating properties read and write the
    # attributes stored on funcopy.
    mock._mock_delegate = funcopy
247
248
def _setup_async_mock(mock):
    """Attach await-tracking state and await assertion helpers to *mock*.

    The helpers look up the method of the same name on ``mock.mock`` at
    call time and forward all arguments to it.
    """
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # The mock is not configured yet, so each attribute is set to a
    # function that resolves the real helper only when it is called
    # (similar to _setup_func).
    def wrapper(attr, /, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    await_assertions = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for attribute in await_assertions:
        # partial() binds the attribute name now; a closure over the loop
        # variable would be late-bound and always see the last name.
        setattr(mock, attribute, partial(wrapper, attribute))
274
275
276def _is_magic(name):
277    return '__%s__' % name[2:-2] == name
278
279
280class _SentinelObject(object):
281    "A unique, named, sentinel object."
282    def __init__(self, name):
283        self.name = name
284
285    def __repr__(self):
286        return 'sentinel.%s' % self.name
287
288    def __reduce__(self):
289        return 'sentinel.%s' % self.name
290
291
292class _Sentinel(object):
293    """Access attributes to return a named object, usable as a sentinel."""
294    def __init__(self):
295        self._sentinels = {}
296
297    def __getattr__(self, name):
298        if name == '__bases__':
299            # Without this help(unittest.mock) raises an exception
300            raise AttributeError
301        return self._sentinels.setdefault(name, _SentinelObject(name))
302
303    def __reduce__(self):
304        return 'sentinel'
305
306
# Module-wide sentinel factory: attribute access yields a named sentinel.
sentinel = _Sentinel()

# DEFAULT is the initial value of Base._mock_return_value, i.e. it marks a
# return value that has not been configured.
DEFAULT = sentinel.DEFAULT
# _missing is used as the "no entry" default for child lookups
# (see NonCallableMock.__delattr__).
_missing = sentinel.MISSING
# _deleted is stored in _mock_children in place of a deleted attribute.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ hands straight to
# object.__setattr__.  _delegating_property() adds further names.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}
319
320
def _delegating_property(name):
    """Build a property for *name* that honours the mock's delegate.

    Without a delegate (``_mock_delegate`` is None) the value lives on
    the mock under ``'_mock_' + name``; with a delegate, reads and writes
    go to the attribute *name* on the delegate.  As a side effect *name*
    is added to ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            self.__dict__[_the_name] = value

    return property(_get, _set)
337
338
339
340class _CallList(list):
341
342    def __contains__(self, value):
343        if not isinstance(value, list):
344            return list.__contains__(self, value)
345        len_value = len(value)
346        len_self = len(self)
347        if len_value > len_self:
348            return False
349
350        for i in range(0, len_self - len_value + 1):
351            sub_list = self[i:i+len_value]
352            if sub_list == value:
353                return True
354        return False
355
356    def __repr__(self):
357        return pprint.pformat(list(self))
358
359
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt *value* as a child of *parent* when that is permissible.

    Returns False, leaving *value* untouched, when *value* is not a mock,
    already has a name or a parent, or is *parent* itself or one of its
    ancestors.  Otherwise records the parent and name(s) on *value* and
    returns True.
    """
    value = _extract_mock(value)
    if not _is_instance_mock(value):
        return False

    if value._mock_name or value._mock_new_name:
        return False
    if value._mock_parent is not None:
        return False
    if value._mock_new_parent is not None:
        return False

    # Setting a mock as a child or return value of itself (directly or
    # through an ancestor) must not modify the mock.
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
385
386# Internal class to identify if we wrapped an iterator object or not.
387class _MockIter(object):
388    def __init__(self, obj):
389        self.obj = iter(obj)
390    def __next__(self):
391        return next(self.obj)
392
class Base(object):
    """Root class supplying default return-value and side-effect state."""

    _mock_side_effect = None
    _mock_return_value = DEFAULT

    def __init__(self, /, *args, **kwargs):
        """Accept and ignore any arguments."""
398
399
400
401class NonCallableMock(Base):
402    """A non-callable version of `Mock`"""
403
    def __new__(cls, /, *args, **kw):
        """Create the instance as a member of a freshly built subclass of *cls*.

        When *cls* is not already an AsyncMockMixin subclass and the
        ``spec``/``spec_set`` argument is an async object,
        AsyncMockMixin is placed in front of *cls* in the new class's
        bases.
        """
        # every instance has its own class
        # so we can create magic methods on the
        # class without stomping on other mocks
        bases = (cls,)
        if not issubclass(cls, AsyncMockMixin):
            # Check if spec is an async object or function
            bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
            # spec_set wins over spec when both were passed.
            spec_arg = bound_args.get('spec_set', bound_args.get('spec'))
            if spec_arg is not None and _is_async_obj(spec_arg):
                bases = (AsyncMockMixin, cls)
        # The per-instance class keeps the name and docstring of cls.
        new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
        instance = _safe_super(NonCallableMock, cls).__new__(new)
        return instance
418
419
    def __init__(
            self, spec=None, wraps=None, name=None, spec_set=None,
            parent=None, _spec_state=None, _new_name='', _new_parent=None,
            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
        ):
        """Initialise the mock's bookkeeping state.

        `spec_set`, when given, replaces `spec` and puts the mock into
        spec_set mode.  `_new_parent` defaults to `parent`, and
        `_eat_self` defaults to whether a parent was supplied.  Any
        remaining keyword arguments are passed to `configure_mock`.
        """
        if _new_parent is None:
            _new_parent = parent

        # State is written straight into the instance dict rather than
        # through attribute assignment.
        __dict__ = self.__dict__
        __dict__['_mock_parent'] = parent
        __dict__['_mock_name'] = name
        __dict__['_mock_new_name'] = _new_name
        __dict__['_mock_new_parent'] = _new_parent
        __dict__['_mock_sealed'] = False

        if spec_set is not None:
            # From here on spec_set is a flag and spec holds the object.
            spec = spec_set
            spec_set = True
        if _eat_self is None:
            _eat_self = parent is not None

        self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

        __dict__['_mock_children'] = {}
        __dict__['_mock_wraps'] = wraps
        __dict__['_mock_delegate'] = None

        # Backing storage for the delegating properties (called,
        # call_args, call_count, call_args_list, mock_calls).
        __dict__['_mock_called'] = False
        __dict__['_mock_call_args'] = None
        __dict__['_mock_call_count'] = 0
        __dict__['_mock_call_args_list'] = _CallList()
        __dict__['_mock_mock_calls'] = _CallList()

        __dict__['method_calls'] = _CallList()
        __dict__['_mock_unsafe'] = unsafe

        if kwargs:
            self.configure_mock(**kwargs)

        _safe_super(NonCallableMock, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state
        )
463
464
465    def attach_mock(self, mock, attribute):
466        """
467        Attach a mock as an attribute of this one, replacing its name and
468        parent. Calls to the attached mock will be recorded in the
469        `method_calls` and `mock_calls` attributes of this one."""
470        inner_mock = _extract_mock(mock)
471
472        inner_mock._mock_parent = None
473        inner_mock._mock_new_parent = None
474        inner_mock._mock_name = ''
475        inner_mock._mock_new_name = None
476
477        setattr(self, attribute, mock)
478
479
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.

        This is the public entry point; it delegates to `_mock_add_spec`
        with that method's default values for the private options."""
        self._mock_add_spec(spec, spec_set)
487
488
489    def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
490                       _eat_self=False):
491        _spec_class = None
492        _spec_signature = None
493        _spec_asyncs = []
494
495        for attr in dir(spec):
496            if iscoroutinefunction(getattr(spec, attr, None)):
497                _spec_asyncs.append(attr)
498
499        if spec is not None and not _is_list(spec):
500            if isinstance(spec, type):
501                _spec_class = spec
502            else:
503                _spec_class = type(spec)
504            res = _get_signature_object(spec,
505                                        _spec_as_instance, _eat_self)
506            _spec_signature = res and res[1]
507
508            spec = dir(spec)
509
510        __dict__ = self.__dict__
511        __dict__['_spec_class'] = _spec_class
512        __dict__['_spec_set'] = spec_set
513        __dict__['_spec_signature'] = _spec_signature
514        __dict__['_mock_methods'] = spec
515        __dict__['_spec_asyncs'] = _spec_asyncs
516
517    def __get_return_value(self):
518        ret = self._mock_return_value
519        if self._mock_delegate is not None:
520            ret = self._mock_delegate.return_value
521
522        if ret is DEFAULT:
523            ret = self._get_child_mock(
524                _new_parent=self, _new_name='()'
525            )
526            self.return_value = ret
527        return ret
528
529
530    def __set_return_value(self, value):
531        if self._mock_delegate is not None:
532            self._mock_delegate.return_value = value
533        else:
534            self._mock_return_value = value
535            _check_and_set_parent(self, value, None, '()')
536
537    __return_value_doc = "The value to be returned when the mock is called."
538    return_value = property(__get_return_value, __set_return_value,
539                            __return_value_doc)
540
541
542    @property
543    def __class__(self):
544        if self._spec_class is None:
545            return type(self)
546        return self._spec_class
547
    # Call-recording attributes.  Each is a property that reads/writes the
    # '_mock_<name>' entry on this mock, or the attribute of the same name
    # on the delegate once one is installed (see _delegating_property).
    called = _delegating_property('called')
    call_count = _delegating_property('call_count')
    call_args = _delegating_property('call_args')
    call_args_list = _delegating_property('call_args_list')
    mock_calls = _delegating_property('mock_calls')
553
554
555    def __get_side_effect(self):
556        delegated = self._mock_delegate
557        if delegated is None:
558            return self._mock_side_effect
559        sf = delegated.side_effect
560        if (sf is not None and not callable(sf)
561                and not isinstance(sf, _MockIter) and not _is_exception(sf)):
562            sf = _MockIter(sf)
563            delegated.side_effect = sf
564        return sf
565
566    def __set_side_effect(self, value):
567        value = _try_iter(value)
568        delegated = self._mock_delegate
569        if delegated is None:
570            self._mock_side_effect = value
571        else:
572            delegated.side_effect = value
573
574    side_effect = property(__get_side_effect, __set_side_effect)
575
576
577    def reset_mock(self,  visited=None,*, return_value=False, side_effect=False):
578        "Restore the mock object to its initial state."
579        if visited is None:
580            visited = []
581        if id(self) in visited:
582            return
583        visited.append(id(self))
584
585        self.called = False
586        self.call_args = None
587        self.call_count = 0
588        self.mock_calls = _CallList()
589        self.call_args_list = _CallList()
590        self.method_calls = _CallList()
591
592        if return_value:
593            self._mock_return_value = DEFAULT
594        if side_effect:
595            self._mock_side_effect = None
596
597        for child in self._mock_children.values():
598            if isinstance(child, _SpecState) or child is _deleted:
599                continue
600            child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
601
602        ret = self._mock_return_value
603        if _is_instance_mock(ret) and ret is not self:
604            ret.reset_mock(visited)
605
606
607    def configure_mock(self, /, **kwargs):
608        """Set attributes on the mock through keyword arguments.
609
610        Attributes plus return values and side effects can be set on child
611        mocks using standard dot notation and unpacking a dictionary in the
612        method call:
613
614        >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
615        >>> mock.configure_mock(**attrs)"""
616        for arg, val in sorted(kwargs.items(),
617                               # we sort on the number of dots so that
618                               # attributes are set before we set attributes on
619                               # attributes
620                               key=lambda entry: entry[0].count('.')):
621            args = arg.split('.')
622            final = args.pop()
623            obj = self
624            for entry in args:
625                obj = getattr(obj, entry)
626            setattr(obj, final, val)
627
628
    def __getattr__(self, name):
        """Return the child mock for `name`, creating it on first access.

        Raises AttributeError for names outside the spec, for magic
        names, for deleted attributes and, unless the mock is unsafe,
        for names that look like misspelled assertions.  A pending
        `_SpecState` child is replaced by its autospecced mock.
        """
        # These two names are read by this very method; presumably this
        # guard prevents endless recursion while they are not yet set.
        if name in {'_mock_methods', '_mock_unsafe'}:
            raise AttributeError(name)
        elif self._mock_methods is not None:
            if name not in self._mock_methods or name in _all_magics:
                raise AttributeError("Mock object has no attribute %r" % name)
        elif _is_magic(name):
            raise AttributeError(name)
        if not self._mock_unsafe:
            if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')):
                raise AttributeError(
                    f"{name!r} is not a valid assertion. Use a spec "
                    f"for the mock if {name!r} is meant to be an attribute.")

        result = self._mock_children.get(name)
        if result is _deleted:
            raise AttributeError(name)
        elif result is None:
            # First access: build the child, wrapping the corresponding
            # attribute of the wrapped object when there is one.
            wraps = None
            if self._mock_wraps is not None:
                # XXXX should we get the attribute without triggering code
                # execution?
                wraps = getattr(self._mock_wraps, name)

            result = self._get_child_mock(
                parent=self, name=name, wraps=wraps, _new_name=name,
                _new_parent=self
            )
            self._mock_children[name]  = result

        elif isinstance(result, _SpecState):
            # Autospec placeholder: create the real mock now and cache it.
            try:
                result = create_autospec(
                    result.spec, result.spec_set, result.instance,
                    result.parent, result.name
                )
            except InvalidSpecError:
                target_name = self.__dict__['_mock_name'] or self
                raise InvalidSpecError(
                    f'Cannot autospec attr {name!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self!r}, attr={result.spec!r}]')
            self._mock_children[name]  = result

        return result
674
675
    def _extract_mock_name(self):
        """Return the dotted name of this mock, built by walking its parents.

        The name starts with the outermost ancestor's `_mock_name` (or
        'mock') and continues with each `_mock_new_name`; call segments
        ('()') are appended without a separating dot.
        """
        _name_list = [self._mock_new_name]
        _parent = self._mock_new_parent
        last = self

        # `dot` is the separator appended after the next (outer) segment;
        # it is empty when the segment that follows it is a call.
        dot = '.'
        if _name_list == ['()']:
            dot = ''

        while _parent is not None:
            last = _parent

            _name_list.append(_parent._mock_new_name + dot)
            dot = '.'
            if _parent._mock_new_name == '()':
                dot = ''

            _parent = _parent._mock_new_parent

        # Segments were collected innermost first.
        _name_list = list(reversed(_name_list))
        # The first slot (the root's _mock_new_name) is replaced by the
        # root's display name.
        _first = last._mock_name or 'mock'
        if len(_name_list) > 1:
            if _name_list[1] not in ('()', '().'):
                _first += '.'
        _name_list[0] = _first
        return ''.join(_name_list)
702
703    def __repr__(self):
704        name = self._extract_mock_name()
705
706        name_string = ''
707        if name not in ('mock', 'mock.'):
708            name_string = ' name=%r' % name
709
710        spec_string = ''
711        if self._spec_class is not None:
712            spec_string = ' spec=%r'
713            if self._spec_set:
714                spec_string = ' spec_set=%r'
715            spec_string = spec_string % self._spec_class.__name__
716        return "<%s%s%s id='%s'>" % (
717            type(self).__name__,
718            name_string,
719            spec_string,
720            id(self)
721        )
722
723
724    def __dir__(self):
725        """Filter the output of `dir(mock)` to only useful members."""
726        if not FILTER_DIR:
727            return object.__dir__(self)
728
729        extras = self._mock_methods or []
730        from_type = dir(type(self))
731        from_dict = list(self.__dict__)
732        from_child_mocks = [
733            m_name for m_name, m_value in self._mock_children.items()
734            if m_value is not _deleted]
735
736        from_type = [e for e in from_type if not e.startswith('_')]
737        from_dict = [e for e in from_dict if not e.startswith('_') or
738                     _is_magic(e)]
739        return sorted(set(extras + from_type + from_dict + from_child_mocks))
740
741
    def __setattr__(self, name, value):
        """Set an attribute, enforcing spec_set, magic-method and seal rules.

        Raises AttributeError when the name is not permitted by a
        spec_set, is an unsupported magic method, is a magic method
        missing from the spec, or is new on a sealed mock.
        """
        if name in _allowed_names:
            # property setters go through here
            return object.__setattr__(self, name, value)
        elif (self._spec_set and self._mock_methods is not None and
            name not in self._mock_methods and
            name not in self.__dict__):
            raise AttributeError("Mock object has no attribute '%s'" % name)
        elif name in _unsupported_magics:
            msg = 'Attempting to set unsupported magic method %r.' % name
            raise AttributeError(msg)
        elif name in _all_magics:
            if self._mock_methods is not None and name not in self._mock_methods:
                raise AttributeError("Mock object has no attribute '%s'" % name)

            if not _is_instance_mock(value):
                # Install the magic method on this mock's own type; the
                # instance attribute set at the end of this method is a
                # wrapper that passes `self` to the original value.
                setattr(type(self), name, _get_method(name, value))
                original = value
                value = lambda *args, **kw: original(self, *args, **kw)
            else:
                # only set _new_name and not name so that mock_calls is tracked
                # but not method calls
                _check_and_set_parent(self, value, None, name)
                setattr(type(self), name, value)
                self._mock_children[name] = value
        elif name == '__class__':
            # Assigning __class__ only changes the reported spec class.
            self._spec_class = value
            return
        else:
            # Ordinary attribute: adopt mock values as children.
            if _check_and_set_parent(self, value, name, name):
                self._mock_children[name] = value

        # A sealed mock rejects attributes it does not already have.
        if self._mock_sealed and not hasattr(self, name):
            mock_name = f'{self._extract_mock_name()}.{name}'
            raise AttributeError(f'Cannot set {mock_name}')

        return object.__setattr__(self, name, value)
779
780
    def __delattr__(self, name):
        """Delete an attribute and remember that it was deleted.

        The name is recorded as `_deleted` in `_mock_children`.  Deleting
        an already deleted name that is not in the instance dict raises
        AttributeError.
        """
        if name in _all_magics and name in type(self).__dict__:
            delattr(type(self), name)
            if name not in self.__dict__:
                # for magic methods that are still MagicProxy objects and
                # not set on the instance itself
                return

        obj = self._mock_children.get(name, _missing)
        if name in self.__dict__:
            _safe_super(NonCallableMock, self).__delattr__(name)
        elif obj is _deleted:
            raise AttributeError(name)
        # Replace any existing child entry with the deletion marker.
        if obj is not _missing:
            del self._mock_children[name]
        self._mock_children[name] = _deleted
797
798
799    def _format_mock_call_signature(self, args, kwargs):
800        name = self._mock_name or 'mock'
801        return _format_call_signature(name, args, kwargs)
802
803
804    def _format_mock_failure_message(self, args, kwargs, action='call'):
805        message = 'expected %s not found.\nExpected: %s\nActual: %s'
806        expected_string = self._format_mock_call_signature(args, kwargs)
807        call_args = self.call_args
808        actual_string = self._format_mock_call_signature(*call_args)
809        return message % (action, expected_string, actual_string)
810
811
812    def _get_call_signature_from_name(self, name):
813        """
814        * If call objects are asserted against a method/function like obj.meth1
815        then there could be no name for the call object to lookup. Hence just
816        return the spec_signature of the method/function being asserted against.
817        * If the name is not empty then remove () and split by '.' to get
818        list of names to iterate through the children until a potential
819        match is found. A child mock is created only during attribute access
820        so if we get a _SpecState then no attributes of the spec were accessed
821        and can be safely exited.
822        """
823        if not name:
824            return self._spec_signature
825
826        sig = None
827        names = name.replace('()', '').split('.')
828        children = self._mock_children
829
830        for name in names:
831            child = children.get(name)
832            if child is None or isinstance(child, _SpecState):
833                break
834            else:
835                # If an autospecced object is attached using attach_mock the
836                # child would be a function with mock object as attribute from
837                # which signature has to be derived.
838                child = _extract_mock(child)
839                children = child._mock_children
840                sig = child._spec_signature
841
842        return sig
843
844
845    def _call_matcher(self, _call):
846        """
847        Given a call (or simply an (args, kwargs) tuple), return a
848        comparison key suitable for matching with other calls.
849        This is a best effort method which relies on the spec's signature,
850        if available, or falls back on the arguments themselves.
851        """
852
853        if isinstance(_call, tuple) and len(_call) > 2:
854            sig = self._get_call_signature_from_name(_call[0])
855        else:
856            sig = self._spec_signature
857
858        if sig is not None:
859            if len(_call) == 2:
860                name = ''
861                args, kwargs = _call
862            else:
863                name, args, kwargs = _call
864            try:
865                bound_call = sig.bind(*args, **kwargs)
866                return call(name, bound_call.args, bound_call.kwargs)
867            except TypeError as e:
868                return e.with_traceback(None)
869        else:
870            return _call
871
872    def assert_not_called(self):
873        """assert that the mock was never called.
874        """
875        if self.call_count != 0:
876            msg = ("Expected '%s' to not have been called. Called %s times.%s"
877                   % (self._mock_name or 'mock',
878                      self.call_count,
879                      self._calls_repr()))
880            raise AssertionError(msg)
881
882    def assert_called(self):
883        """assert that the mock was called at least once
884        """
885        if self.call_count == 0:
886            msg = ("Expected '%s' to have been called." %
887                   (self._mock_name or 'mock'))
888            raise AssertionError(msg)
889
890    def assert_called_once(self):
891        """assert that the mock was called only once.
892        """
893        if not self.call_count == 1:
894            msg = ("Expected '%s' to have been called once. Called %s times.%s"
895                   % (self._mock_name or 'mock',
896                      self.call_count,
897                      self._calls_repr()))
898            raise AssertionError(msg)
899
900    def assert_called_with(self, /, *args, **kwargs):
901        """assert that the last call was made with the specified arguments.
902
903        Raises an AssertionError if the args and keyword args passed in are
904        different to the last call to the mock."""
905        if self.call_args is None:
906            expected = self._format_mock_call_signature(args, kwargs)
907            actual = 'not called.'
908            error_message = ('expected call not found.\nExpected: %s\nActual: %s'
909                    % (expected, actual))
910            raise AssertionError(error_message)
911
912        def _error_message():
913            msg = self._format_mock_failure_message(args, kwargs)
914            return msg
915        expected = self._call_matcher(_Call((args, kwargs), two=True))
916        actual = self._call_matcher(self.call_args)
917        if actual != expected:
918            cause = expected if isinstance(expected, Exception) else None
919            raise AssertionError(_error_message()) from cause
920
921
922    def assert_called_once_with(self, /, *args, **kwargs):
923        """assert that the mock was called exactly once and that that call was
924        with the specified arguments."""
925        if not self.call_count == 1:
926            msg = ("Expected '%s' to be called once. Called %s times.%s"
927                   % (self._mock_name or 'mock',
928                      self.call_count,
929                      self._calls_repr()))
930            raise AssertionError(msg)
931        return self.assert_called_with(*args, **kwargs)
932
933
934    def assert_has_calls(self, calls, any_order=False):
935        """assert the mock has been called with the specified calls.
936        The `mock_calls` list is checked for the calls.
937
938        If `any_order` is False (the default) then the calls must be
939        sequential. There can be extra calls before or after the
940        specified calls.
941
942        If `any_order` is True then the calls can be in any order, but
943        they must all appear in `mock_calls`."""
944        expected = [self._call_matcher(c) for c in calls]
945        cause = next((e for e in expected if isinstance(e, Exception)), None)
946        all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
947        if not any_order:
948            if expected not in all_calls:
949                if cause is None:
950                    problem = 'Calls not found.'
951                else:
952                    problem = ('Error processing expected calls.\n'
953                               'Errors: {}').format(
954                                   [e if isinstance(e, Exception) else None
955                                    for e in expected])
956                raise AssertionError(
957                    f'{problem}\n'
958                    f'Expected: {_CallList(calls)}'
959                    f'{self._calls_repr(prefix="Actual").rstrip(".")}'
960                ) from cause
961            return
962
963        all_calls = list(all_calls)
964
965        not_found = []
966        for kall in expected:
967            try:
968                all_calls.remove(kall)
969            except ValueError:
970                not_found.append(kall)
971        if not_found:
972            raise AssertionError(
973                '%r does not contain all of %r in its call list, '
974                'found %r instead' % (self._mock_name or 'mock',
975                                      tuple(not_found), all_calls)
976            ) from cause
977
978
979    def assert_any_call(self, /, *args, **kwargs):
980        """assert the mock has been called with the specified arguments.
981
982        The assert passes if the mock has *ever* been called, unlike
983        `assert_called_with` and `assert_called_once_with` that only pass if
984        the call is the most recent one."""
985        expected = self._call_matcher(_Call((args, kwargs), two=True))
986        cause = expected if isinstance(expected, Exception) else None
987        actual = [self._call_matcher(c) for c in self.call_args_list]
988        if cause or expected not in _AnyComparer(actual):
989            expected_string = self._format_mock_call_signature(args, kwargs)
990            raise AssertionError(
991                '%s call not found' % expected_string
992            ) from cause
993
994
995    def _get_child_mock(self, /, **kw):
996        """Create the child mocks for attributes and return value.
997        By default child mocks will be the same type as the parent.
998        Subclasses of Mock may want to override this to customize the way
999        child mocks are made.
1000
1001        For non-callable mocks the callable variant will be used (rather than
1002        any custom subclass)."""
1003        _new_name = kw.get("_new_name")
1004        if _new_name in self.__dict__['_spec_asyncs']:
1005            return AsyncMock(**kw)
1006
1007        if self._mock_sealed:
1008            attribute = f".{kw['name']}" if "name" in kw else "()"
1009            mock_name = self._extract_mock_name() + attribute
1010            raise AttributeError(mock_name)
1011
1012        _type = type(self)
1013        if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
1014            # Any asynchronous magic becomes an AsyncMock
1015            klass = AsyncMock
1016        elif issubclass(_type, AsyncMockMixin):
1017            if (_new_name in _all_sync_magics or
1018                    self._mock_methods and _new_name in self._mock_methods):
1019                # Any synchronous method on AsyncMock becomes a MagicMock
1020                klass = MagicMock
1021            else:
1022                klass = AsyncMock
1023        elif not issubclass(_type, CallableMixin):
1024            if issubclass(_type, NonCallableMagicMock):
1025                klass = MagicMock
1026            elif issubclass(_type, NonCallableMock):
1027                klass = Mock
1028        else:
1029            klass = _type.__mro__[1]
1030        return klass(**kw)
1031
1032
1033    def _calls_repr(self, prefix="Calls"):
1034        """Renders self.mock_calls as a string.
1035
1036        Example: "\nCalls: [call(1), call(2)]."
1037
1038        If self.mock_calls is empty, an empty string is returned. The
1039        output will be truncated if very long.
1040        """
1041        if not self.mock_calls:
1042            return ""
1043        return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1044
1045
# Signature object for NonCallableMock.__init__, computed once when the
# module is imported.
_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
1047
1048
1049class _AnyComparer(list):
1050    """A list which checks if it contains a call which may have an
1051    argument of ANY, flipping the components of item and self from
1052    their traditional locations so that ANY is guaranteed to be on
1053    the left."""
1054    def __contains__(self, item):
1055        for _call in self:
1056            assert len(item) == len(_call)
1057            if all([
1058                expected == actual
1059                for expected, actual in zip(item, _call)
1060            ]):
1061                return True
1062        return False
1063
1064
1065def _try_iter(obj):
1066    if obj is None:
1067        return obj
1068    if _is_exception(obj):
1069        return obj
1070    if _callable(obj):
1071        return obj
1072    try:
1073        return iter(obj)
1074    except TypeError:
1075        # XXXX backwards compatibility
1076        # but this will blow up on first call - so maybe we should fail early?
1077        return obj
1078
1079
class CallableMixin(Base):
    """Mixin that makes a mock callable.

    Calling the mock checks the signature, records the call on the mock and
    on every mock up its parent chain, and then produces a result from
    `side_effect`, `return_value` or the wrapped object.
    """

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # The return value is stored directly in the instance dict before the
        # rest of the initialisation runs.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect


    def _mock_check_sig(self, /, *args, **kwargs):
        # Placeholder that accepts anything; it can be swapped for a function
        # enforcing a specific signature.
        pass


    def __call__(self, /, *args, **kwargs):
        # `self` is positional-only so that a mocked function / method may
        # itself take a `self` keyword argument.
        self._mock_check_sig(*args, **kwargs)
        self._increment_mock_call(*args, **kwargs)
        return self._mock_call(*args, **kwargs)


    def _mock_call(self, /, *args, **kwargs):
        return self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        """Record a call on this mock and on every mock up the parent chain."""
        self.called = True
        self.call_count += 1

        # call_args is set here, before the call is executed, so that
        # assertions on call arguments pass before execution in the case of
        # awaited calls
        recorded = _Call((args, kwargs), two=True)
        self.call_args = recorded
        self.call_args_list.append(recorded)

        # state for method_calls:
        track_methods = self._mock_parent is not None
        method_path = self._mock_name

        # state for mock_calls:
        call_path = self._mock_new_name
        path_is_call = call_path == '()'
        self.mock_calls.append(_Call(('', args, kwargs)))

        # walk up the chain of mocks:
        ancestor = self._mock_new_parent
        while ancestor is not None:

            # method_calls:
            if track_methods:
                ancestor.method_calls.append(_Call((method_path, args, kwargs)))
                track_methods = ancestor._mock_parent is not None
                if track_methods:
                    method_path = ancestor._mock_name + '.' + method_path

            # mock_calls:
            ancestor.mock_calls.append(_Call((call_path, args, kwargs)))

            if ancestor._mock_new_name:
                # no dot in front of a '()' component
                separator = '' if path_is_call else '.'
                path_is_call = ancestor._mock_new_name == '()'
                call_path = ancestor._mock_new_name + separator + call_path

            # next link of the parental chain:
            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(self, /, *args, **kwargs):
        """Produce the result of a call: side effect, return value or wrap."""
        # kept apart from _increment_mock_call so that awaited functions are
        # executed separately from their call, also AsyncMock overrides this
        # method

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            if _callable(effect):
                result = effect(*args, **kwargs)
            else:
                # not callable: take the next value from it
                result = next(effect)
                if _is_exception(result):
                    raise result

            if result is not DEFAULT:
                return result

        # The wrapped object is only consulted when no explicit return value
        # has been set.
        if self._mock_return_value is DEFAULT and self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
1181
1182
1183
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an exception class or instance then it is raised
      when the mock is called.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1234
1235
1236def _dot_lookup(thing, comp, import_path):
1237    try:
1238        return getattr(thing, comp)
1239    except AttributeError:
1240        __import__(import_path)
1241        return getattr(thing, comp)
1242
1243
1244def _importer(target):
1245    components = target.split('.')
1246    import_path = components.pop(0)
1247    thing = __import__(import_path)
1248
1249    for comp in components:
1250        import_path += ".%s" % comp
1251        thing = _dot_lookup(thing, comp, import_path)
1252    return thing
1253
1254
1255# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1256# they don't contain common misspellings of arguments related to autospeccing.
1257def _check_spec_arg_typos(kwargs_to_check):
1258    typos = ("autospect", "auto_spec", "set_spec")
1259    for typo in typos:
1260        if typo in kwargs_to_check:
1261            raise RuntimeError(
1262                f"{typo!r} might be a typo; use unsafe=True if this is intended"
1263            )
1264
1265
class _patch(object):
    """Patcher that replaces one attribute of a target for a limited time.

    An instance can be used as a context manager, as a function or class
    decorator, or driven by hand through `start()` and `stop()`.
    """

    # Set by `patch.multiple`: when not None the created mock is handed over
    # by keyword (in a dict) instead of positionally.
    attribute_name = None
    # Patchers activated through start() and not yet stopped.
    _active_patches = []

    def __init__(
            self, getter, attribute, new, spec, create,
            spec_set, autospec, new_callable, kwargs, *, unsafe=False
        ):
        """Validate and store the patch configuration.

        `getter` is a zero-argument callable returning the object to patch and
        `attribute` the name to replace on it. `kwargs` are passed to the mock
        that gets created. Unless `unsafe` is true, `kwargs` is checked for
        likely misspellings of the autospec options.

        Raises ValueError when `new_callable` is combined with `new` or
        `autospec`, RuntimeError for a suspected typo in `kwargs`, and
        InvalidSpecError when `spec` or `spec_set` is itself a mock.
        """
        if new_callable is not None:
            if new is not DEFAULT:
                raise ValueError(
                    "Cannot use 'new' and 'new_callable' together"
                )
            if autospec is not None:
                raise ValueError(
                    "Cannot use 'autospec' and 'new_callable' together"
                )
        if not unsafe:
            _check_spec_arg_typos(kwargs)
        if _is_instance_mock(spec):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec '
                f'has already been mocked out. [spec={spec!r}]')
        if _is_instance_mock(spec_set):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec_set '
                f'target has already been mocked out. [spec_set={spec_set!r}]')

        self.getter = getter
        self.attribute = attribute
        self.new = new
        self.new_callable = new_callable
        self.spec = spec
        self.create = create
        self.has_local = False
        self.spec_set = spec_set
        self.autospec = autospec
        self.kwargs = kwargs
        self.additional_patchers = []
        # Remembered so that copy() can build an equally permissive patcher.
        self.unsafe = unsafe


    def copy(self):
        """Return a new, independent patcher with the same configuration."""
        # `unsafe` has to be forwarded: the constructor validates `kwargs`
        # again, and would otherwise reject keyword arguments that the caller
        # explicitly allowed when creating this patcher.
        patcher = _patch(
            self.getter, self.attribute, self.new, self.spec,
            self.create, self.spec_set,
            self.autospec, self.new_callable, self.kwargs,
            unsafe=self.unsafe
        )
        patcher.attribute_name = self.attribute_name
        patcher.additional_patchers = [
            p.copy() for p in self.additional_patchers
        ]
        return patcher


    def __call__(self, func):
        """Use the patcher as a decorator for a class, coroutine or callable."""
        if isinstance(func, type):
            return self.decorate_class(func)
        if inspect.iscoroutinefunction(func):
            return self.decorate_async_callable(func)
        return self.decorate_callable(func)


    def decorate_class(self, klass):
        """Decorate every callable attribute of `klass` whose name starts with
        `patch.TEST_PREFIX`, each with its own copy of this patcher."""
        for attr in dir(klass):
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            patcher = self.copy()
            setattr(klass, attr, patcher(attr_value))
        return klass


    @contextlib.contextmanager
    def decoration_helper(self, patched, args, keywargs):
        """Enter every patching attached to `patched` and yield the call
        arguments extended with the objects those patchings produced.

        Yields an `(args, keywargs)` pair; all patchings are exited again
        when the context is left.
        """
        extra_args = []
        with contextlib.ExitStack() as exit_stack:
            for patching in patched.patchings:
                arg = exit_stack.enter_context(patching)
                if patching.attribute_name is not None:
                    # such patchers produce a dict of created mocks
                    keywargs.update(arg)
                elif patching.new is DEFAULT:
                    extra_args.append(arg)

            args += tuple(extra_args)
            yield (args, keywargs)


    def decorate_callable(self, func):
        """Wrap `func` so that the patch is active while it runs."""
        # NB. Keep the method in sync with decorate_async_callable()
        if hasattr(func, 'patchings'):
            # already wrapped by another patcher: just join its list
            func.patchings.append(self)
            return func

        @wraps(func)
        def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def decorate_async_callable(self, func):
        """Wrap coroutine function `func` so that the patch is active while
        it is awaited."""
        # NB. Keep the method in sync with decorate_callable()
        if hasattr(func, 'patchings'):
            # already wrapped by another patcher: just join its list
            func.patchings.append(self)
            return func

        @wraps(func)
        async def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return await func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def get_original(self):
        """Return `(original, local)` for the attribute about to be patched.

        `original` is the current value (or DEFAULT when there is none) and
        `local` tells whether it was found in the target's own `__dict__`.
        Raises AttributeError when the attribute does not exist and `create`
        is not set.
        """
        target = self.getter()
        name = self.attribute

        original = DEFAULT
        local = False

        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True

        # names of builtins may always be created on a module
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True

        if not self.create and original is DEFAULT:
            raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
        return original, local


    def __enter__(self):
        """Perform the patch."""
        new, spec, spec_set = self.new, self.spec, self.spec_set
        autospec, kwargs = self.autospec, self.kwargs
        new_callable = self.new_callable
        self.target = self.getter()

        # normalise False to None
        if spec is False:
            spec = None
        if spec_set is False:
            spec_set = None
        if autospec is False:
            autospec = None

        if spec is not None and autospec is not None:
            raise TypeError("Can't specify spec and autospec")
        if ((spec is not None or autospec is not None) and
            spec_set not in (True, None)):
            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")

        original, local = self.get_original()

        if new is DEFAULT and autospec is None:
            inherit = False
            if spec is True:
                # set spec to the object we are replacing
                spec = original
                if spec_set is True:
                    spec_set = original
                    spec = None
            elif spec is not None:
                if spec_set is True:
                    spec_set = spec
                    spec = None
            elif spec_set is True:
                spec_set = original

            if spec is not None or spec_set is not None:
                if original is DEFAULT:
                    raise TypeError("Can't use 'spec' with create=True")
                if isinstance(original, type):
                    # If we're patching out a class and there is a spec
                    inherit = True
            if spec is None and _is_async_obj(original):
                Klass = AsyncMock
            else:
                Klass = MagicMock
            _kwargs = {}
            if new_callable is not None:
                Klass = new_callable
            elif spec is not None or spec_set is not None:
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if _is_list(this_spec):
                    not_callable = '__call__' not in this_spec
                else:
                    not_callable = not callable(this_spec)
                if _is_async_obj(this_spec):
                    Klass = AsyncMock
                elif not_callable:
                    Klass = NonCallableMagicMock

            if spec is not None:
                _kwargs['spec'] = spec
            if spec_set is not None:
                _kwargs['spec_set'] = spec_set

            # add a name to mocks
            if (isinstance(Klass, type) and
                issubclass(Klass, NonCallableMock) and self.attribute):
                _kwargs['name'] = self.attribute

            _kwargs.update(kwargs)
            new = Klass(**_kwargs)

            if inherit and _is_instance_mock(new):
                # we can only tell if the instance should be callable if the
                # spec is not a list
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if (not _is_list(this_spec) and not
                    _instance_callable(this_spec)):
                    Klass = NonCallableMagicMock

                _kwargs.pop('name')
                new.return_value = Klass(_new_parent=new, _new_name='()',
                                         **_kwargs)
        elif autospec is not None:
            # spec is ignored, new *must* be default, spec_set is treated
            # as a boolean. Should we check spec is not None and that spec_set
            # is a bool?
            if new is not DEFAULT:
                raise TypeError(
                    "autospec creates the mock for you. Can't specify "
                    "autospec and new."
                )
            if original is DEFAULT:
                raise TypeError("Can't use 'autospec' with create=True")
            spec_set = bool(spec_set)
            if autospec is True:
                autospec = original

            if _is_instance_mock(self.target):
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} as the patch '
                    f'target has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')
            if _is_instance_mock(autospec):
                target_name = getattr(self.target, '__name__', self.target)
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')

            new = create_autospec(autospec, spec_set=spec_set,
                                  _name=self.attribute, **kwargs)
        elif kwargs:
            # can't set keyword args when we aren't creating the mock
            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
            raise TypeError("Can't pass kwargs to a mock we aren't creating")

        new_attr = new

        self.temp_original = original
        self.is_local = local
        self._exit_stack = contextlib.ExitStack()
        try:
            setattr(self.target, self.attribute, new_attr)
            if self.attribute_name is not None:
                extra_args = {}
                if self.new is DEFAULT:
                    extra_args[self.attribute_name] =  new
                for patching in self.additional_patchers:
                    arg = self._exit_stack.enter_context(patching)
                    if patching.new is DEFAULT:
                        extra_args.update(arg)
                return extra_args

            return new
        except:
            # Undo whatever was patched so far; the exception is re-raised
            # unless __exit__ reports it as handled.
            if not self.__exit__(*sys.exc_info()):
                raise

    def __exit__(self, *exc_info):
        """Undo the patch."""
        if self.is_local and self.temp_original is not DEFAULT:
            setattr(self.target, self.attribute, self.temp_original)
        else:
            delattr(self.target, self.attribute)
            if not self.create and (not hasattr(self.target, self.attribute) or
                        self.attribute in ('__doc__', '__module__',
                                           '__defaults__', '__annotations__',
                                           '__kwdefaults__')):
                # needed for proxy objects like django settings
                setattr(self.target, self.attribute, self.temp_original)

        del self.temp_original
        del self.is_local
        del self.target
        # leaving the stack also exits the additional patchers entered in
        # __enter__
        exit_stack = self._exit_stack
        del self._exit_stack
        return exit_stack.__exit__(*exc_info)


    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        self._active_patches.append(self)
        return result


    def stop(self):
        """Stop an active patch."""
        try:
            self._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1599
1600
1601
1602def _get_target(target):
1603    try:
1604        target, attribute = target.rsplit('.', 1)
1605    except (TypeError, ValueError):
1606        raise TypeError("Need a valid target to patch. You supplied: %r" %
1607                        (target,))
1608    getter = lambda: _importer(target)
1609    return getter, attribute
1610
1611
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    patch the member called `attribute` on the object `target` with a mock
    object.

    `patch.object` works as a decorator, a class decorator or a context
    manager. The arguments `new`, `spec`, `create`, `spec_set`, `autospec`
    and `new_callable` mean the same as they do for `patch`, and, as with
    `patch`, any further keyword arguments are used to configure the mock
    object that gets created.

    `target` has to be the object itself; passing its name as a string
    raises a TypeError.

    When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
    for choosing which methods to wrap.
    """
    if type(target) is str:
        message = f"{target!r} must be the actual object to be patched, not a str"
        raise TypeError(message)
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1639
1640
def _patch_multiple(target, spec=None, create=False, spec_set=None,
                    autospec=None, new_callable=None, **kwargs):
    """Patch several attributes of a single target in one call.

    `target` is either the object to patch or a string naming it, in which
    case the object is fetched by importing. Every keyword argument names an
    attribute to patch and gives its replacement::

        with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
            ...

    Pass `DEFAULT` as a value to have `patch.multiple` create the mock for
    you. Mocks created this way are handed to a decorated function by
    keyword, and are returned in a dictionary when `patch.multiple` is used
    as a context manager.

    `patch.multiple` works as a decorator, a class decorator or a context
    manager. `spec`, `spec_set`, `create`, `autospec` and `new_callable`
    mean the same as for `patch` and are applied to *all* of the patches
    made by this call.

    As a class decorator `patch.multiple` honours `patch.TEST_PREFIX` when
    choosing which methods to wrap.

    Raises `ValueError` when no keyword argument is supplied.
    """
    # Only an exact `str` is treated as an import path; the lookup itself is
    # deferred until the getter is called.
    getter = (
        (lambda: _importer(target)) if type(target) is str
        else (lambda: target)
    )

    if not kwargs:
        raise ValueError(
            'Must supply at least one keyword argument with patch.multiple'
        )

    def _make_patcher(attribute, new):
        # Every patcher gets its own empty kwargs dict.
        made = _patch(
            getter, attribute, new, spec, create, spec_set,
            autospec, new_callable, {}
        )
        made.attribute_name = attribute
        return made

    # The first patcher is the one returned; the rest are chained onto it.
    patcher, *others = [
        _make_patcher(attribute, new) for attribute, new in kwargs.items()
    ]
    for this_patcher in others:
        patcher.additional_patchers.append(this_patcher)
    return patcher
1688
1689
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    If `new` is omitted, then the target is replaced with an
    `AsyncMock` if the patched object is an async function or a
    `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
    omitted, the created mock is passed in as an extra argument to the
    decorated function. If `patch` is used as a context manager the created
    mock is returned by the context manager.

    `target` should be a string in the form `'package.module.ClassName'`. The
    `target` is imported and the specified object replaced with the `new`
    object, so the `target` must be importable from the environment you are
    calling `patch` from. The target is imported when the decorated function
    is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
    if patch is creating one for you.

    In addition you can pass `spec=True` or `spec_set=True`, which causes
    patch to pass in the object being mocked as the spec/spec_set object.

    `new_callable` allows you to specify a different class, or callable object,
    that will be called to create the `new` object. By default `AsyncMock` is
    used for async functions and `MagicMock` for the rest.

    A more powerful form of `spec` is `autospec`. If you set `autospec=True`
    then the mock will be created with a spec from the object being replaced.
    All attributes of the mock will also have the spec of the corresponding
    attribute of the object being replaced. Methods and functions being
    mocked will have their arguments checked and will raise a `TypeError` if
    they are called with the wrong signature. For mocks replacing a class,
    their return value (the 'instance') will have the same spec as the class.

    Instead of `autospec=True` you can pass `autospec=some_object` to use an
    arbitrary object as the spec instead of the one being replaced.

    By default `patch` will fail to replace attributes that don't exist. If
    you pass in `create=True`, and the attribute doesn't exist, patch will
    create the attribute for you when the patched function is called, and
    delete it again afterwards. This is useful for writing tests against
    attributes that your production code creates at runtime. It is off by
    default because it can be dangerous. With it switched on you can write
    passing tests against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator. It works by
    decorating each test method in the class. This reduces the boilerplate
    code when your test methods share a common set of patchings. `patch`
    finds tests by looking for method names that start with
    `patch.TEST_PREFIX`. By default this is `test`, which matches the way
    `unittest` finds tests. You can specify an alternative prefix by setting
    `patch.TEST_PREFIX`.

    Patch can be used as a context manager, with the with statement. Here the
    patching applies to the indented block after the with statement. If you
    use "as" then the patched object will be bound to the name after the
    "as"; very useful if `patch` is creating a mock object for you.

    Patch will raise a `RuntimeError` if passed some common misspellings of
    the arguments autospec and spec_set. Pass the argument `unsafe` with the
    value True to disable that check.

    `patch` takes arbitrary keyword arguments. These will be passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    # `_get_target` turns the dotted `target` string into the same pair that
    # `patch.object` builds by hand: a getter for the object to patch and the
    # name of the attribute to replace on it.
    getter, attribute = _get_target(target)
    return _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1770
1771
class _patch_dict(object):
    """
    Patch a dictionary, or dictionary like object, and restore the dictionary
    to its original state after the test.

    `in_dict` can be a dictionary or a mapping like container. If it is a
    mapping then it must at least support getting, setting and deleting items
    plus iterating over keys.

    `in_dict` can also be a string specifying the name of the dictionary, which
    will then be fetched by importing it.

    `values` can be a dictionary of values to set in the dictionary. `values`
    can also be an iterable of `(key, value)` pairs.

    If `clear` is True then the dictionary will be cleared before the new
    values are set.

    `patch.dict` can also be called with arbitrary keyword arguments to set
    values in the dictionary::

        with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
            ...

    `patch.dict` can be used as a context manager, decorator or class
    decorator. When used as a class decorator `patch.dict` honours
    `patch.TEST_PREFIX` for choosing which methods to wrap.

    Coroutine functions (`async def`) can be decorated too: the dictionary
    stays patched while the coroutine runs and is restored when it finishes.
    """

    def __init__(self, in_dict, values=(), clear=False, **kwargs):
        self.in_dict = in_dict
        # support any argument supported by dict(...) constructor
        self.values = dict(values)
        self.values.update(kwargs)
        self.clear = clear
        # Snapshot of the dictionary taken by `_patch_dict`; None until then.
        self._original = None


    def __call__(self, f):
        """Decorate the class, coroutine function or plain callable `f`."""
        if isinstance(f, type):
            return self.decorate_class(f)
        if inspect.iscoroutinefunction(f):
            # A synchronous wrapper would restore the dict as soon as the
            # coroutine object was created, i.e. before its body ever ran.
            return self.decorate_async_callable(f)
        return self.decorate_callable(f)


    def decorate_callable(self, f):
        """Return a wrapper that patches the dict around each call of `f`."""
        @wraps(f)
        def _inner(*args, **kw):
            self._patch_dict()
            try:
                return f(*args, **kw)
            finally:
                self._unpatch_dict()

        return _inner


    def decorate_async_callable(self, f):
        """Return a coroutine function that patches the dict while `f` runs."""
        @wraps(f)
        async def _inner(*args, **kw):
            self._patch_dict()
            try:
                return await f(*args, **kw)
            finally:
                self._unpatch_dict()

        return _inner


    def decorate_class(self, klass):
        """Wrap every callable attribute of `klass` whose name starts with
        `patch.TEST_PREFIX`, each with its own patcher, and return `klass`."""
        for attr in dir(klass):
            attr_value = getattr(klass, attr)
            if (attr.startswith(patch.TEST_PREFIX) and
                 hasattr(attr_value, "__call__")):
                decorator = _patch_dict(self.in_dict, self.values, self.clear)
                decorated = decorator(attr_value)
                setattr(klass, attr, decorated)
        return klass


    def __enter__(self):
        """Patch the dict."""
        self._patch_dict()
        return self.in_dict


    def _patch_dict(self):
        """Snapshot the dictionary, then apply `clear` and the new values."""
        values = self.values
        if isinstance(self.in_dict, str):
            # Resolve the dotted name once; later calls reuse the object.
            self.in_dict = _importer(self.in_dict)
        in_dict = self.in_dict
        clear = self.clear

        try:
            original = in_dict.copy()
        except AttributeError:
            # dict like object with no copy method
            # must support iteration over keys
            original = {}
            for key in in_dict:
                original[key] = in_dict[key]
        self._original = original

        if clear:
            _clear_dict(in_dict)

        try:
            in_dict.update(values)
        except AttributeError:
            # dict like object with no update method
            for key in values:
                in_dict[key] = values[key]


    def _unpatch_dict(self):
        """Empty the dictionary and refill it from the saved snapshot."""
        in_dict = self.in_dict
        original = self._original

        _clear_dict(in_dict)

        try:
            in_dict.update(original)
        except AttributeError:
            # dict like object with no update method
            for key in original:
                in_dict[key] = original[key]


    def __exit__(self, *args):
        """Unpatch the dict."""
        # Nothing to restore if the patch was never applied.
        if self._original is not None:
            self._unpatch_dict()
        return False


    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        _patch._active_patches.append(self)
        return result


    def stop(self):
        """Stop an active patch."""
        try:
            _patch._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1905
1906
def _clear_dict(in_dict):
    """Remove every item from `in_dict` in place.

    Uses the container's own `clear()`; if that raises `AttributeError`
    the keys are deleted one at a time instead.
    """
    try:
        in_dict.clear()
    except AttributeError:
        # Snapshot the keys first so the container is not mutated while it
        # is being iterated.
        for key in tuple(in_dict):
            del in_dict[key]
1914
1915
def _patch_stopall():
    """Stop all active patches, newest first, so nested patches unroll
    in LIFO order."""
    # The loop variable is deliberately not called `patch`, to avoid
    # shadowing the module-level function of that name.
    for active in reversed(_patch._active_patches):
        active.stop()
1920
1921
# Expose the alternative patchers as attributes of `patch`, so they are
# reachable as `patch.object`, `patch.dict`, `patch.multiple` and
# `patch.stopall`.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Method-name prefix the class decorators look for (read as
# `patch.TEST_PREFIX` by `_patch_dict.decorate_class`).
patch.TEST_PREFIX = 'test'
1927
# Space-separated magic method names, written without the surrounding
# double underscores. The trailing empty item keeps the trailing space of
# the original string.
magic_methods = ' '.join((
    "lt le gt ge eq ne",
    "getitem setitem delitem",
    "len contains iter",
    "hash str sizeof",
    "enter exit",
    # divmod and rdivmod live here rather than in `numerics` because there
    # is no idivmod
    "divmod rdivmod neg pos abs invert",
    "complex int float index",
    "round trunc floor ceil",
    "bool next",
    "fspath",
    "aiter",
    "",
))

# Binary numeric operators; the in-place (`i`) and reflected (`r`) variants
# are derived from this list by prefixing each name.
numerics = (
    "add sub mul matmul div floordiv mod "
    "lshift rshift and xor or pow truediv"
)
inplace = ' '.join(map('i{}'.format, numerics.split()))
right = ' '.join(map('r{}'.format, numerics.split()))
1949
# not including __prepare__, __instancecheck__, __subclasscheck__
# (as they are metaclass methods)
# __del__ is not supported at all as it causes problems if it exists

# Magic method names kept apart from `_magics`: they are not derived from
# the name lists above and only re-join the others in `_all_sync_magics`.
_non_defaults = {
    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
    '__getstate__', '__setstate__', '__getformat__', '__setformat__',
    '__repr__', '__dir__', '__subclasses__', '__format__',
    '__getnewargs_ex__',
}
1961
1962
def _get_method(name, func):
    """Turn a callable object (like a mock) into a real function.

    Returns a plain function whose `__name__` is `name`; calling it
    forwards `self` and all other arguments to `func`.
    """
    def method(self, /, *args, **kw):
        # `self` is positional-only, so a keyword argument named "self"
        # still reaches `func` through **kw.
        return func(self, *args, **kw)
    method.__name__ = name
    return method
1969
1970
# Full dunder names ('__add__', '__iadd__', ...) built from the
# space-separated name lists.
_magics = {
    '__%s__' % method for method in
    ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Asynchronous magic methods: the async `with` pair plus `__anext__`
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

# Unions used for membership checks: every synchronous magic name, and
# every magic name including the async ones.
_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

# Magic method names that are deliberately not supported.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}
1991
# Default return values that depend on the mock itself. `_set_return_value`
# calls the entry with the mock and stores the result as the magic
# method's `return_value`.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Fixed default return values, keyed by magic method name. These are the
# first thing `_set_return_value` consults.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
2014
2015
def _get_eq(self):
    """Build the default `__eq__` side effect for the mock `self`."""
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            # Nothing configured: a mock only equals itself, anything else
            # is left to the other operand.
            return True if self is other else NotImplemented
        return configured
    return __eq__
2025
def _get_ne(self):
    """Build the default `__ne__` side effect for the mock `self`."""
    def __ne__(other):
        if self.__ne__._mock_return_value is DEFAULT:
            # Nothing configured: a mock is only "not unequal" to itself,
            # anything else is left to the other operand.
            return False if self is other else NotImplemented
        # A return value was configured: hand back DEFAULT rather than the
        # value itself.
        return DEFAULT
    return __ne__
2034
def _get_iter(self):
    """Build the default `__iter__` side effect for the mock `self`."""
    def __iter__():
        configured = self.__iter__._mock_return_value
        # With nothing configured the mock iterates as empty. If the
        # configured value is already an iterator, iter() returns it
        # unchanged.
        source = [] if configured is DEFAULT else configured
        return iter(source)
    return __iter__
2044
def _get_async_iter(self):
    """Build the default `__aiter__` side effect for the mock `self`."""
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        # With nothing configured the async iterator wraps an empty
        # sequence.
        items = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(items))
    return __aiter__
2052
# Maps a magic method name to a factory. Given the mock, the factory
# returns the function that `_set_return_value` installs as that magic
# method's `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
2059
2060
2061
def _set_return_value(mock, method, name):
    """Give the magic method mock `method` its default behaviour.

    `name` is the magic method's name and `mock` the mock it belongs to.
    The first applicable source wins: a fixed value from `_return_values`,
    then a value computed from `mock` via `_calculate_return_value`, then
    a side effect built by `_side_effect_methods`. If none applies,
    `method` is left untouched.
    """
    preset = _return_values.get(name, DEFAULT)
    if preset is not DEFAULT:
        method.return_value = preset
    else:
        compute = _calculate_return_value.get(name)
        if compute is not None:
            method.return_value = compute(mock)
        else:
            make_side_effect = _side_effect_methods.get(name)
            if make_side_effect is not None:
                method.side_effect = make_side_effect(mock)
2077
2078
2079
class MagicMixin(Base):
    """Mixin that installs `MagicProxy` entries for the default magic
    methods on the mock's class."""

    def __init__(self, /, *args, **kw):
        # Magics are set up twice: once before the parent initialiser so
        # that magic methods passed as keyword arguments work, and once
        # after it to repair anything the parent initialiser disturbed.
        self._mock_set_magics()
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()


    def _mock_set_magics(self):
        """Install proxies for the wanted magic methods and remove the
        ones a spec rules out."""
        supported = _magics | _async_method_magics
        wanted = supported

        spec_methods = getattr(self, "_mock_methods", None)
        if spec_methods is not None:
            # With a spec only the magic methods it names are kept.
            wanted = supported.intersection(spec_methods)

            for entry in supported - wanted:
                if entry in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        klass = type(self)
        for entry in wanted - set(klass.__dict__):
            setattr(klass, entry, MagicProxy(entry, self))
2108
2109
2110
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set."""
        self._mock_add_spec(spec, spec_set)
        # Re-run the magic setup so the magic methods match the new spec.
        self._mock_set_magics()
2121
2122
class AsyncMagicMixin(MagicMixin):
    """`MagicMixin` variant used as a base class of `AsyncMock`."""
    # NOTE(review): this initialiser performs the same three steps as
    # `MagicMixin.__init__`, which it then also invokes through
    # `_safe_super`, so `_mock_set_magics` runs four times per
    # construction. Confirm whether the override is still needed.
    def __init__(self, /, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init
2128
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set."""
        self._mock_add_spec(spec, spec_set)
        # Re-run the magic setup so the magic methods match the new spec.
        self._mock_set_magics()
2148
2149
2150
class MagicProxy(Base):
    """Placeholder for one magic method of a mock.

    The child mock for the magic method is only created when the proxy is
    accessed through `__get__` or `create_mock` is called directly.
    """

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock, attach it to the parent under the magic
        method's name, apply the default behaviour and return it."""
        name, owner = self.name, self.parent
        child = owner._get_child_mock(
            name=name, _new_name=name, _new_parent=owner
        )
        setattr(owner, name, child)
        _set_return_value(owner, child, name)
        return child

    def __get__(self, obj, _type=None):
        # `obj` and `_type` are ignored: the mock is always created on the
        # parent given at construction time.
        return self.create_mock()
2167
2168
class AsyncMockMixin(Base):
    """Mixin that records awaits of a mock and provides the await
    assertion helpers."""

    # Public await bookkeeping attributes, exposed through
    # `_delegating_property`.
    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, /, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
        self.__dict__['_mock_await_count'] = 0
        self.__dict__['_mock_await_args'] = None
        self.__dict__['_mock_await_args_list'] = _CallList()
        # Stand-in code object whose flags carry the coroutine bit.
        code_mock = NonCallableMock(spec_set=CodeType)
        code_mock.co_flags = inspect.CO_COROUTINE
        self.__dict__['__code__'] = code_mock

    async def _execute_mock_call(self, /, *args, **kwargs):
        """Record the await, then produce the result of the mocked call.

        Order of precedence: `side_effect` (an exception is raised, an
        iterator supplies the next value, a callable is called and awaited
        if it is a coroutine function), then a configured `return_value`,
        then the wrapped object, and finally the default `return_value`.
        """
        # This is nearly just like super(), except for special handling
        # of coroutines

        _call = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = _call
        self.await_args_list.append(_call)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result
            elif iscoroutinefunction(effect):
                result = await effect(*args, **kwargs)
            else:
                result = effect(*args, **kwargs)

            # A side effect that yields DEFAULT falls through to the
            # return value handling below.
            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            if iscoroutinefunction(self._mock_wraps):
                return await self._mock_wraps(*args, **kwargs)
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

    def assert_awaited(self):
        """
        Assert that the mock was awaited at least once.
        """
        if self.await_count == 0:
            msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
            raise AssertionError(msg)

    def assert_awaited_once(self):
        """
        Assert that the mock was awaited exactly once.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def assert_awaited_with(self, /, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs, action='await')
            return msg

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # If matching the expected call produced an exception, chain it
            # as the cause of the assertion failure.
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(_error_message()) from cause

    def assert_awaited_once_with(self, /, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(self, /, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        actual = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(actual):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        expected = [self._call_matcher(c) for c in calls]
        # First exception produced while matching the expected calls, if any.
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
        if not any_order:
            if expected not in all_awaits:
                if cause is None:
                    problem = 'Awaits not found.'
                else:
                    problem = ('Error processing expected awaits.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}\n'
                    f'Actual: {self.await_args_list}'
                ) from cause
            return

        all_awaits = list(all_awaits)

        # Remove each expected await from the working copy as it is matched,
        # so repeated expected awaits need repeated actual awaits.
        not_found = []
        for kall in expected:
            try:
                all_awaits.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(self):
        """
        Assert that the mock was never awaited.
        """
        if self.await_count != 0:
            msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # Also clear the await bookkeeping kept by this mixin.
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
2350
2351
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
2393
2394
class _ANY:
    """Helper whose instances compare equal to every object."""

    def __eq__(self, other):
        # Equal to everything, whatever `other` is.
        return True

    def __ne__(self, other):
        # Never unequal to anything.
        return False

    def __repr__(self):
        return '<ANY>'


# Shared instance exported as `ANY`.
ANY = _ANY()
2408
2409
2410
def _format_call_signature(name, args, kwargs):
    """Render a call as ``name(arg, ..., key=value, ...)``.

    Positional arguments are shown with `repr`, keyword arguments as
    ``key=repr(value)``, all separated by ", ".
    """
    # Built in two steps, as before: the name is substituted first and the
    # argument list is filled into the remaining placeholder afterwards.
    template = '%s(%%s)' % name
    positional = ', '.join(map(repr, args))
    keyword = ', '.join('%s=%r' % item for item in kwargs.items())
    # Only the non-empty halves are joined, so no stray separator appears.
    rendered = ', '.join(part for part in (positional, keyword) if part)
    return template % rendered
2426
2427
2428
2429class _Call(tuple):
2430    """
2431    A tuple for holding the results of a call to a mock, either in the form
2432    `(args, kwargs)` or `(name, args, kwargs)`.
2433
2434    If args or kwargs are empty then a call tuple will compare equal to
2435    a tuple without those values. This makes comparisons less verbose::
2436
2437        _Call(('name', (), {})) == ('name',)
2438        _Call(('name', (1,), {})) == ('name', (1,))
2439        _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2440
2441    The `_Call` object provides a useful shortcut for comparing with call::
2442
2443        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2444        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2445
2446    If the _Call has no name then it will match any name.
2447    """
2448    def __new__(cls, value=(), name='', parent=None, two=False,
2449                from_kall=True):
2450        args = ()
2451        kwargs = {}
2452        _len = len(value)
2453        if _len == 3:
2454            name, args, kwargs = value
2455        elif _len == 2:
2456            first, second = value
2457            if isinstance(first, str):
2458                name = first
2459                if isinstance(second, tuple):
2460                    args = second
2461                else:
2462                    kwargs = second
2463            else:
2464                args, kwargs = first, second
2465        elif _len == 1:
2466            value, = value
2467            if isinstance(value, str):
2468                name = value
2469            elif isinstance(value, tuple):
2470                args = value
2471            else:
2472                kwargs = value
2473
2474        if two:
2475            return tuple.__new__(cls, (args, kwargs))
2476
2477        return tuple.__new__(cls, (name, args, kwargs))
2478
2479
2480    def __init__(self, value=(), name=None, parent=None, two=False,
2481                 from_kall=True):
2482        self._mock_name = name
2483        self._mock_parent = parent
2484        self._mock_from_kall = from_kall
2485
2486
2487    def __eq__(self, other):
2488        try:
2489            len_other = len(other)
2490        except TypeError:
2491            return NotImplemented
2492
2493        self_name = ''
2494        if len(self) == 2:
2495            self_args, self_kwargs = self
2496        else:
2497            self_name, self_args, self_kwargs = self
2498
2499        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2500                and self._mock_parent != other._mock_parent):
2501            return False
2502
2503        other_name = ''
2504        if len_other == 0:
2505            other_args, other_kwargs = (), {}
2506        elif len_other == 3:
2507            other_name, other_args, other_kwargs = other
2508        elif len_other == 1:
2509            value, = other
2510            if isinstance(value, tuple):
2511                other_args = value
2512                other_kwargs = {}
2513            elif isinstance(value, str):
2514                other_name = value
2515                other_args, other_kwargs = (), {}
2516            else:
2517                other_args = ()
2518                other_kwargs = value
2519        elif len_other == 2:
2520            # could be (name, args) or (name, kwargs) or (args, kwargs)
2521            first, second = other
2522            if isinstance(first, str):
2523                other_name = first
2524                if isinstance(second, tuple):
2525                    other_args, other_kwargs = second, {}
2526                else:
2527                    other_args, other_kwargs = (), second
2528            else:
2529                other_args, other_kwargs = first, second
2530        else:
2531            return False
2532
2533        if self_name and other_name != self_name:
2534            return False
2535
2536        # this order is important for ANY to work!
2537        return (other_args, other_kwargs) == (self_args, self_kwargs)
2538
2539
    # Take __ne__ from object instead of inheriting tuple's version:
    # object.__ne__ is derived from __eq__, so `!=` stays consistent with
    # the custom matching rules implemented in __eq__ rather than comparing
    # the raw tuple contents.
    __ne__ = object.__ne__
2541
2542
2543    def __call__(self, /, *args, **kwargs):
2544        if self._mock_name is None:
2545            return _Call(('', args, kwargs), name='()')
2546
2547        name = self._mock_name + '()'
2548        return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2549
2550
2551    def __getattr__(self, attr):
2552        if self._mock_name is None:
2553            return _Call(name=attr, from_kall=False)
2554        name = '%s.%s' % (self._mock_name, attr)
2555        return _Call(name=name, parent=self, from_kall=False)
2556
2557
2558    def __getattribute__(self, attr):
2559        if attr in tuple.__dict__:
2560            raise AttributeError
2561        return tuple.__getattribute__(self, attr)
2562
2563
2564    def _get_call_arguments(self):
2565        if len(self) == 2:
2566            args, kwargs = self
2567        else:
2568            name, args, kwargs = self
2569
2570        return args, kwargs
2571
2572    @property
2573    def args(self):
2574        return self._get_call_arguments()[0]
2575
2576    @property
2577    def kwargs(self):
2578        return self._get_call_arguments()[1]
2579
2580    def __repr__(self):
2581        if not self._mock_from_kall:
2582            name = self._mock_name or 'call'
2583            if name.startswith('()'):
2584                name = 'call%s' % name
2585            return name
2586
2587        if len(self) == 2:
2588            name = 'call'
2589            args, kwargs = self
2590        else:
2591            name, args, kwargs = self
2592            if not name:
2593                name = 'call'
2594            elif not name.startswith('()'):
2595                name = 'call.%s' % name
2596            else:
2597                name = 'call%s' % name
2598        return _format_call_signature(name, args, kwargs)
2599
2600
2601    def call_list(self):
2602        """For a call object that represents multiple calls, `call_list`
2603        returns a list of all the intermediate calls as well as the
2604        final call."""
2605        vals = []
2606        thing = self
2607        while thing is not None:
2608            if thing._mock_from_kall:
2609                vals.append(thing)
2610            thing = thing._mock_parent
2611        return _CallList(reversed(vals))
2612
2613
# The public `call` helper (listed in __all__). It is created with
# from_kall=False, so it is not itself treated as a recorded call: its repr
# is the bare name, and calling it or accessing attributes on it produces
# new _Call objects.
call = _Call(from_kall=False)
2615
2616
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock use the corresponding attribute of the `spec` object as their own
    spec.

    Functions or methods being mocked have their arguments checked so that
    they can only be called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check. A `RuntimeError` is
    also raised when `instance` is true and `spec` is an async function, and
    `InvalidSpecError` is raised when `spec` is itself a mock.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock. `_parent` and `_name` are used
    internally when speccing the attributes of another autospec."""
    if _is_list(spec):
        # a list instance handed to the mock constructor would be read as a
        # list of attribute names, so spec from its type instead
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)

    if spec_set:
        mock_kwargs = {'spec_set': spec}
    elif spec is None:
        # None is mocked with a normal mock that has no spec
        mock_kwargs = {}
    else:
        mock_kwargs = {'spec': spec}
    if mock_kwargs and instance:
        mock_kwargs['_spec_as_instance'] = True

    if not unsafe:
        _check_spec_arg_typos(kwargs)
    mock_kwargs.update(kwargs)

    mock_class = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        mock_kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        mock_class = AsyncMock
    elif not _callable(spec) or (
            is_type and instance and not _instance_callable(spec)):
        mock_class = NonCallableMagicMock

    _name = mock_kwargs.pop('name', _name)

    # for a top level object no _new_name should be set
    _new_name = '' if _parent is None else _name

    mock = mock_class(parent=_parent, _new_parent=_parent,
                      _new_name=_new_name, name=_name, **mock_kwargs)

    spec_is_function = isinstance(spec, FunctionTypes)
    if spec_is_function:
        # should only happen at the top level because we don't
        # recurse for functions
        mock = _set_signature(mock, spec)
        if is_async_func:
            _setup_async_mock(mock)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    if is_type and not instance and 'return_value' not in kwargs:
        # calling a class yields an instance specced from the same class
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock)

    for entry in dir(spec):
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        try:
            original = getattr(spec, entry)
        except AttributeError:
            continue

        if isinstance(original, FunctionTypes):
            child_kwargs = {'spec_set' if spec_set else 'spec': original}
            parent = mock.mock if spec_is_function else mock

            skipfirst = _must_skip(spec, entry, is_type)
            child_kwargs['_eat_self'] = skipfirst
            child_klass = (AsyncMock if iscoroutinefunction(original)
                           else MagicMock)
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent,
                              **child_kwargs)
            mock._mock_children[entry] = new
            _check_signature(original, new, skipfirst=skipfirst)
        else:
            # Anything that is not a function is only recorded for now.
            # NOTE(review): the fifth positional parameter of _SpecState is
            # `ids`, so `instance` is stored as `ids` here and the state's
            # own `instance` stays False - confirm this is intended before
            # changing the call.
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)

    return mock
2755
2756
def _must_skip(spec, entry, is_type):
    """
    Return whether we should skip the first argument on spec's `entry`
    attribute.

    For a non-class `spec`, an `entry` stored in the instance `__dict__`
    is never skipped; otherwise the lookup continues on the class. The
    first class in the MRO that defines `entry` decides: static and class
    methods and non-function objects give False, a plain function gives
    `is_type`. If no class defines `entry`, `is_type` is returned.
    """
    if not isinstance(spec, type):
        instance_attrs = getattr(spec, '__dict__', {})
        if entry in instance_attrs:
            # instance attribute - shouldn't skip
            return False
        spec = spec.__class__

    for klass in spec.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is not DEFAULT:
            if isinstance(found, (staticmethod, classmethod)):
                return False
            if isinstance(found, FunctionTypes):
                # Normal method => skip if looked up on type
                # (if looked up on instance, self is already skipped)
                return is_type
            return False

    # function is a dynamically provided attribute
    return is_type
2783
2784
class _SpecState(object):
    """Plain record of the arguments needed to autospec one attribute.

    `create_autospec` stores instances of this class in a mock's
    `_mock_children` for attributes that are not functions.
    NOTE(review): presumably these are turned into real mocks when the
    attribute is first accessed - confirm against the code that reads
    `_mock_children`.
    """

    def __init__(self, spec, spec_set=False, parent=None,
                 name=None, ids=None, instance=False):
        # spec: the original attribute value; parent / name: the owning
        # mock and the attribute name. All values are stored unchanged.
        self.spec, self.ids, self.spec_set = spec, ids, spec_set
        self.parent, self.instance, self.name = parent, instance, name
2795
2796
# The types that create_autospec and _must_skip treat as "functions" in
# their isinstance checks. Both are obtained from live objects.
FunctionTypes = (
    # python function (the type of any function defined with `def`)
    type(create_autospec),
    # instance method (the type of a method looked up on the ANY object)
    type(ANY.__eq__),
)
2803
2804
# Cache for the list of attribute names used as the spec of the file handle
# created by mock_open; it is filled in on the first mock_open() call.
file_spec = None
2806
2807
def _to_stream(read_data):
    """Wrap `read_data` in an in-memory stream.

    Bytes are wrapped in an `io.BytesIO`; anything else is handed to
    `io.StringIO`.
    """
    stream_type = io.BytesIO if isinstance(read_data, bytes) else io.StringIO
    return stream_type(read_data)
2813
2814
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is the data that the `read`, `readline` and `readlines`
    methods of the file handle return, as is iterating over the handle. It
    may be a string or bytes and is an empty string by default. Each call
    to the returned mock starts reading from the beginning again.
    """
    global file_spec

    # Both names are rebound by reset_data() below whenever the mock is
    # called, so the helpers always consult the current values.
    stream = _to_stream(read_data)
    readline_gen = None

    def _readlines_side_effect(*args, **kwargs):
        preset = handle.readlines.return_value
        if preset is not None:
            return preset
        return stream.readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        preset = handle.read.return_value
        if preset is not None:
            return preset
        return stream.read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        # first everything iteration would give, then whatever readline()
        # itself returns, for ever
        yield from _iter_side_effect()
        while True:
            yield stream.readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            # an explicit return value is repeated for ever
            while True:
                yield handle.readline.return_value
        for line in stream:
            yield line

    def _next_side_effect():
        preset = handle.readline.return_value
        if preset is not None:
            return preset
        return next(stream)

    if file_spec is None:
        import _io
        text_api = set(dir(_io.TextIOWrapper))
        bytes_api = set(dir(_io.BytesIO))
        file_spec = list(text_api | bytes_api)

    if mock is None:
        mock = MagicMock(name='open', spec=open)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None means "not overridden": the side effects above then fall back to
    # reading from the stream.
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    readline_gen = _readline_side_effect()
    handle.read.side_effect = _read_side_effect
    handle.readline.side_effect = readline_gen
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect

    def reset_data(*args, **kwargs):
        nonlocal stream, readline_gen
        stream = _to_stream(read_data)
        if handle.readline.side_effect == readline_gen:
            # Only reset the side effect if the user hasn't overridden it.
            readline_gen = _readline_side_effect()
            handle.readline.side_effect = readline_gen
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
2891
2892
class PropertyMock(Mock):
    """
    A mock intended to be used as a property, or other descriptor, on a class.
    `PropertyMock` provides `__get__` and `__set__` methods so you can specify
    a return value when it is fetched.

    Fetching a `PropertyMock` instance from an object calls the mock, with
    no args. Setting it calls the mock with the value being set.
    """
    def _get_child_mock(self, /, **kwargs):
        # Builds a plain MagicMock instead of another PropertyMock.
        # NOTE(review): presumably this overrides the base-class hook used
        # to create child mocks - confirm against Mock.
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        # Descriptor read: call the mock with no arguments and return the
        # result. `obj` and `obj_type` are ignored.
        return self()
    def __set__(self, obj, val):
        # Descriptor write: call the mock with the value being assigned.
        # `obj` is ignored.
        self(val)
2909
2910
def seal(mock):
    """Disable the automatic generation of child mocks.

    Given an input Mock, seals it to ensure no further mocks will be generated
    when accessing an attribute that was not already defined.

    The operation is recursive: after marking `mock` itself, every attribute
    listed by `dir(mock)` that is a mock whose `_mock_new_parent` is `mock`
    is sealed as well. Attributes that cannot be fetched, that are not mocks,
    or whose own children hold a `_SpecState` under the attribute's name are
    left alone.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
2933
2934
class _AsyncIterator:
    """
    Wraps an iterator in an asynchronous iterator.
    """
    def __init__(self, iterator):
        self.iterator = iterator
        # Give the instance a fake `__code__` whose flags carry
        # CO_ITERABLE_COROUTINE. NOTE(review): presumably this is for code
        # that inspects `__code__.co_flags` - confirm where it is read.
        fake_code = NonCallableMock(spec_set=CodeType)
        fake_code.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = fake_code

    async def __anext__(self):
        # The end of the wrapped iterator is reported with
        # StopAsyncIteration, raised outside of any exception handler.
        exhausted = object()
        item = next(self.iterator, exhausted)
        if item is exhausted:
            raise StopAsyncIteration
        return item
2951