1# Copyright (c) 2018 gevent community 2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy 4# of this software and associated documentation files (the "Software"), to deal 5# in the Software without restriction, including without limitation the rights 6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 7# copies of the Software, and to permit persons to whom the Software is 8# furnished to do so, subject to the following conditions: 9# 10# The above copyright notice and this permission notice shall be included in 11# all copies or substantial portions of the Software. 12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 19# THE SOFTWARE. 20from __future__ import absolute_import, print_function, division 21 22from functools import wraps 23 24from gevent.hub import _get_hub 25 26from .hub import QuietHub 27 28from .patched_tests_setup import get_switch_expected 29 30def wrap_switch_count_check(method): 31 @wraps(method) 32 def wrapper(self, *args, **kwargs): 33 initial_switch_count = getattr(_get_hub(), 'switch_count', None) 34 self.switch_expected = getattr(self, 'switch_expected', True) 35 if initial_switch_count is not None: 36 fullname = getattr(self, 'fullname', None) 37 if self.switch_expected == 'default' and fullname: 38 self.switch_expected = get_switch_expected(fullname) 39 result = method(self, *args, **kwargs) 40 if initial_switch_count is not None and self.switch_expected is not None: 41 switch_count = _get_hub().switch_count - initial_switch_count 42 if self.switch_expected is True: 43 assert switch_count >= 0 44 if not switch_count: 45 raise AssertionError('%s did not switch' % fullname) 46 elif self.switch_expected is False: 47 if switch_count: 48 raise AssertionError('%s switched but not expected to' % fullname) 49 else: 50 raise AssertionError('Invalid value for switch_expected: %r' % (self.switch_expected, )) 51 return result 52 return wrapper 53 54 55 56 57class CountingHub(QuietHub): 58 59 switch_count = 0 60 61 def switch(self, *args): 62 # pylint:disable=arguments-differ 63 self.switch_count += 1 64 return QuietHub.switch(self, *args) 65