1"""
2Overview
3========
4
The multiprocess plugin enables you to distribute your test run among a set of
worker processes that run tests in parallel. This can speed up CPU-bound test
runs (as long as the number of worker processes is around the number of
processors or cores available), but is mainly useful for IO-bound tests that
spend most of their time waiting for data to arrive from someplace else.
10
11.. note ::
12
   See :doc:`../doc_tests/test_multiprocess/multiprocess` for
   additional documentation and examples. Use of this plugin on Python
   2.5 or earlier requires the multiprocessing_ module, also available
   from PyPI.
17
18.. _multiprocessing : http://code.google.com/p/python-multiprocessing/
19
20How tests are distributed
21=========================
22
23The ideal case would be to dispatch each test to a worker process
24separately. This ideal is not attainable in all cases, however, because many
25test suites depend on context (class, module or package) fixtures.
26
27The plugin can't know (unless you tell it -- see below!) if a context fixture
28can be called many times concurrently (is re-entrant), or if it can be shared
29among tests running in different processes. Therefore, if a context has
30fixtures, the default behavior is to dispatch the entire suite to a worker as
31a unit.
32
33Controlling distribution
34^^^^^^^^^^^^^^^^^^^^^^^^
35
36There are two context-level variables that you can use to control this default
37behavior.
38
39If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
40in the context, and the plugin will dispatch tests in suites bound to that
41context as if the context had no fixtures. This means that the fixtures will
42execute concurrently and multiple times, typically once per test.
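
As a minimal sketch (the class and its tests are hypothetical, not taken from
nose itself), a test class whose class-level fixture only builds in-process,
read-only data could declare itself splittable like this:

```python
import unittest


class TestLookupTable(unittest.TestCase):
    # Tell the multiprocess plugin that this context's fixtures are
    # re-entrant, so its tests may be dispatched individually.
    _multiprocess_can_split_ = True

    @classmethod
    def setUpClass(cls):
        # Builds only in-process data, so running this fixture several
        # times, in several worker processes at once, is harmless.
        cls.table = {"one": 1, "two": 2}

    def test_one(self):
        self.assertEqual(self.table["one"], 1)

    def test_two(self):
        self.assertEqual(self.table["two"], 2)
```

Because the fixture may run once per test, it should stay cheap and must not
touch anything outside the process that runs it.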
43
44If a context's fixtures can be shared by tests running in different processes
45-- such as a package-level fixture that starts an external http server or
46initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
47the context. These fixtures will then execute in the primary nose process, and
48tests in those contexts will be individually dispatched to run in parallel.
49
50How results are collected and reported
51======================================
52
53As each test or suite executes in a worker process, results (failures, errors,
54and specially handled exceptions like SkipTest) are collected in that
55process. When the worker process finishes, it returns results to the main
56nose process. There, any progress output is printed (dots!), and the
57results from the test run are combined into a consolidated result
58set. When results have been received for all dispatched tests, or all
59workers have died, the result summary is output as normal.
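
The consolidation step can be pictured with the simplified sketch below. It is
not the plugin's actual code: ``merge_batch`` and the ``summary`` dictionary
are hypothetical stand-ins, and the error-class bookkeeping that the real
batch results also carry is left out:

```python
import io


def merge_batch(summary, stream, batch_result):
    # Simplified stand-in for the consolidation step: print the worker's
    # buffered progress output and add its counts to the running totals.
    output, tests_run, failures, errors = batch_result
    stream.write(output)
    summary["testsRun"] += tests_run
    summary["failures"].extend(failures)
    summary["errors"].extend(errors)
    return summary


summary = {"testsRun": 0, "failures": [], "errors": []}
stream = io.StringIO()
# Two hypothetical batches, as two workers might return them.
merge_batch(summary, stream, ("..", 2, [], []))
merge_batch(summary, stream, (".F", 2, [("test_b", "AssertionError")], []))
print(stream.getvalue(), summary["testsRun"], len(summary["failures"]))
```

Each worker buffers its own progress output, so the dots from one batch are
printed together when that batch's result arrives.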
60
61Beware!
62=======
63
64Not all test suites will benefit from, or even operate correctly using, this
65plugin. For example, CPU-bound tests will run more slowly if you don't have
66multiple processors. There are also some differences in plugin
67interactions and behaviors due to the way in which tests are dispatched and
68loaded. In general, test loading under this plugin operates as if it were
69always in directed mode instead of discovered mode. For instance, doctests
70in test modules will always be found when using this plugin with the doctest
71plugin.
72
73But the biggest issue you will face is probably concurrency. Unless you
74have kept your tests as religiously pure unit tests, with no side-effects, no
75ordering issues, and no external dependencies, chances are you will experience
76odd, intermittent and unexplainable failures and errors when using this
77plugin. This doesn't necessarily mean the plugin is broken; it may mean that
78your test suite is not safe for concurrency.
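
One common source of such failures is tests that share a fixed file path. As a
sketch (the test class is hypothetical), giving each test its own scratch
directory avoids collisions between tests running in different processes:

```python
import os
import shutil
import tempfile
import unittest


class TestReportWriter(unittest.TestCase):
    def setUp(self):
        # A private directory per test: no other process can collide
        # with it, unlike a fixed path shared by every test.
        self.workdir = tempfile.mkdtemp(prefix="report-test-")

    def tearDown(self):
        shutil.rmtree(self.workdir, ignore_errors=True)

    def test_write_and_read_back(self):
        path = os.path.join(self.workdir, "report.txt")
        with open(path, "w") as fh:
            fh.write("ok")
        with open(path) as fh:
            self.assertEqual(fh.read(), "ok")
```

The same idea applies to ports, database names and other external resources:
derive them per test rather than hard-coding one value.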
79
80New Features in 1.1.0
81=====================
82
* Functions generated by test generators are now added to the worker queue,
  so they can run in parallel across worker processes.
* Fixed timeout functionality: functions are now terminated with a
  TimedOutException when they exceed their execution time. The worker
  processes are not terminated.
* Added the ``--process-restartworker`` option to restart workers once they
  are done. This helps control memory usage, since accumulated memory leaks
  can make long runs very difficult.
* Added the global ``_instantiate_plugins`` to configure which plugins are
  started on the worker processes.
93
94"""
95
96import logging
97import os
98import sys
99import time
100import traceback
101import unittest
102import pickle
103import signal
104import nose.case
105from nose.core import TextTestRunner
106from nose import failure
107from nose import loader
108from nose.plugins.base import Plugin
109from nose.pyversion import bytes_
110from nose.result import TextTestResult
111from nose.suite import ContextSuite
112from nose.util import test_address
113try:
114    # 2.7+
115    from unittest.runner import _WritelnDecorator
116except ImportError:
117    from unittest import _WritelnDecorator
118from queue import Empty
119from warnings import warn
from io import StringIO
124
125# this is a list of plugin classes that will be checked for and created inside
126# each worker process
127_instantiate_plugins = None
128
129log = logging.getLogger(__name__)
130
131Process = Queue = Pool = Event = Value = Array = None
132
# have to inherit KeyboardInterrupt so it will interrupt the process properly
134class TimedOutException(KeyboardInterrupt):
135    def __init__(self, value = "Timed Out"):
136        self.value = value
137    def __str__(self):
138        return repr(self.value)
139
140def _import_mp():
141    global Process, Queue, Pool, Event, Value, Array
142    try:
143        from multiprocessing import Manager, Process
        # Make the manager's server process (which holds Python objects and
        # lets other processes manipulate them through proxies) ignore
        # SIGINT (KeyboardInterrupt), so that the communication channel
        # between the subprocesses and the main process is still usable
        # after Ctrl+C is received in the main process.
149        old=signal.signal(signal.SIGINT, signal.SIG_IGN)
150        m = Manager()
151        #reset it back so main process will receive a KeyboardInterrupt
152        #exception on ctrl+c
153        signal.signal(signal.SIGINT, old)
154        Queue, Pool, Event, Value, Array = (
155                m.Queue, m.Pool, m.Event, m.Value, m.Array
156        )
157    except ImportError:
158        warn("multiprocessing module is not available, multiprocess plugin "
159             "cannot be used", RuntimeWarning)
160
161
162class TestLet:
163    def __init__(self, case):
164        try:
165            self._id = case.id()
166        except AttributeError:
167            pass
168        self._short_description = case.shortDescription()
169        self._str = str(case)
170
171    def id(self):
172        return self._id
173
174    def shortDescription(self):
175        return self._short_description
176
177    def __str__(self):
178        return self._str
179
180class MultiProcess(Plugin):
181    """
    Run tests in multiple processes. Requires the multiprocessing module.
183    """
184    score = 1000
185    status = {}
186
187    def options(self, parser, env):
188        """
189        Register command-line options.
190        """
191        parser.add_option("--processes", action="store",
192                          default=env.get('NOSE_PROCESSES', 0),
193                          dest="multiprocess_workers",
194                          metavar="NUM",
195                          help="Spread test run among this many processes. "
196                          "Set a number equal to the number of processors "
197                          "or cores in your machine for best results. "
198                          "Pass a negative number to have the number of "
199                          "processes automatically set to the number of "
200                          "cores. Passing 0 means to disable parallel "
201                          "testing. Default is 0 unless NOSE_PROCESSES is "
202                          "set. "
203                          "[NOSE_PROCESSES]")
204        parser.add_option("--process-timeout", action="store",
205                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
206                          dest="multiprocess_timeout",
207                          metavar="SECONDS",
208                          help="Set timeout for return of results from each "
209                          "test runner process. Default is 10. "
210                          "[NOSE_PROCESS_TIMEOUT]")
211        parser.add_option("--process-restartworker", action="store_true",
212                          default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
213                          dest="multiprocess_restartworker",
                          help="If set, will restart each worker process once"
                          " its tests are done. This helps keep memory "
                          "leaks from exhausting system memory. "
                          "[NOSE_PROCESS_RESTARTWORKER]")
218
219    def configure(self, options, config):
220        """
221        Configure plugin.
222        """
223        try:
224            self.status.pop('active')
225        except KeyError:
226            pass
227        if not hasattr(options, 'multiprocess_workers'):
228            self.enabled = False
229            return
230        # don't start inside of a worker process
231        if config.worker:
232            return
233        self.config = config
234        try:
235            workers = int(options.multiprocess_workers)
236        except (TypeError, ValueError):
237            workers = 0
238        if workers:
239            _import_mp()
240            if Process is None:
241                self.enabled = False
242                return
243            # Negative number of workers will cause multiprocessing to hang.
244            # Set the number of workers to the CPU count to avoid this.
245            if workers < 0:
246                try:
247                    import multiprocessing
248                    workers = multiprocessing.cpu_count()
249                except NotImplementedError:
250                    self.enabled = False
251                    return
252            self.enabled = True
253            self.config.multiprocess_workers = workers
254            t = float(options.multiprocess_timeout)
255            self.config.multiprocess_timeout = t
256            r = int(options.multiprocess_restartworker)
257            self.config.multiprocess_restartworker = r
258            self.status['active'] = True
259
260    def prepareTestLoader(self, loader):
261        """Remember loader class so MultiProcessTestRunner can instantiate
262        the right loader.
263        """
264        self.loaderClass = loader.__class__
265
266    def prepareTestRunner(self, runner):
267        """Replace test runner with MultiProcessTestRunner.
268        """
269        # replace with our runner class
270        return MultiProcessTestRunner(stream=runner.stream,
271                                      verbosity=self.config.verbosity,
272                                      config=self.config,
273                                      loaderClass=self.loaderClass)
274
275def signalhandler(sig, frame):
276    raise TimedOutException()
277
278class MultiProcessTestRunner(TextTestRunner):
279    waitkilltime = 5.0 # max time to wait to terminate a process that does not
280                       # respond to SIGILL
281    def __init__(self, **kw):
282        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
283        super(MultiProcessTestRunner, self).__init__(**kw)
284
285    def collect(self, test, testQueue, tasks, to_teardown, result):
286        # dispatch and collect results
        # put only test addresses on the queue because tests aren't picklable
288        for case in self.nextBatch(test):
289            log.debug("Next batch %s (%s)", case, type(case))
290            if (isinstance(case, nose.case.Test) and
291                isinstance(case.test, failure.Failure)):
292                log.debug("Case is a Failure")
293                case(result) # run here to capture the failure
294                continue
295            # handle shared fixtures
296            if isinstance(case, ContextSuite) and case.context is failure.Failure:
297                log.debug("Case is a Failure")
298                case(result) # run here to capture the failure
299                continue
300            elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
301                log.debug("%s has shared fixtures", case)
302                try:
303                    case.setUp()
304                except (KeyboardInterrupt, SystemExit):
305                    raise
306                except:
307                    log.debug("%s setup failed", sys.exc_info())
308                    result.addError(case, sys.exc_info())
309                else:
310                    to_teardown.append(case)
311                    if case.factory:
312                        ancestors=case.factory.context.get(case, [])
313                        for an in ancestors[:2]:
314                            #log.debug('reset ancestor %s', an)
315                            if getattr(an, '_multiprocess_shared_', False):
316                                an._multiprocess_can_split_=True
317                            #an._multiprocess_shared_=False
318                    self.collect(case, testQueue, tasks, to_teardown, result)
319
320            else:
321                test_addr = self.addtask(testQueue,tasks,case)
322                log.debug("Queued test %s (%s) to %s",
323                          len(tasks), test_addr, testQueue)
324
325    def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
326        currentaddr = Value('c',bytes_(''))
327        currentstart = Value('d',time.time())
328        keyboardCaught = Event()
329        p = Process(target=runner,
330                   args=(iworker, testQueue,
331                         resultQueue,
332                         currentaddr,
333                         currentstart,
334                         keyboardCaught,
335                         shouldStop,
336                         self.loaderClass,
337                         result.__class__,
338                         pickle.dumps(self.config)))
339        p.currentaddr = currentaddr
340        p.currentstart = currentstart
341        p.keyboardCaught = keyboardCaught
342        old = signal.signal(signal.SIGILL, signalhandler)
343        p.start()
344        signal.signal(signal.SIGILL, old)
345        return p
346
347    def run(self, test):
348        """
349        Execute the test (which may be a test suite). If the test is a suite,
350        distribute it out among as many processes as have been configured, at
351        as fine a level as is possible given the context fixtures defined in
352        the suite or any sub-suites.
353
354        """
355        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
356        wrapper = self.config.plugins.prepareTest(test)
357        if wrapper is not None:
358            test = wrapper
359
360        # plugins can decorate or capture the output stream
361        wrapped = self.config.plugins.setOutputStream(self.stream)
362        if wrapped is not None:
363            self.stream = wrapped
364
365        testQueue = Queue()
366        resultQueue = Queue()
367        tasks = []
368        completed = []
369        workers = []
370        to_teardown = []
371        shouldStop = Event()
372
373        result = self._makeResult()
374        start = time.time()
375
376        self.collect(test, testQueue, tasks, to_teardown, result)
377
378        log.debug("Starting %s workers", self.config.multiprocess_workers)
379        for i in range(self.config.multiprocess_workers):
380            p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
381            workers.append(p)
382            log.debug("Started worker process %s", i+1)
383
384        total_tasks = len(tasks)
385        # need to keep track of the next time to check for timeouts in case
386        # more than one process times out at the same time.
387        nexttimeout=self.config.multiprocess_timeout
388        thrownError = None
389
390        try:
391            while tasks:
392                log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
393                          len(completed), total_tasks,nexttimeout)
394                try:
395                    iworker, addr, newtask_addrs, batch_result = resultQueue.get(
396                                                            timeout=nexttimeout)
397                    log.debug('Results received for worker %d, %s, new tasks: %d',
398                              iworker,addr,len(newtask_addrs))
399                    try:
400                        try:
401                            tasks.remove(addr)
402                        except ValueError:
                            log.warning('worker %s failed to remove from tasks: %s',
                                        iworker,addr)
405                        total_tasks += len(newtask_addrs)
406                        tasks.extend(newtask_addrs)
407                    except KeyError:
408                        log.debug("Got result for unknown task? %s", addr)
409                        log.debug("current: %s",str(list(tasks)[0]))
410                    else:
411                        completed.append([addr,batch_result])
412                    self.consolidate(result, batch_result)
413                    if (self.config.stopOnError
414                        and not result.wasSuccessful()):
415                        # set the stop condition
416                        shouldStop.set()
417                        break
418                    if self.config.multiprocess_restartworker:
419                        log.debug('joining worker %s',iworker)
                        # wait for the worker, but it is not critical if the
                        # worker cannot be joined; in fact, workers that add
                        # to testQueue will not terminate until all their
                        # items are read
424                        workers[iworker].join(timeout=1)
425                        if not shouldStop.is_set() and not testQueue.empty():
426                            log.debug('starting new process on worker %s',iworker)
427                            workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
428                except Empty:
429                    log.debug("Timed out with %s tasks pending "
430                              "(empty testQueue=%r): %s",
431                              len(tasks),testQueue.empty(),str(tasks))
432                    any_alive = False
433                    for iworker, w in enumerate(workers):
434                        if w.is_alive():
435                            worker_addr = bytes_(w.currentaddr.value,'ascii')
436                            timeprocessing = time.time() - w.currentstart.value
437                            if ( len(worker_addr) == 0
438                                    and timeprocessing > self.config.multiprocess_timeout-0.1):
439                                log.debug('worker %d has finished its work item, '
440                                          'but is not exiting? do we wait for it?',
441                                          iworker)
442                            else:
443                                any_alive = True
444                            if (len(worker_addr) > 0
445                                and timeprocessing > self.config.multiprocess_timeout-0.1):
446                                log.debug('timed out worker %s: %s',
447                                          iworker,worker_addr)
448                                w.currentaddr.value = bytes_('')
                                # If the process is in C++ code, sending a SIGILL
                                # might not raise a Python KeyboardInterrupt
                                # exception. Therefore, send multiple signals
                                # until an exception is caught. If this takes
                                # too long, terminate the process.
454                                w.keyboardCaught.clear()
455                                startkilltime = time.time()
456                                while not w.keyboardCaught.is_set() and w.is_alive():
457                                    if time.time()-startkilltime > self.waitkilltime:
458                                        # have to terminate...
459                                        log.error("terminating worker %s",iworker)
460                                        w.terminate()
461                                        # there is a small probability that the
462                                        # terminated process might send a result,
463                                        # which has to be specially handled or
464                                        # else processes might get orphaned.
465                                        workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
466                                        break
467                                    os.kill(w.pid, signal.SIGILL)
468                                    time.sleep(0.1)
469                    if not any_alive and testQueue.empty():
470                        log.debug("All workers dead")
471                        break
472                nexttimeout=self.config.multiprocess_timeout
473                for w in workers:
474                    if w.is_alive() and len(w.currentaddr.value) > 0:
475                        timeprocessing = time.time()-w.currentstart.value
476                        if timeprocessing <= self.config.multiprocess_timeout:
477                            nexttimeout = min(nexttimeout,
478                                self.config.multiprocess_timeout-timeprocessing)
479            log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
480
481        except (KeyboardInterrupt, SystemExit) as e:
482            log.info('parent received ctrl-c when waiting for test results')
483            thrownError = e
484            #resultQueue.get(False)
485
486            result.addError(test, sys.exc_info())
487
488        try:
489            for case in to_teardown:
490                log.debug("Tearing down shared fixtures for %s", case)
491                try:
492                    case.tearDown()
493                except (KeyboardInterrupt, SystemExit):
494                    raise
495                except:
496                    result.addError(case, sys.exc_info())
497
498            stop = time.time()
499
            # print results first, since shutting down processes can freeze
501            result.printErrors()
502            result.printSummary(start, stop)
503            self.config.plugins.finalize(result)
504
505            if thrownError is None:
506                log.debug("Tell all workers to stop")
507                for w in workers:
508                    if w.is_alive():
509                        testQueue.put('STOP', block=False)
510
511            # wait for the workers to end
512            for iworker,worker in enumerate(workers):
513                if worker.is_alive():
514                    log.debug('joining worker %s',iworker)
515                    worker.join()
516                    if worker.is_alive():
517                        log.debug('failed to join worker %s',iworker)
518        except (KeyboardInterrupt, SystemExit):
519            log.info('parent received ctrl-c when shutting down: stop all processes')
520            for worker in workers:
521                if worker.is_alive():
522                    worker.terminate()
523
            if thrownError:
                raise thrownError
            else:
                raise
526
527        return result
528
    @staticmethod
    def addtask(testQueue,tasks,case):
        arg = None
        if isinstance(case,nose.case.Test) and hasattr(case.test,'arg'):
            # this removes the top level descriptor and allows real function
            # name to be returned
            case.test.descriptor = None
            arg = case.test.arg
        test_addr = MultiProcessTestRunner.address(case)
        testQueue.put((test_addr,arg), block=False)
        if arg is not None:
            test_addr += str(arg)
        if tasks is not None:
            tasks.append(test_addr)
        return test_addr
544
    @staticmethod
    def address(case):
        if hasattr(case, 'address'):
            file, mod, call = case.address()
        elif hasattr(case, 'context'):
            file, mod, call = test_address(case.context)
        else:
            raise Exception("Unable to convert %s to address" % case)
        parts = []
        if file is None:
            if mod is None:
                raise Exception("Unaddressable case %s" % case)
            else:
                parts.append(mod)
        else:
            # strip __init__.py(c) from end of file part
            # if present, having it there confuses loader
            dirname, basename = os.path.split(file)
            if basename.startswith('__init__'):
                file = dirname
            parts.append(file)
        if call is not None:
            parts.append(call)
        return ':'.join(map(str, parts))
569
570    def nextBatch(self, test):
571        # allows tests or suites to mark themselves as not safe
572        # for multiprocess execution
573        if hasattr(test, 'context'):
574            if not getattr(test.context, '_multiprocess_', True):
575                return
576
577        if ((isinstance(test, ContextSuite)
578             and test.hasFixtures(self.checkCanSplit))
579            or not getattr(test, 'can_split', True)
580            or not isinstance(test, unittest.TestSuite)):
581            # regular test case, or a suite with context fixtures
582
583            # special case: when run like nosetests path/to/module.py
584            # the top-level suite has only one item, and it shares
585            # the same context as that item. In that case, we want the
586            # item, not the top-level suite
587            if isinstance(test, ContextSuite):
588                contained = list(test)
589                if (len(contained) == 1
590                    and getattr(contained[0],
591                                'context', None) == test.context):
592                    test = contained[0]
593            yield test
594        else:
595            # Suite is without fixtures at this level; but it may have
596            # fixtures at any deeper level, so we need to examine it all
597            # the way down to the case level
598            for case in test:
599                for batch in self.nextBatch(case):
600                    yield batch
601
    @staticmethod
    def checkCanSplit(context, fixt):
        """
        Callback that we use to check whether the fixtures found in a
        context or ancestor are ones we care about.

        Contexts can tell us that their fixtures are reentrant by setting
        _multiprocess_can_split_. So if we see that, we return False to
        disregard those fixtures.
        """
        if not fixt:
            return False
        if getattr(context, '_multiprocess_can_split_', False):
            return False
        return True
617
618    def sharedFixtures(self, case):
619        context = getattr(case, 'context', None)
620        if not context:
621            return False
622        return getattr(context, '_multiprocess_shared_', False)
623
624    def consolidate(self, result, batch_result):
625        log.debug("batch result is %s" , batch_result)
626        try:
627            output, testsRun, failures, errors, errorClasses = batch_result
628        except ValueError:
629            log.debug("result in unexpected format %s", batch_result)
630            failure.Failure(*sys.exc_info())(result)
631            return
632        self.stream.write(output)
633        result.testsRun += testsRun
634        result.failures.extend(failures)
635        result.errors.extend(errors)
636        for key, (storage, label, isfail) in list(errorClasses.items()):
637            if key not in result.errorClasses:
638                # Ordinarily storage is result attribute
639                # but it's only processed through the errorClasses
640                # dict, so it's ok to fake it here
641                result.errorClasses[key] = ([], label, isfail)
642            mystorage, _junk, _junk = result.errorClasses[key]
643            mystorage.extend(storage)
644        log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)
645
646
647def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
648           keyboardCaught, shouldStop, loaderClass, resultClass, config):
649    try:
650        try:
651            return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
652                    keyboardCaught, shouldStop, loaderClass, resultClass, config)
653        except KeyboardInterrupt:
654            log.debug('Worker %s keyboard interrupt, stopping',ix)
655    except Empty:
656        log.debug("Worker %s timed out waiting for tasks", ix)
657
658def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
659           keyboardCaught, shouldStop, loaderClass, resultClass, config):
660
661    config = pickle.loads(config)
662    dummy_parser = config.parserClass()
663    if _instantiate_plugins is not None:
664        for pluginclass in _instantiate_plugins:
665            plugin = pluginclass()
666            plugin.addOptions(dummy_parser,{})
667            config.plugins.addPlugin(plugin)
668    config.plugins.configure(config.options,config)
669    config.plugins.begin()
670    log.debug("Worker %s executing, pid=%d", ix,os.getpid())
671    loader = loaderClass(config=config)
672    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite
673
674    def get():
675        return testQueue.get(timeout=config.multiprocess_timeout)
676
677    def makeResult():
678        stream = _WritelnDecorator(StringIO())
679        result = resultClass(stream, descriptions=1,
680                             verbosity=config.verbosity,
681                             config=config)
682        plug_result = config.plugins.prepareTestResult(result)
683        if plug_result:
684            return plug_result
685        return result
686
687    def batch(result):
688        failures = [(TestLet(c), err) for c, err in result.failures]
689        errors = [(TestLet(c), err) for c, err in result.errors]
690        errorClasses = {}
691        for key, (storage, label, isfail) in list(result.errorClasses.items()):
692            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
693                                 label, isfail)
694        return (
695            result.stream.getvalue(),
696            result.testsRun,
697            failures,
698            errors,
699            errorClasses)
700    for test_addr, arg in iter(get, 'STOP'):
701        if shouldStop.is_set():
            log.debug('Worker %d STOPPED',ix)
703            break
704        result = makeResult()
705        test = loader.loadTestsFromNames([test_addr])
706        test.testQueue = testQueue
707        test.tasks = []
708        test.arg = arg
709        log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
710        try:
711            if arg is not None:
712                test_addr = test_addr + str(arg)
713            currentaddr.value = bytes_(test_addr)
714            currentstart.value = time.time()
715            test(result)
716            currentaddr.value = bytes_('')
717            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
718        except KeyboardInterrupt as e: #TimedOutException:
719            timeout = isinstance(e, TimedOutException)
720            if timeout:
721                keyboardCaught.set()
722            if len(currentaddr.value):
723                if timeout:
724                    msg = 'Worker %s timed out, failing current test %s'
725                else:
726                    msg = 'Worker %s keyboard interrupt, failing current test %s'
727                log.exception(msg,ix,test_addr)
728                currentaddr.value = bytes_('')
729                failure.Failure(*sys.exc_info())(result)
730                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
731            else:
732                if timeout:
733                    msg = 'Worker %s test %s timed out'
734                else:
735                    msg = 'Worker %s test %s keyboard interrupt'
736                log.debug(msg,ix,test_addr)
737                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
738            if not timeout:
739                raise
740        except SystemExit:
741            currentaddr.value = bytes_('')
742            log.exception('Worker %s system exit',ix)
743            raise
744        except:
745            currentaddr.value = bytes_('')
746            log.exception("Worker %s error running test or returning "
747                            "results",ix)
748            failure.Failure(*sys.exc_info())(result)
749            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
750        if config.multiprocess_restartworker:
751            break
752    log.debug("Worker %s ending", ix)
753
754
755class NoSharedFixtureContextSuite(ContextSuite):
    """
    Context suite that never fires shared fixtures.

    When a context sets ``_multiprocess_shared_``, fixtures in that context
    are executed by the main process. Using this suite class prevents them
    from executing in the runner process as well.
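
    A minimal sketch of a context that opts in to shared fixtures,
    assuming a test package whose ``__init__.py`` holds the fixtures
    (``start_server`` and ``stop_server`` are hypothetical helpers, not
    part of nose)::

        _multiprocess_shared_ = True

        def setup():
            start_server()   # expected to run in the main process only

        def teardown():
            stop_server()    # likewise skipped by this suite class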

    """
    testQueue = None
    tasks = None
    arg = None

    def setupContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).setupContext(context)

    def teardownContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).teardownContext(context)

    def run(self, result):
        """Run tests in suite inside of suite fixtures.
        """
        # proxy the result for myself
        log.debug("suite %s (%s) run called, tests: %s",
                  id(self), self, self._tests)
        if self.resultProxy:
            result, orig = self.resultProxy(result, self), result
        else:
            result, orig = result, result
        try:
            #log.debug('setUp for %s', id(self));
            self.setUp()
        except KeyboardInterrupt:
            raise
        except:
            self.error_context = 'setup'
            result.addError(self, self._exc_info())
            return
        try:
            for test in self._tests:
                if (isinstance(test, nose.case.Test)
                        and self.arg is not None):
                    test.test.arg = self.arg
                else:
                    test.arg = self.arg
                test.testQueue = self.testQueue
                test.tasks = self.tasks
                if result.shouldStop:
                    log.debug("stopping")
                    break
                # each nose.case.Test will create its own result proxy
                # so the cases need the original result, to avoid proxy
                # chains
                #log.debug('running test %s in suite %s', test, self);
                try:
                    test(orig)
                except KeyboardInterrupt as e:
                    timeout = isinstance(e, TimedOutException)
                    if timeout:
                        msg = 'Timeout when running test %s in suite %s'
                    else:
                        msg = 'KeyboardInterrupt when running test %s in suite %s'
                    log.debug(msg, test, self)
                    # Either way, the error is recorded as a
                    # TimedOutException for this test.
                    err = (TimedOutException, TimedOutException(str(test)),
                           sys.exc_info()[2])
                    test.config.plugins.addError(test, err)
                    orig.addError(test, err)
                    if not timeout:
                        raise
        finally:
            self.has_run = True
            try:
                #log.debug('tearDown for %s', id(self));
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except:
                self.error_context = 'teardown'
                result.addError(self, self._exc_info())
