1# Copyright (c) 2010-2016 testtools developers. See LICENSE for details.
2
3"""Individual test case execution for tests that return Deferreds.
4
5Example::
6
7    class TwistedTests(testtools.TestCase):
8
9        run_tests_with = AsynchronousDeferredRunTest
10
11        def test_something(self):
12            # Wait for 5 seconds and then fire with 'Foo'.
13            d = Deferred()
14            reactor.callLater(5, lambda: d.callback('Foo'))
15            d.addCallback(self.assertEqual, 'Foo')
16            return d
17
18When ``test_something`` is run, ``AsynchronousDeferredRunTest`` will run the
19reactor until ``d`` fires, and wait for all of its callbacks to be processed.
20"""
21
# Names exported by a star-import of this module.  Other module-level names
# without a leading underscore remain importable by explicit name.
__all__ = [
    'AsynchronousDeferredRunTest',
    'AsynchronousDeferredRunTestForBrokenTwisted',
    'SynchronousDeferredRunTest',
    'assert_fails_with',
    ]
28
29import io
30import warnings
31import sys
32
33from fixtures import Fixture
34
35from testtools.content import Content, text_content
36from testtools.content_type import UTF8_TEXT
37from testtools.runtest import RunTest, _raise_force_fail_error
38from ._deferred import extract_result
39from ._spinner import (
40    NoResultError,
41    Spinner,
42    TimeoutError,
43    trap_unhandled_errors,
44    )
45
46from twisted.internet import defer
47try:
48    from twisted.logger import globalLogPublisher
49except ImportError:
50    globalLogPublisher = None
51from twisted.python import log
52try:
53    from twisted.trial.unittest import _LogObserver
54except ImportError:
55    from twisted.trial._synctest import _LogObserver
56
57
class _DeferredRunTest(RunTest):
    """Common base class for runners of tests that return Deferreds."""

    def _got_user_failure(self, failure, tb_label='traceback'):
        """Report a failure that originated in user code.

        The failure is unpacked into a ``(type, value, traceback)`` triple
        and forwarded to ``_got_user_exception``.

        :param failure: The failure object to report.  Its ``type`` and
            ``value`` attributes and its ``getTracebackObject()`` method are
            used.
        :param tb_label: Label forwarded to ``_got_user_exception``.
        :return: Whatever ``_got_user_exception`` returns.
        """
        exc_info = (
            failure.type,
            failure.value,
            failure.getTracebackObject(),
        )
        return self._got_user_exception(exc_info, tb_label=tb_label)
66
67
class SynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests whose Deferreds have already fired when returned.

    The reactor is never touched by this runner: it assumes that any
    Deferred returned from a test has already fired.
    """

    def _run_user(self, function, *args):
        """Call ``function(*args)`` and return its extracted result.

        The call is wrapped with ``defer.maybeDeferred`` so that plain values
        and raised exceptions are handled like Deferred results; failures are
        routed to ``_got_user_failure`` before the result is pulled out with
        ``extract_result``.
        """
        fired = defer.maybeDeferred(function, *args).addErrback(
            self._got_user_failure)
        return extract_result(fired)
80
81
def _get_global_publisher_and_observers():
    """Return ``(log_publisher, observers)`` for the global Twisted log.

    Twisted 15.2.0 changed the logging framework.  Whichever framework is
    available, this returns a tuple of the global log publisher and a list
    (a copy) of all observers associated with that publisher.
    """
    if globalLogPublisher is None:
        # Twisted < 15.2.0: only the legacy publisher exists.
        legacy_publisher = log.theLogPublisher
        return (legacy_publisher, list(legacy_publisher.observers))

    # Twisted >= 15.2.0, with the new twisted.logger framework.
    # log.theLogPublisher.observers will only contain legacy observers;
    # we need to look at globalLogPublisher._observers, which contains
    # both legacy and modern observers, and add and remove them via
    # globalLogPublisher.  However, we must still add and remove the
    # observers we want to run with via log.theLogPublisher, because
    # _LogObserver may consider old keys and require them to be mapped.
    return (globalLogPublisher, list(globalLogPublisher._observers))
102
103
class _NoTwistedLogObservers(Fixture):
    """Temporarily strip every observer from the global Twisted log.

    Each observer that is removed during set-up gets a matching cleanup that
    adds it back to the same publisher.
    """

    def _setUp(self):
        log_publisher, installed = _get_global_publisher_and_observers()
        # Walk the observers last-to-first, pairing every removal with a
        # cleanup that re-adds that observer.
        for current in installed[::-1]:
            log_publisher.removeObserver(current)
            self.addCleanup(log_publisher.addObserver, current)
112
113
class _TwistedLogObservers(Fixture):
    """Install a set of Twisted log observers for the fixture's lifetime."""

    def __init__(self, observers):
        """Remember the observers to install.

        :param observers: An iterable of observer callables.  They are added
            to ``log.theLogPublisher`` during set-up.
        """
        super().__init__()
        self._log_publisher = log.theLogPublisher
        self._observers = observers

    def _setUp(self):
        publisher = self._log_publisher
        for candidate in self._observers:
            publisher.addObserver(candidate)
            # Undo the registration when the fixture is cleaned up.
            self.addCleanup(publisher.removeObserver, candidate)
126
127
class _ErrorObserver(Fixture):
    """Collect errors written to the Twisted log while the fixture is live."""

    def __init__(self, error_observer):
        """Wrap ``error_observer``.

        :param error_observer: An object providing ``gotEvent`` (installed as
            a log observer) and ``flushErrors``.
        """
        super().__init__()
        self._error_observer = error_observer

    def _setUp(self):
        observers = [self._error_observer.gotEvent]
        self.useFixture(_TwistedLogObservers(observers))

    def flush_logged_errors(self, *error_types):
        """Remove errors of the given types from the captured log.

        When no types are given, every captured error is cleared.

        :return: An iterable of the errors that were removed.
        """
        flushed = self._error_observer.flushErrors(*error_types)
        return flushed
146
147
class CaptureTwistedLogs(Fixture):
    """Record everything sent to the Twisted log and expose it as a detail.

    Usually there is no need to use this directly:
    :py:class:`AsynchronousDeferredRunTest` captures Twisted logs itself when
    ``store_twisted_logs`` is ``True`` (the default).

    It is useful when you want to process Twisted's logs yourself.

    For example::

        class TwistedTests(TestCase):
            run_tests_with = partial(
                AsynchronousDeferredRunTest, store_twisted_logs=False)

            def setUp(self):
                super().setUp()
                twisted_logs = self.useFixture(CaptureTwistedLogs())
                # ... do something with twisted_logs ...
    """

    # Name under which the captured log is registered as a detail.
    LOG_DETAIL_NAME = 'twisted-log'

    def _setUp(self):
        buffer = io.StringIO()

        def read_log_bytes():
            # Evaluated lazily, so the detail reflects whatever has been
            # logged by the time it is read.
            return [buffer.getvalue().encode("utf-8")]

        observer = log.FileLogObserver(buffer)
        self.useFixture(_TwistedLogObservers([observer.emit]))
        detail = Content(UTF8_TEXT, read_log_bytes)
        self.addDetail(self.LOG_DETAIL_NAME, detail)
178
179
def run_with_log_observers(observers, function, *args, **kwargs):
    """Call ``function`` with only ``observers`` watching the Twisted log.

    Deprecated since 1.8.2; a ``DeprecationWarning`` is issued on every call.

    :param observers: Log observers to install for the duration of the call.
    :param function: The callable to run.
    :param args: Positional arguments passed to ``function``.
    :param kwargs: Keyword arguments passed to ``function``.
    :return: Whatever ``function`` returns.
    """
    warnings.warn(
        'run_with_log_observers is deprecated since 1.8.2.',
        DeprecationWarning, stacklevel=2)
    # First remove whatever observers are installed, then add ours; both are
    # undone (in reverse order) when the block exits.
    with _NoTwistedLogObservers(), _TwistedLogObservers(observers):
        outcome = function(*args, **kwargs)
    return outcome
188
189
# Observer of the Twisted log that we install during tests.
#
# This is a global so that users can call ``flush_logged_errors`` in their
# test cases.
_log_observer = _LogObserver()
195
196
197# XXX: Should really be in python-fixtures.
198# See https://github.com/testing-cabal/fixtures/pull/22.
class _CompoundFixture(Fixture):
    """A single fixture that sets up a sequence of other fixtures."""

    def __init__(self, fixtures):
        """Remember the fixtures to combine.

        :param fixtures: An iterable of fixtures.  Each one is passed to
            ``useFixture`` during set-up, in iteration order.
        """
        super().__init__()
        self._fixtures = fixtures

    def _setUp(self):
        for member in self._fixtures:
            self.useFixture(member)
209
210
def flush_logged_errors(*error_types):
    """Discard errors of the given types from the global Twisted log.

    Errors logged while a test runs are bubbled up to the test result and
    mark the test as erroring.  Call this function to declare that particular
    logged errors were expected.

    For example::

        try:
            1/0
        except ZeroDivisionError:
            log.err()
        # Prevent logged ZeroDivisionError from failing the test.
        flush_logged_errors(ZeroDivisionError)

    :param error_types: A variable argument list of exception types.
    :return: Whatever the module-level log observer's ``flushErrors``
        returns.
    """
    # XXX: jml: I would like to deprecate this in favour of
    # _ErrorObserver.flush_logged_errors so that I can avoid mutable global
    # state. However, I don't know how to make the correct instance of
    # _ErrorObserver.flush_logged_errors available to the end user. I also
    # don't yet have a clear deprecation/migration path.
    flushed = _log_observer.flushErrors(*error_types)
    return flushed
235
236
class AsynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests that return Deferreds that fire asynchronously.

    Use this runner when you have tests that return Deferreds that will
    only fire if the reactor is left to spin for a while.
    """

    def __init__(self, case, handlers=None, last_resort=None, reactor=None,
                 timeout=0.005, debug=False, suppress_twisted_logging=True,
                 store_twisted_logs=True):
        """Construct an ``AsynchronousDeferredRunTest``.

        Please be sure to always use keyword syntax, not positional, as the
        base class may add arguments in future - and for core code
        compatibility with that we have to insert them before the local
        parameters.

        :param TestCase case: The `TestCase` to run.
        :param handlers: A list of exception handlers (ExceptionType, handler)
            where 'handler' is a callable that takes a `TestCase`, a
            ``testtools.TestResult`` and the exception raised.
        :param last_resort: Handler to call before re-raising uncatchable
            exceptions (those for which there is no handler).
        :param reactor: The Twisted reactor to use.  If not given, we use the
            default reactor.
        :param float timeout: The maximum time allowed for running a test.  The
            default is 0.005s.
        :param debug: Whether or not to enable Twisted's debugging.  Use this
            to get information about unhandled Deferreds and left-over
            DelayedCalls.  Defaults to False.
        :param bool suppress_twisted_logging: If True, then suppress Twisted's
            default logging while the test is being run. Defaults to True.
        :param bool store_twisted_logs: If True, then store the Twisted logs
            that took place during the run as the 'twisted-log' detail.
            Defaults to True.
        """
        super().__init__(
            case, handlers, last_resort)
        if reactor is None:
            # Imported lazily so the default reactor is only installed when
            # the caller did not supply one.
            from twisted.internet import reactor
        self._reactor = reactor
        self._timeout = timeout
        self._debug = debug
        self._suppress_twisted_logging = suppress_twisted_logging
        self._store_twisted_logs = store_twisted_logs

    @classmethod
    def make_factory(cls, reactor=None, timeout=0.005, debug=False,
                     suppress_twisted_logging=True, store_twisted_logs=True):
        """Make a factory that conforms to the RunTest factory interface.

        The returned object is callable as
        ``factory(case, handlers=None, last_resort=None)`` and builds an
        instance of ``cls`` configured with the arguments given here.

        Example::

            class SomeTests(TestCase):
                # Timeout tests after two minutes.
                run_tests_with = AsynchronousDeferredRunTest.make_factory(
                    timeout=120)
        """
        # This is horrible, but it means that the return value of the method
        # will be able to be assigned to a class variable *and* also be
        # invoked directly.
        class AsynchronousDeferredRunTestFactory:
            def __call__(self, case, handlers=None, last_resort=None):
                return cls(
                    case, handlers, last_resort, reactor, timeout, debug,
                    suppress_twisted_logging, store_twisted_logs,
                )
        return AsynchronousDeferredRunTestFactory()

    @defer.inlineCallbacks
    def _run_cleanups(self):
        """Run the cleanups on the test case.

        We expect that the cleanups on the test case can also return
        asynchronous Deferreds.  As such, we take the responsibility for
        running the cleanups, rather than letting TestCase do it.

        :return: A Deferred that fires with the last exception raised by a
            cleanup, or ``None`` if no cleanup raised.
        """
        last_exception = None
        while self.case._cleanups:
            # Cleanups are popped from the end, so the most recently added
            # one runs first.
            f, args, kwargs = self.case._cleanups.pop()
            d = defer.maybeDeferred(f, *args, **kwargs)
            try:
                yield d
            except Exception:
                # Report the error but keep going, so that the remaining
                # cleanups still get a chance to run.
                exc_info = sys.exc_info()
                self.case._report_traceback(exc_info)
                last_exception = exc_info[1]
        # A plain ``return`` is how an inlineCallbacks generator delivers its
        # result on Python 3; ``defer.returnValue`` is deprecated.
        return last_exception

    def _make_spinner(self):
        """Make the `Spinner` to be used to run the tests."""
        return Spinner(self._reactor, debug=self._debug)

    def _run_deferred(self):
        """Run the test, assuming everything in it is Deferred-returning.

        This should return a Deferred that fires with True if the test was
        successful and False if the test was not successful.  It should *not*
        call addSuccess on the result, because there's reactor clean up that
        needs to be done afterwards.
        """
        # Used as a mutable flag shared by the callbacks below: the run is
        # considered successful only if this list is still empty at the end.
        fails = []

        def fail_if_exception_caught(exception_caught):
            if self.exception_caught == exception_caught:
                fails.append(None)

        def clean_up(ignored=None):
            """Run the cleanups."""
            d = self._run_cleanups()

            def clean_up_done(result):
                if result is not None:
                    self._exceptions.append(result)
                    fails.append(None)
            return d.addCallback(clean_up_done)

        def set_up_done(exception_caught):
            """Set up is done, either clean up or run the test."""
            if self.exception_caught == exception_caught:
                # setUp failed: skip the test method and tearDown, but still
                # run the cleanups.
                fails.append(None)
                return clean_up()
            else:
                d = self._run_user(self.case._run_test_method, self.result)
                d.addCallback(fail_if_exception_caught)
                d.addBoth(tear_down)
                return d

        def tear_down(ignored):
            d = self._run_user(self.case._run_teardown, self.result)
            d.addCallback(fail_if_exception_caught)
            d.addBoth(clean_up)
            return d

        def force_failure(ignored):
            if getattr(self.case, 'force_failure', None):
                d = self._run_user(_raise_force_fail_error)
                d.addCallback(fails.append)
                return d

        d = self._run_user(self.case._run_setup, self.result)
        d.addCallback(set_up_done)
        d.addBoth(force_failure)
        d.addBoth(lambda ignored: len(fails) == 0)
        return d

    def _log_user_exception(self, e):
        """Raise 'e' and report it as a user exception."""
        # Raising and catching gives ``sys.exc_info()`` a real traceback to
        # hand to ``_got_user_exception``.
        try:
            raise e
        except e.__class__:
            self._got_user_exception(sys.exc_info())

    def _blocking_run_deferred(self, spinner):
        """Spin the reactor until the test's Deferred fires or times out.

        :param spinner: The `Spinner` used to run ``_run_deferred``.
        :return: A ``(successful, unhandled)`` pair.  When no result was
            obtained or the run timed out, the error is reported and
            ``(False, [])`` is returned.
        """
        try:
            return trap_unhandled_errors(
                spinner.run, self._timeout, self._run_deferred)
        except NoResultError:
            # We didn't get a result at all!  This could be for any number of
            # reasons, but most likely someone hit Ctrl-C during the test.
            self._got_user_exception(sys.exc_info())
            self.result.stop()
            return False, []
        except TimeoutError:
            # The function took too long to run.
            self._log_user_exception(TimeoutError(self.case, self._timeout))
            return False, []

    def _get_log_fixture(self):
        """Return the log fixture we're configured to use."""
        fixtures = []
        # TODO: Expose these fixtures and deprecate both of these options in
        # favour of them.
        if self._suppress_twisted_logging:
            fixtures.append(_NoTwistedLogObservers())
        if self._store_twisted_logs:
            fixtures.append(CaptureTwistedLogs())
        return _CompoundFixture(fixtures)

    def _run_core(self):
        """Run the test case in the reactor and report the outcome.

        Beyond the test's own result, the run is treated as unsuccessful if
        errors were logged, if any Deferred was left with an unhandled error,
        or if the spinner reports junk left in the reactor.  ``addSuccess``
        is only called when none of those happened.
        """
        # XXX: Blatting over the namespace of the test case isn't a nice thing
        # to do. Find a better way of communicating between runtest and test
        # case.
        self.case.reactor = self._reactor
        spinner = self._make_spinner()

        # We can't just install these as fixtures on self.case, because we
        # need the clean up to run even if the test times out.
        #
        # See https://bugs.launchpad.net/testtools/+bug/897196.
        with self._get_log_fixture() as capture_logs:
            for name, detail in capture_logs.getDetails().items():
                self.case.addDetail(name, detail)
            with _ErrorObserver(_log_observer) as error_fixture:
                successful, unhandled = self._blocking_run_deferred(
                    spinner)
            # Any error still sitting in the log observer was not flushed by
            # the test, so it counts against the test.
            for logged_error in error_fixture.flush_logged_errors():
                successful = False
                self._got_user_failure(
                    logged_error, tb_label='logged-error')

        if unhandled:
            successful = False
            for debug_info in unhandled:
                f = debug_info.failResult
                info = debug_info._getDebugTracebacks()
                if info:
                    self.case.addDetail(
                        'unhandled-error-in-deferred-debug',
                        text_content(info))
                self._got_user_failure(f, 'unhandled-error-in-deferred')

        junk = spinner.clear_junk()
        if junk:
            successful = False
            self._log_user_exception(UncleanReactorError(junk))

        if successful:
            self.result.addSuccess(self.case, details=self.case.getDetails())

    def _run_user(self, function, *args):
        """Run a user-supplied function.

        This just makes sure that it returns a Deferred, regardless of how the
        user wrote it.  Failures are routed to ``_got_user_failure``.
        """
        d = defer.maybeDeferred(function, *args)
        return d.addErrback(self._got_user_failure)
465
466
class AsynchronousDeferredRunTestForBrokenTwisted(AsynchronousDeferredRunTest):
    """Runner that tolerates reactor junk left behind by Twisted itself.

    Many APIs within Twisted fire a Deferred while leaving cleanup work
    scheduled for the reactor to do.  Arguably, many of these are bugs.  This
    runner iterates the reactor event loop a number of times after every
    test, in order to shake out these buggy-but-commonplace events.
    """

    def _make_spinner(self):
        """Build the usual spinner, then set its obligatory iterations to 2."""
        configured = super()._make_spinner()
        configured._OBLIGATORY_REACTOR_ITERATIONS = 2
        return configured
481
482
def assert_fails_with(d, *exc_types, **kwargs):
    """Assert that ``d`` fails with one of ``exc_types``.

    Typical usage is to return the result of ``assert_fails_with`` from a
    unit test.

    Equivalent to Twisted's ``assertFailure``.

    :param Deferred d: A ``Deferred`` that is expected to fail.
    :param exc_types: The exception types that the Deferred is expected to
        fail with.
    :param type failureException: An optional keyword argument.  If provided,
        that exception is raised instead of
        ``testtools.TestCase.failureException``.
    :return: ``d`` with callbacks attached: it fires with the exception value
        when the failure matches, and otherwise fails with the failure
        exception.
    """
    exception_class = kwargs.pop('failureException', None)
    if exception_class is None:
        # Imported lazily to avoid a circular import.
        from testtools import TestCase
        exception_class = TestCase.failureException
    wanted = ", ".join([kind.__name__ for kind in exc_types])

    def on_unexpected_success(value):
        # The Deferred succeeded, which is itself the assertion failure.
        message = "{} not raised ({!r} returned)".format(wanted, value)
        raise exception_class(message)

    def on_failure(reason):
        if not reason.check(*exc_types):
            message = "{} raised instead of {}:\n {}".format(
                reason.type.__name__, wanted, reason.getTraceback())
            raise exception_class(message)
        return reason.value

    return d.addCallbacks(on_unexpected_success, on_failure)
517
518
class UncleanReactorError(Exception):
    """Raised when the reactor still has junk in it after a test."""

    def __init__(self, junk):
        """Build the error message from the leftover reactor junk.

        :param junk: An iterable of leftover objects; each one contributes
            one indented line to the message.
        """
        junk_lines = ''.join([self._get_junk_info(item) for item in junk])
        Exception.__init__(
            self,
            "The reactor still thinks it needs to do things. Close all "
            "connections, kill all processes and make sure all delayed "
            "calls have either fired or been cancelled:\n%s"
            % junk_lines)

    def _get_junk_info(self, junk):
        """Return one indented, newline-terminated line describing ``junk``."""
        from twisted.internet.base import DelayedCall
        # Delayed calls are shown via ``str``; everything else via ``repr``.
        describe = str if isinstance(junk, DelayedCall) else repr
        return '  {}\n'.format(describe(junk))
537