1# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Support for results that aren't immediately available.
7
8Maintainer: Glyph Lefkowitz
9
10@var _NO_RESULT: The result used to represent the fact that there is no
11    result. B{Never ever ever use this as an actual result for a Deferred}.  You
12    have been warned.
13
14@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
15    chain.  Always accompanied by a Deferred instance in the args tuple pointing
16    at the Deferred which is chained to the Deferred which has this marker.
17"""
18
19from __future__ import division, absolute_import
20
21import traceback
22import types
23import warnings
24from sys import exc_info
25from functools import wraps
26
27# Twisted imports
28from twisted.python.compat import cmp, comparable
29from twisted.python import lockfile, log, failure
30from twisted.python.deprecate import warnAboutFunction
31
32
33
34class AlreadyCalledError(Exception):
35    pass
36
37
38
39class CancelledError(Exception):
40    """
41    This error is raised by default when a L{Deferred} is cancelled.
42    """
43
44
45class TimeoutError(Exception):
46    """
47    This exception is deprecated.  It is used only by the deprecated
48    L{Deferred.setTimeout} method.
49    """
50
51
52
53def logError(err):
54    log.err(err)
55    return err
56
57
58
59def succeed(result):
60    """
61    Return a L{Deferred} that has already had C{.callback(result)} called.
62
63    This is useful when you're writing synchronous code to an
64    asynchronous interface: i.e., some code is calling you expecting a
65    L{Deferred} result, but you don't actually need to do anything
66    asynchronous. Just return C{defer.succeed(theResult)}.
67
68    See L{fail} for a version of this function that uses a failing
69    L{Deferred} rather than a successful one.
70
71    @param result: The result to give to the Deferred's 'callback'
72           method.
73
74    @rtype: L{Deferred}
75    """
76    d = Deferred()
77    d.callback(result)
78    return d
79
80
81
82def fail(result=None):
83    """
84    Return a L{Deferred} that has already had C{.errback(result)} called.
85
86    See L{succeed}'s docstring for rationale.
87
88    @param result: The same argument that L{Deferred.errback} takes.
89
90    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
91        current exception state.
92
93    @rtype: L{Deferred}
94    """
95    d = Deferred()
96    d.errback(result)
97    return d
98
99
100
101def execute(callable, *args, **kw):
102    """
103    Create a L{Deferred} from a callable and arguments.
104
105    Call the given function with the given arguments.  Return a L{Deferred}
106    which has been fired with its callback as the result of that invocation
107    or its C{errback} with a L{Failure} for the exception thrown.
108    """
109    try:
110        result = callable(*args, **kw)
111    except:
112        return fail()
113    else:
114        return succeed(result)
115
116
117
118def maybeDeferred(f, *args, **kw):
119    """
120    Invoke a function that may or may not return a L{Deferred}.
121
122    Call the given function with the given arguments.  If the returned
123    object is a L{Deferred}, return it.  If the returned object is a L{Failure},
124    wrap it with L{fail} and return it.  Otherwise, wrap it in L{succeed} and
125    return it.  If an exception is raised, convert it to a L{Failure}, wrap it
126    in L{fail}, and then return it.
127
128    @type f: Any callable
129    @param f: The callable to invoke
130
131    @param args: The arguments to pass to C{f}
132    @param kw: The keyword arguments to pass to C{f}
133
134    @rtype: L{Deferred}
135    @return: The result of the function call, wrapped in a L{Deferred} if
136    necessary.
137    """
138    try:
139        result = f(*args, **kw)
140    except:
141        return fail(failure.Failure(captureVars=Deferred.debug))
142
143    if isinstance(result, Deferred):
144        return result
145    elif isinstance(result, failure.Failure):
146        return fail(result)
147    else:
148        return succeed(result)
149
150
151
152def timeout(deferred):
153    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
154
155
156
157def passthru(arg):
158    return arg
159
160
161
162def setDebugging(on):
163    """
164    Enable or disable L{Deferred} debugging.
165
166    When debugging is on, the call stacks from creation and invocation are
167    recorded, and added to any L{AlreadyCalledErrors} we raise.
168    """
169    Deferred.debug=bool(on)
170
171
172
173def getDebugging():
174    """
175    Determine whether L{Deferred} debugging is enabled.
176    """
177    return Deferred.debug
178
179
180# See module docstring.
181_NO_RESULT = object()
182_CONTINUE = object()
183
184
185
186class Deferred:
187    """
188    This is a callback which will be put off until later.
189
190    Why do we want this? Well, in cases where a function in a threaded
191    program would block until it gets a result, for Twisted it should
192    not block. Instead, it should return a L{Deferred}.
193
194    This can be implemented for protocols that run over the network by
195    writing an asynchronous protocol for L{twisted.internet}. For methods
196    that come from outside packages that are not under our control, we use
197    threads (see for example L{twisted.enterprise.adbapi}).
198
199    For more information about Deferreds, see doc/core/howto/defer.html or
200    U{http://twistedmatrix.com/documents/current/core/howto/defer.html}
201
202    When creating a Deferred, you may provide a canceller function, which
203    will be called by d.cancel() to let you do any clean-up necessary if the
204    user decides not to wait for the deferred to complete.
205
206    @ivar called: A flag which is C{False} until either C{callback} or
207        C{errback} is called and afterwards always C{True}.
208    @type called: C{bool}
209
210    @ivar paused: A counter of how many unmatched C{pause} calls have been made
211        on this instance.
212    @type paused: C{int}
213
214    @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
215        which is C{True} if the Deferred has no canceller and has been
216        cancelled, C{False} otherwise.  If C{True}, it can be expected that
217        C{callback} or C{errback} will eventually be called and the result
218        should be silently discarded.
219    @type _suppressAlreadyCalled: C{bool}
220
221    @ivar _runningCallbacks: A flag which is C{True} while this instance is
222        executing its callback chain, used to stop recursive execution of
223        L{_runCallbacks}
224    @type _runningCallbacks: C{bool}
225
226    @ivar _chainedTo: If this Deferred is waiting for the result of another
227        Deferred, this is a reference to the other Deferred.  Otherwise, C{None}.
228    """
229
230    called = False
231    paused = 0
232    _debugInfo = None
233    _suppressAlreadyCalled = False
234
235    # Are we currently running a user-installed callback?  Meant to prevent
236    # recursive running of callbacks when a reentrant call to add a callback is
237    # used.
238    _runningCallbacks = False
239
240    # Keep this class attribute for now, for compatibility with code that
241    # sets it directly.
242    debug = False
243
244    _chainedTo = None
245
246    def __init__(self, canceller=None):
247        """
248        Initialize a L{Deferred}.
249
250        @param canceller: a callable used to stop the pending operation
251            scheduled by this L{Deferred} when L{Deferred.cancel} is
252            invoked. The canceller will be passed the deferred whose
253            cancelation is requested (i.e., self).
254
255            If a canceller is not given, or does not invoke its argument's
256            C{callback} or C{errback} method, L{Deferred.cancel} will
257            invoke L{Deferred.errback} with a L{CancelledError}.
258
259            Note that if a canceller is not given, C{callback} or
260            C{errback} may still be invoked exactly once, even though
261            defer.py will have already invoked C{errback}, as described
262            above.  This allows clients of code which returns a L{Deferred}
263            to cancel it without requiring the L{Deferred} instantiator to
264            provide any specific implementation support for cancellation.
265            New in 10.1.
266
267        @type canceller: a 1-argument callable which takes a L{Deferred}. The
268            return result is ignored.
269        """
270        self.callbacks = []
271        self._canceller = canceller
272        if self.debug:
273            self._debugInfo = DebugInfo()
274            self._debugInfo.creator = traceback.format_stack()[:-1]
275
276
277    def addCallbacks(self, callback, errback=None,
278                     callbackArgs=None, callbackKeywords=None,
279                     errbackArgs=None, errbackKeywords=None):
280        """
281        Add a pair of callbacks (success and error) to this L{Deferred}.
282
283        These will be executed when the 'master' callback is run.
284
285        @return: C{self}.
286        @rtype: a L{Deferred}
287        """
288        assert callable(callback)
289        assert errback == None or callable(errback)
290        cbs = ((callback, callbackArgs, callbackKeywords),
291               (errback or (passthru), errbackArgs, errbackKeywords))
292        self.callbacks.append(cbs)
293
294        if self.called:
295            self._runCallbacks()
296        return self
297
298
299    def addCallback(self, callback, *args, **kw):
300        """
301        Convenience method for adding just a callback.
302
303        See L{addCallbacks}.
304        """
305        return self.addCallbacks(callback, callbackArgs=args,
306                                 callbackKeywords=kw)
307
308
309    def addErrback(self, errback, *args, **kw):
310        """
311        Convenience method for adding just an errback.
312
313        See L{addCallbacks}.
314        """
315        return self.addCallbacks(passthru, errback,
316                                 errbackArgs=args,
317                                 errbackKeywords=kw)
318
319
320    def addBoth(self, callback, *args, **kw):
321        """
322        Convenience method for adding a single callable as both a callback
323        and an errback.
324
325        See L{addCallbacks}.
326        """
327        return self.addCallbacks(callback, callback,
328                                 callbackArgs=args, errbackArgs=args,
329                                 callbackKeywords=kw, errbackKeywords=kw)
330
331
332    def chainDeferred(self, d):
333        """
334        Chain another L{Deferred} to this L{Deferred}.
335
336        This method adds callbacks to this L{Deferred} to call C{d}'s callback
337        or errback, as appropriate. It is merely a shorthand way of performing
338        the following::
339
340            self.addCallbacks(d.callback, d.errback)
341
342        When you chain a deferred d2 to another deferred d1 with
343        d1.chainDeferred(d2), you are making d2 participate in the callback
344        chain of d1. Thus any event that fires d1 will also fire d2.
345        However, the converse is B{not} true; if d2 is fired d1 will not be
346        affected.
347
348        Note that unlike the case where chaining is caused by a L{Deferred}
349        being returned from a callback, it is possible to cause the call
350        stack size limit to be exceeded by chaining many L{Deferred}s
351        together with C{chainDeferred}.
352
353        @return: C{self}.
354        @rtype: a L{Deferred}
355        """
356        d._chainedTo = self
357        return self.addCallbacks(d.callback, d.errback)
358
359
360    def callback(self, result):
361        """
362        Run all success callbacks that have been added to this L{Deferred}.
363
364        Each callback will have its result passed as the first argument to
365        the next; this way, the callbacks act as a 'processing chain'.  If
366        the success-callback returns a L{Failure} or raises an L{Exception},
367        processing will continue on the *error* callback chain.  If a
368        callback (or errback) returns another L{Deferred}, this L{Deferred}
369        will be chained to it (and further callbacks will not run until that
370        L{Deferred} has a result).
371
372        An instance of L{Deferred} may only have either L{callback} or
373        L{errback} called on it, and only once.
374
375        @param result: The object which will be passed to the first callback
376            added to this L{Deferred} (via L{addCallback}).
377
378        @raise AlreadyCalledError: If L{callback} or L{errback} has already been
379            called on this L{Deferred}.
380        """
381        assert not isinstance(result, Deferred)
382        self._startRunCallbacks(result)
383
384
385    def errback(self, fail=None):
386        """
387        Run all error callbacks that have been added to this L{Deferred}.
388
389        Each callback will have its result passed as the first
390        argument to the next; this way, the callbacks act as a
391        'processing chain'. Also, if the error-callback returns a non-Failure
392        or doesn't raise an L{Exception}, processing will continue on the
393        *success*-callback chain.
394
395        If the argument that's passed to me is not a L{failure.Failure} instance,
396        it will be embedded in one. If no argument is passed, a
397        L{failure.Failure} instance will be created based on the current
398        traceback stack.
399
400        Passing a string as `fail' is deprecated, and will be punished with
401        a warning message.
402
403        An instance of L{Deferred} may only have either L{callback} or
404        L{errback} called on it, and only once.
405
406        @param fail: The L{Failure} object which will be passed to the first
407            errback added to this L{Deferred} (via L{addErrback}).
408            Alternatively, a L{Exception} instance from which a L{Failure} will
409            be constructed (with no traceback) or C{None} to create a L{Failure}
410            instance from the current exception state (with a traceback).
411
412        @raise AlreadyCalledError: If L{callback} or L{errback} has already been
413            called on this L{Deferred}.
414
415        @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
416            no current exception state.
417        """
418        if fail is None:
419            fail = failure.Failure(captureVars=self.debug)
420        elif not isinstance(fail, failure.Failure):
421            fail = failure.Failure(fail)
422
423        self._startRunCallbacks(fail)
424
425
426    def pause(self):
427        """
428        Stop processing on a L{Deferred} until L{unpause}() is called.
429        """
430        self.paused = self.paused + 1
431
432
433    def unpause(self):
434        """
435        Process all callbacks made since L{pause}() was called.
436        """
437        self.paused = self.paused - 1
438        if self.paused:
439            return
440        if self.called:
441            self._runCallbacks()
442
443
444    def cancel(self):
445        """
446        Cancel this L{Deferred}.
447
448        If the L{Deferred} has not yet had its C{errback} or C{callback} method
449        invoked, call the canceller function provided to the constructor. If
450        that function does not invoke C{callback} or C{errback}, or if no
451        canceller function was provided, errback with L{CancelledError}.
452
453        If this L{Deferred} is waiting on another L{Deferred}, forward the
454        cancellation to the other L{Deferred}.
455        """
456        if not self.called:
457            canceller = self._canceller
458            if canceller:
459                canceller(self)
460            else:
461                # Arrange to eat the callback that will eventually be fired
462                # since there was no real canceller.
463                self._suppressAlreadyCalled = True
464            if not self.called:
465                # There was no canceller, or the canceller didn't call
466                # callback or errback.
467                self.errback(failure.Failure(CancelledError()))
468        elif isinstance(self.result, Deferred):
469            # Waiting for another deferred -- cancel it instead.
470            self.result.cancel()
471
472
473    def _startRunCallbacks(self, result):
474        if self.called:
475            if self._suppressAlreadyCalled:
476                self._suppressAlreadyCalled = False
477                return
478            if self.debug:
479                if self._debugInfo is None:
480                    self._debugInfo = DebugInfo()
481                extra = "\n" + self._debugInfo._getDebugTracebacks()
482                raise AlreadyCalledError(extra)
483            raise AlreadyCalledError
484        if self.debug:
485            if self._debugInfo is None:
486                self._debugInfo = DebugInfo()
487            self._debugInfo.invoker = traceback.format_stack()[:-2]
488        self.called = True
489        self.result = result
490        self._runCallbacks()
491
492
493    def _continuation(self):
494        """
495        Build a tuple of callback and errback with L{_continue} to be used by
496        L{_addContinue} and L{_removeContinue} on another Deferred.
497        """
498        return ((_CONTINUE, (self,), None),
499                (_CONTINUE, (self,), None))
500
501
502    def _runCallbacks(self):
503        """
504        Run the chain of callbacks once a result is available.
505
506        This consists of a simple loop over all of the callbacks, calling each
507        with the current result and making the current result equal to the
508        return value (or raised exception) of that call.
509
510        If C{self._runningCallbacks} is true, this loop won't run at all, since
511        it is already running above us on the call stack.  If C{self.paused} is
512        true, the loop also won't run, because that's what it means to be
513        paused.
514
515        The loop will terminate before processing all of the callbacks if a
516        C{Deferred} without a result is encountered.
517
518        If a C{Deferred} I{with} a result is encountered, that result is taken
519        and the loop proceeds.
520
521        @note: The implementation is complicated slightly by the fact that
522            chaining (associating two Deferreds with each other such that one
523            will wait for the result of the other, as happens when a Deferred is
524            returned from a callback on another Deferred) is supported
525            iteratively rather than recursively, to avoid running out of stack
526            frames when processing long chains.
527        """
528        if self._runningCallbacks:
529            # Don't recursively run callbacks
530            return
531
532        # Keep track of all the Deferreds encountered while propagating results
533        # up a chain.  The way a Deferred gets onto this stack is by having
534        # added its _continuation() to the callbacks list of a second Deferred
535        # and then that second Deferred being fired.  ie, if ever had _chainedTo
536        # set to something other than None, you might end up on this stack.
537        chain = [self]
538
539        while chain:
540            current = chain[-1]
541
542            if current.paused:
543                # This Deferred isn't going to produce a result at all.  All the
544                # Deferreds up the chain waiting on it will just have to...
545                # wait.
546                return
547
548            finished = True
549            current._chainedTo = None
550            while current.callbacks:
551                item = current.callbacks.pop(0)
552                callback, args, kw = item[
553                    isinstance(current.result, failure.Failure)]
554                args = args or ()
555                kw = kw or {}
556
557                # Avoid recursion if we can.
558                if callback is _CONTINUE:
559                    # Give the waiting Deferred our current result and then
560                    # forget about that result ourselves.
561                    chainee = args[0]
562                    chainee.result = current.result
563                    current.result = None
564                    # Making sure to update _debugInfo
565                    if current._debugInfo is not None:
566                        current._debugInfo.failResult = None
567                    chainee.paused -= 1
568                    chain.append(chainee)
569                    # Delay cleaning this Deferred and popping it from the chain
570                    # until after we've dealt with chainee.
571                    finished = False
572                    break
573
574                try:
575                    current._runningCallbacks = True
576                    try:
577                        current.result = callback(current.result, *args, **kw)
578                        if current.result is current:
579                            warnAboutFunction(
580                                callback,
581                                "Callback returned the Deferred "
582                                "it was attached to; this breaks the "
583                                "callback chain and will raise an "
584                                "exception in the future.")
585                    finally:
586                        current._runningCallbacks = False
587                except:
588                    # Including full frame information in the Failure is quite
589                    # expensive, so we avoid it unless self.debug is set.
590                    current.result = failure.Failure(captureVars=self.debug)
591                else:
592                    if isinstance(current.result, Deferred):
593                        # The result is another Deferred.  If it has a result,
594                        # we can take it and keep going.
595                        resultResult = getattr(current.result, 'result', _NO_RESULT)
596                        if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
597                            # Nope, it didn't.  Pause and chain.
598                            current.pause()
599                            current._chainedTo = current.result
600                            # Note: current.result has no result, so it's not
601                            # running its callbacks right now.  Therefore we can
602                            # append to the callbacks list directly instead of
603                            # using addCallbacks.
604                            current.result.callbacks.append(current._continuation())
605                            break
606                        else:
607                            # Yep, it did.  Steal it.
608                            current.result.result = None
609                            # Make sure _debugInfo's failure state is updated.
610                            if current.result._debugInfo is not None:
611                                current.result._debugInfo.failResult = None
612                            current.result = resultResult
613
614            if finished:
615                # As much of the callback chain - perhaps all of it - as can be
616                # processed right now has been.  The current Deferred is waiting on
617                # another Deferred or for more callbacks.  Before finishing with it,
618                # make sure its _debugInfo is in the proper state.
619                if isinstance(current.result, failure.Failure):
620                    # Stash the Failure in the _debugInfo for unhandled error
621                    # reporting.
622                    current.result.cleanFailure()
623                    if current._debugInfo is None:
624                        current._debugInfo = DebugInfo()
625                    current._debugInfo.failResult = current.result
626                else:
627                    # Clear out any Failure in the _debugInfo, since the result
628                    # is no longer a Failure.
629                    if current._debugInfo is not None:
630                        current._debugInfo.failResult = None
631
632                # This Deferred is done, pop it from the chain and move back up
633                # to the Deferred which supplied us with our result.
634                chain.pop()
635
636
637    def __str__(self):
638        """
639        Return a string representation of this C{Deferred}.
640        """
641        cname = self.__class__.__name__
642        result = getattr(self, 'result', _NO_RESULT)
643        myID = id(self)
644        if self._chainedTo is not None:
645            result = ' waiting on Deferred at 0x%x' % (id(self._chainedTo),)
646        elif result is _NO_RESULT:
647            result = ''
648        else:
649            result = ' current result: %r' % (result,)
650        return "<%s at 0x%x%s>" % (cname, myID, result)
651    __repr__ = __str__
652
653
654
655class DebugInfo:
656    """
657    Deferred debug helper.
658    """
659
660    failResult = None
661
662    def _getDebugTracebacks(self):
663        info = ''
664        if hasattr(self, "creator"):
665            info += " C: Deferred was created:\n C:"
666            info += "".join(self.creator).rstrip().replace("\n","\n C:")
667            info += "\n"
668        if hasattr(self, "invoker"):
669            info += " I: First Invoker was:\n I:"
670            info += "".join(self.invoker).rstrip().replace("\n","\n I:")
671            info += "\n"
672        return info
673
674
675    def __del__(self):
676        """
677        Print tracebacks and die.
678
679        If the *last* (and I do mean *last*) callback leaves me in an error
680        state, print a traceback (if said errback is a L{Failure}).
681        """
682        if self.failResult is not None:
683            log.msg("Unhandled error in Deferred:", isError=True)
684            debugInfo = self._getDebugTracebacks()
685            if debugInfo != '':
686                log.msg("(debug: " + debugInfo + ")", isError=True)
687            log.err(self.failResult)
688
689
690
691@comparable
692class FirstError(Exception):
693    """
694    First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
695
696    @ivar subFailure: The L{Failure} that occurred.
697    @type subFailure: L{Failure}
698
699    @ivar index: The index of the L{Deferred} in the L{DeferredList} where
700        it happened.
701    @type index: C{int}
702    """
703    def __init__(self, failure, index):
704        Exception.__init__(self, failure, index)
705        self.subFailure = failure
706        self.index = index
707
708
709    def __repr__(self):
710        """
711        The I{repr} of L{FirstError} instances includes the repr of the
712        wrapped failure's exception and the index of the L{FirstError}.
713        """
714        return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
715
716
717    def __str__(self):
718        """
719        The I{str} of L{FirstError} instances includes the I{str} of the
720        entire wrapped failure (including its traceback and exception) and
721        the index of the L{FirstError}.
722        """
723        return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
724
725
726    def __cmp__(self, other):
727        """
728        Comparison between L{FirstError} and other L{FirstError} instances
729        is defined as the comparison of the index and sub-failure of each
730        instance.  L{FirstError} instances don't compare equal to anything
731        that isn't a L{FirstError} instance.
732
733        @since: 8.2
734        """
735        if isinstance(other, FirstError):
736            return cmp(
737                (self.index, self.subFailure),
738                (other.index, other.subFailure))
739        return -1
740
741
742
743class DeferredList(Deferred):
744    """
745    L{DeferredList} is a tool for collecting the results of several Deferreds.
746
747    This tracks a list of L{Deferred}s for their results, and makes a single
748    callback when they have all completed.  By default, the ultimate result is a
749    list of (success, result) tuples, 'success' being a boolean.
750    L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and
751    errbacks can be added to it in the same way.
752
753    L{DeferredList} is implemented by adding callbacks and errbacks to each
754    L{Deferred} in the list passed to it.  This means callbacks and errbacks
755    added to the Deferreds before they are passed to L{DeferredList} will change
756    the result that L{DeferredList} sees (i.e., L{DeferredList} is not special).
757    Callbacks and errbacks can also be added to the Deferreds after they are
758    passed to L{DeferredList} and L{DeferredList} may change the result that
759    they see.
760
761    See the documentation for the C{__init__} arguments for more information.
762
763    @ivar _deferredList: The C{list} of L{Deferred}s to track.
764    """
765
766    fireOnOneCallback = False
767    fireOnOneErrback = False
768
769    def __init__(self, deferredList, fireOnOneCallback=False,
770                 fireOnOneErrback=False, consumeErrors=False):
771        """
772        Initialize a DeferredList.
773
774        @param deferredList: The list of deferreds to track.
775        @type deferredList:  C{list} of L{Deferred}s
776
777        @param fireOnOneCallback: (keyword param) a flag indicating that this
778            L{DeferredList} will fire when the first L{Deferred} in
779            C{deferredList} fires with a non-failure result without waiting for
780            any of the other Deferreds.  When this flag is set, the DeferredList
781            will fire with a two-tuple: the first element is the result of the
782            Deferred which fired; the second element is the index in
783            C{deferredList} of that Deferred.
784        @type fireOnOneCallback: C{bool}
785
786        @param fireOnOneErrback: (keyword param) a flag indicating that this
787            L{DeferredList} will fire when the first L{Deferred} in
788            C{deferredList} fires with a failure result without waiting for any
789            of the other Deferreds.  When this flag is set, if a Deferred in the
790            list errbacks, the DeferredList will errback with a L{FirstError}
791            failure wrapping the failure of that Deferred.
792        @type fireOnOneErrback: C{bool}
793
794        @param consumeErrors: (keyword param) a flag indicating that failures in
795            any of the included L{Deferreds} should not be propagated to
796            errbacks added to the individual L{Deferreds} after this
797            L{DeferredList} is constructed.  After constructing the
798            L{DeferredList}, any errors in the individual L{Deferred}s will be
799            converted to a callback result of C{None}.  This is useful to
800            prevent spurious 'Unhandled error in Deferred' messages from being
801            logged.  This does not prevent C{fireOnOneErrback} from working.
802        @type consumeErrors: C{bool}
803        """
804        self._deferredList = list(deferredList)
805        self.resultList = [None] * len(self._deferredList)
806        Deferred.__init__(self)
807        if len(self._deferredList) == 0 and not fireOnOneCallback:
808            self.callback(self.resultList)
809
810        # These flags need to be set *before* attaching callbacks to the
811        # deferreds, because the callbacks use these flags, and will run
812        # synchronously if any of the deferreds are already fired.
813        self.fireOnOneCallback = fireOnOneCallback
814        self.fireOnOneErrback = fireOnOneErrback
815        self.consumeErrors = consumeErrors
816        self.finishedCount = 0
817
818        index = 0
819        for deferred in self._deferredList:
820            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
821                                  callbackArgs=(index,SUCCESS),
822                                  errbackArgs=(index,FAILURE))
823            index = index + 1
824
825
826    def _cbDeferred(self, result, index, succeeded):
827        """
828        (internal) Callback for when one of my deferreds fires.
829        """
830        self.resultList[index] = (succeeded, result)
831
832        self.finishedCount += 1
833        if not self.called:
834            if succeeded == SUCCESS and self.fireOnOneCallback:
835                self.callback((result, index))
836            elif succeeded == FAILURE and self.fireOnOneErrback:
837                self.errback(failure.Failure(FirstError(result, index)))
838            elif self.finishedCount == len(self.resultList):
839                self.callback(self.resultList)
840
841        if succeeded == FAILURE and self.consumeErrors:
842            result = None
843
844        return result
845
846
847    def cancel(self):
848        """
849        Cancel this L{DeferredList}.
850
851        If the L{DeferredList} hasn't fired yet, cancel every L{Deferred} in
852        the list.
853
854        If the L{DeferredList} has fired, including the case where the
855        C{fireOnOneCallback}/C{fireOnOneErrback} flag is set and the
856        L{DeferredList} fires because one L{Deferred} in the list fires with a
857        non-failure/failure result, do nothing in the C{cancel} method.
858        """
859        if not self.called:
860            for deferred in self._deferredList:
861                try:
862                    deferred.cancel()
863                except:
864                    log.err(
865                        _why="Exception raised from user supplied canceller")
866
867
868def _parseDListResult(l, fireOnOneErrback=False):
869    if __debug__:
870        for success, value in l:
871            assert success
872    return [x[1] for x in l]
873
874
875
876def gatherResults(deferredList, consumeErrors=False):
877    """
878    Returns, via a L{Deferred}, a list with the results of the given
879    L{Deferred}s - in effect, a "join" of multiple deferred operations.
880
881    The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s
882    have fired, or when any one of them has failed.
883
884    This method can be cancelled by calling the C{cancel} method of the
885    L{Deferred}, all the L{Deferred}s in the list will be cancelled.
886
887    This differs from L{DeferredList} in that you don't need to parse
888    the result for success/failure.
889
890    @type deferredList:  C{list} of L{Deferred}s
891
892    @param consumeErrors: (keyword param) a flag, defaulting to False,
893        indicating that failures in any of the given L{Deferreds} should not be
894        propagated to errbacks added to the individual L{Deferreds} after this
895        L{gatherResults} invocation.  Any such errors in the individual
896        L{Deferred}s will be converted to a callback result of C{None}.  This
897        is useful to prevent spurious 'Unhandled error in Deferred' messages
898        from being logged.  This parameter is available since 11.1.0.
899    @type consumeErrors: C{bool}
900    """
901    d = DeferredList(deferredList, fireOnOneErrback=True,
902                                   consumeErrors=consumeErrors)
903    d.addCallback(_parseDListResult)
904    return d
905
906
907
908# Constants for use with DeferredList
909
910SUCCESS = True
911FAILURE = False
912
913
914
915## deferredGenerator
916
917class waitForDeferred:
918    """
919    See L{deferredGenerator}.
920    """
921
922    def __init__(self, d):
923        if not isinstance(d, Deferred):
924            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
925        self.d = d
926
927
928    def getResult(self):
929        if isinstance(self.result, failure.Failure):
930            self.result.raiseException()
931        return self.result
932
933
934
935def _deferGenerator(g, deferred):
936    """
937    See L{deferredGenerator}.
938    """
939    result = None
940
941    # This function is complicated by the need to prevent unbounded recursion
942    # arising from repeatedly yielding immediately ready deferreds.  This while
943    # loop and the waiting variable solve that by manually unfolding the
944    # recursion.
945
946    waiting = [True, # defgen is waiting for result?
947               None] # result
948
949    while 1:
950        try:
951            result = next(g)
952        except StopIteration:
953            deferred.callback(result)
954            return deferred
955        except:
956            deferred.errback()
957            return deferred
958
959        # Deferred.callback(Deferred) raises an error; we catch this case
960        # early here and give a nicer error message to the user in case
961        # they yield a Deferred.
962        if isinstance(result, Deferred):
963            return fail(TypeError("Yield waitForDeferred(d), not d!"))
964
965        if isinstance(result, waitForDeferred):
966            # a waitForDeferred was yielded, get the result.
967            # Pass result in so it don't get changed going around the loop
968            # This isn't a problem for waiting, as it's only reused if
969            # gotResult has already been executed.
970            def gotResult(r, result=result):
971                result.result = r
972                if waiting[0]:
973                    waiting[0] = False
974                    waiting[1] = r
975                else:
976                    _deferGenerator(g, deferred)
977            result.d.addBoth(gotResult)
978            if waiting[0]:
979                # Haven't called back yet, set flag so that we get reinvoked
980                # and return from the loop
981                waiting[0] = False
982                return deferred
983            # Reset waiting to initial values for next loop
984            waiting[0] = True
985            waiting[1] = None
986
987            result = None
988
989
990
991def deferredGenerator(f):
992    """
993    L{deferredGenerator} and L{waitForDeferred} help you write
994    L{Deferred}-using code that looks like a regular sequential function.
995    Consider the use of L{inlineCallbacks} instead, which can accomplish
996    the same thing in a more concise manner.
997
998    There are two important functions involved: L{waitForDeferred}, and
999    L{deferredGenerator}.  They are used together, like this::
1000
1001        @deferredGenerator
1002        def thingummy():
1003            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
1004            yield thing
1005            thing = thing.getResult()
1006            print thing #the result! hoorj!
1007
1008    L{waitForDeferred} returns something that you should immediately yield; when
1009    your generator is resumed, calling C{thing.getResult()} will either give you
1010    the result of the L{Deferred} if it was a success, or raise an exception if it
1011    was a failure.  Calling C{getResult} is B{absolutely mandatory}.  If you do
1012    not call it, I{your program will not work}.
1013
1014    L{deferredGenerator} takes one of these waitForDeferred-using generator
1015    functions and converts it into a function that returns a L{Deferred}. The
1016    result of the L{Deferred} will be the last value that your generator yielded
1017    unless the last value is a L{waitForDeferred} instance, in which case the
1018    result will be C{None}.  If the function raises an unhandled exception, the
1019    L{Deferred} will errback instead.  Remember that C{return result} won't work;
1020    use C{yield result; return} in place of that.
1021
1022    Note that not yielding anything from your generator will make the L{Deferred}
1023    result in C{None}. Yielding a L{Deferred} from your generator is also an error
1024    condition; always yield C{waitForDeferred(d)} instead.
1025
1026    The L{Deferred} returned from your deferred generator may also errback if your
1027    generator raised an exception.  For example::
1028
1029        @deferredGenerator
1030        def thingummy():
1031            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
1032            yield thing
1033            thing = thing.getResult()
1034            if thing == 'I love Twisted':
1035                # will become the result of the Deferred
1036                yield 'TWISTED IS GREAT!'
1037                return
1038            else:
1039                # will trigger an errback
1040                raise Exception('DESTROY ALL LIFE')
1041
1042    Put succinctly, these functions connect deferred-using code with this 'fake
1043    blocking' style in both directions: L{waitForDeferred} converts from a
1044    L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
1045    'blocking' style to a L{Deferred}.
1046    """
1047    @wraps(f)
1048    def unwindGenerator(*args, **kwargs):
1049        return _deferGenerator(f(*args, **kwargs), Deferred())
1050    return unwindGenerator
1051
1052
1053## inlineCallbacks
1054
1055
1056
1057class _DefGen_Return(BaseException):
1058    def __init__(self, value):
1059        self.value = value
1060
1061
1062
1063def returnValue(val):
1064    """
1065    Return val from a L{inlineCallbacks} generator.
1066
1067    Note: this is currently implemented by raising an exception
1068    derived from L{BaseException}.  You might want to change any
1069    'except:' clauses to an 'except Exception:' clause so as not to
1070    catch this exception.
1071
1072    Also: while this function currently will work when called from
1073    within arbitrary functions called from within the generator, do
1074    not rely upon this behavior.
1075    """
1076    raise _DefGen_Return(val)
1077
1078
1079
1080def _inlineCallbacks(result, g, deferred):
1081    """
1082    See L{inlineCallbacks}.
1083    """
1084    # This function is complicated by the need to prevent unbounded recursion
1085    # arising from repeatedly yielding immediately ready deferreds.  This while
1086    # loop and the waiting variable solve that by manually unfolding the
1087    # recursion.
1088
1089    waiting = [True, # waiting for result?
1090               None] # result
1091
1092    while 1:
1093        try:
1094            # Send the last result back as the result of the yield expression.
1095            isFailure = isinstance(result, failure.Failure)
1096            if isFailure:
1097                result = result.throwExceptionIntoGenerator(g)
1098            else:
1099                result = g.send(result)
1100        except StopIteration:
1101            # fell off the end, or "return" statement
1102            deferred.callback(None)
1103            return deferred
1104        except _DefGen_Return as e:
1105            # returnValue() was called; time to give a result to the original
1106            # Deferred.  First though, let's try to identify the potentially
1107            # confusing situation which results when returnValue() is
1108            # accidentally invoked from a different function, one that wasn't
1109            # decorated with @inlineCallbacks.
1110
1111            # The traceback starts in this frame (the one for
1112            # _inlineCallbacks); the next one down should be the application
1113            # code.
1114            appCodeTrace = exc_info()[2].tb_next
1115            if isFailure:
1116                # If we invoked this generator frame by throwing an exception
1117                # into it, then throwExceptionIntoGenerator will consume an
1118                # additional stack frame itself, so we need to skip that too.
1119                appCodeTrace = appCodeTrace.tb_next
1120            # Now that we've identified the frame being exited by the
1121            # exception, let's figure out if returnValue was called from it
1122            # directly.  returnValue itself consumes a stack frame, so the
1123            # application code will have a tb_next, but it will *not* have a
1124            # second tb_next.
1125            if appCodeTrace.tb_next.tb_next:
1126                # If returnValue was invoked non-local to the frame which it is
1127                # exiting, identify the frame that ultimately invoked
1128                # returnValue so that we can warn the user, as this behavior is
1129                # confusing.
1130                ultimateTrace = appCodeTrace
1131                while ultimateTrace.tb_next.tb_next:
1132                    ultimateTrace = ultimateTrace.tb_next
1133                filename = ultimateTrace.tb_frame.f_code.co_filename
1134                lineno = ultimateTrace.tb_lineno
1135                warnings.warn_explicit(
1136                    "returnValue() in %r causing %r to exit: "
1137                    "returnValue should only be invoked by functions decorated "
1138                    "with inlineCallbacks" % (
1139                        ultimateTrace.tb_frame.f_code.co_name,
1140                        appCodeTrace.tb_frame.f_code.co_name),
1141                    DeprecationWarning, filename, lineno)
1142            deferred.callback(e.value)
1143            return deferred
1144        except:
1145            deferred.errback()
1146            return deferred
1147
1148        if isinstance(result, Deferred):
1149            # a deferred was yielded, get the result.
1150            def gotResult(r):
1151                if waiting[0]:
1152                    waiting[0] = False
1153                    waiting[1] = r
1154                else:
1155                    _inlineCallbacks(r, g, deferred)
1156
1157            result.addBoth(gotResult)
1158            if waiting[0]:
1159                # Haven't called back yet, set flag so that we get reinvoked
1160                # and return from the loop
1161                waiting[0] = False
1162                return deferred
1163
1164            result = waiting[1]
1165            # Reset waiting to initial values for next loop.  gotResult uses
1166            # waiting, but this isn't a problem because gotResult is only
1167            # executed once, and if it hasn't been executed yet, the return
1168            # branch above would have been taken.
1169
1170
1171            waiting[0] = True
1172            waiting[1] = None
1173
1174
1175    return deferred
1176
1177
1178
1179def inlineCallbacks(f):
1180    """
1181    inlineCallbacks helps you write L{Deferred}-using code that looks like a
1182    regular sequential function. For example::
1183
1184        @inlineCallBacks
1185        def thingummy():
1186            thing = yield makeSomeRequestResultingInDeferred()
1187            print(thing)  # the result! hoorj!
1188
1189    When you call anything that results in a L{Deferred}, you can simply yield it;
1190    your generator will automatically be resumed when the Deferred's result is
1191    available. The generator will be sent the result of the L{Deferred} with the
1192    'send' method on generators, or if the result was a failure, 'throw'.
1193
1194    Things that are not L{Deferred}s may also be yielded, and your generator
1195    will be resumed with the same object sent back. This means C{yield}
1196    performs an operation roughly equivalent to L{maybeDeferred}.
1197
1198    Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
1199    will result in the return value of the generator (or will fail with a
1200    failure object if your generator raises an unhandled exception). Note that
1201    you can't use C{return result} to return a value; use C{returnValue(result)}
1202    instead. Falling off the end of the generator, or simply using C{return}
1203    will cause the L{Deferred} to have a result of C{None}.
1204
1205    Be aware that L{returnValue} will not accept a L{Deferred} as a parameter.
1206    If you believe the thing you'd like to return could be a L{Deferred}, do
1207    this::
1208
1209        result = yield result
1210        returnValue(result)
1211
1212    The L{Deferred} returned from your deferred generator may errback if your
1213    generator raised an exception::
1214
1215        @inlineCallbacks
1216        def thingummy():
1217            thing = yield makeSomeRequestResultingInDeferred()
1218            if thing == 'I love Twisted':
1219                # will become the result of the Deferred
1220                returnValue('TWISTED IS GREAT!')
1221            else:
1222                # will trigger an errback
1223                raise Exception('DESTROY ALL LIFE')
1224    """
1225    @wraps(f)
1226    def unwindGenerator(*args, **kwargs):
1227        try:
1228            gen = f(*args, **kwargs)
1229        except _DefGen_Return:
1230            raise TypeError(
1231                "inlineCallbacks requires %r to produce a generator; instead"
1232                "caught returnValue being used in a non-generator" % (f,))
1233        if not isinstance(gen, types.GeneratorType):
1234            raise TypeError(
1235                "inlineCallbacks requires %r to produce a generator; "
1236                "instead got %r" % (f, gen))
1237        return _inlineCallbacks(None, gen, Deferred())
1238    return unwindGenerator
1239
1240
1241## DeferredLock/DeferredQueue
1242
1243class _ConcurrencyPrimitive(object):
1244    def __init__(self):
1245        self.waiting = []
1246
1247
1248    def _releaseAndReturn(self, r):
1249        self.release()
1250        return r
1251
1252
1253    def run(*args, **kwargs):
1254        """
1255        Acquire, run, release.
1256
1257        This function takes a callable as its first argument and any
1258        number of other positional and keyword arguments.  When the
1259        lock or semaphore is acquired, the callable will be invoked
1260        with those arguments.
1261
1262        The callable may return a L{Deferred}; if it does, the lock or
1263        semaphore won't be released until that L{Deferred} fires.
1264
1265        @return: L{Deferred} of function result.
1266        """
1267        if len(args) < 2:
1268            if not args:
1269                raise TypeError("run() takes at least 2 arguments, none given.")
1270            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
1271                args[0].__class__.__name__,))
1272        self, f = args[:2]
1273        args = args[2:]
1274
1275        def execute(ignoredResult):
1276            d = maybeDeferred(f, *args, **kwargs)
1277            d.addBoth(self._releaseAndReturn)
1278            return d
1279
1280        d = self.acquire()
1281        d.addCallback(execute)
1282        return d
1283
1284
1285
1286class DeferredLock(_ConcurrencyPrimitive):
1287    """
1288    A lock for event driven systems.
1289
1290    @ivar locked: C{True} when this Lock has been acquired, false at all other
1291        times.  Do not change this value, but it is useful to examine for the
1292        equivalent of a "non-blocking" acquisition.
1293    """
1294
1295    locked = False
1296
1297
1298    def _cancelAcquire(self, d):
1299        """
1300        Remove a deferred d from our waiting list, as the deferred has been
1301        canceled.
1302
1303        Note: We do not need to wrap this in a try/except to catch d not
1304        being in self.waiting because this canceller will not be called if
1305        d has fired. release() pops a deferred out of self.waiting and
1306        calls it, so the canceller will no longer be called.
1307
1308        @param d: The deferred that has been canceled.
1309        """
1310        self.waiting.remove(d)
1311
1312
1313    def acquire(self):
1314        """
1315        Attempt to acquire the lock.  Returns a L{Deferred} that fires on
1316        lock acquisition with the L{DeferredLock} as the value.  If the lock
1317        is locked, then the Deferred is placed at the end of a waiting list.
1318
1319        @return: a L{Deferred} which fires on lock acquisition.
1320        @rtype: a L{Deferred}
1321        """
1322        d = Deferred(canceller=self._cancelAcquire)
1323        if self.locked:
1324            self.waiting.append(d)
1325        else:
1326            self.locked = True
1327            d.callback(self)
1328        return d
1329
1330
1331    def release(self):
1332        """
1333        Release the lock.  If there is a waiting list, then the first
1334        L{Deferred} in that waiting list will be called back.
1335
1336        Should be called by whomever did the L{acquire}() when the shared
1337        resource is free.
1338        """
1339        assert self.locked, "Tried to release an unlocked lock"
1340        self.locked = False
1341        if self.waiting:
1342            # someone is waiting to acquire lock
1343            self.locked = True
1344            d = self.waiting.pop(0)
1345            d.callback(self)
1346
1347
1348
1349class DeferredSemaphore(_ConcurrencyPrimitive):
1350    """
1351    A semaphore for event driven systems.
1352
1353    If you are looking into this as a means of limiting parallelism, you might
1354    find L{twisted.internet.task.Cooperator} more useful.
1355
1356    @ivar tokens: At most this many users may acquire this semaphore at
1357        once.
1358    @type tokens: C{int}
1359
1360    @ivar limit: The difference between C{tokens} and the number of users
1361        which have currently acquired this semaphore.
1362    @type limit: C{int}
1363    """
1364
1365    def __init__(self, tokens):
1366        _ConcurrencyPrimitive.__init__(self)
1367        if tokens < 1:
1368            raise ValueError("DeferredSemaphore requires tokens >= 1")
1369        self.tokens = tokens
1370        self.limit = tokens
1371
1372
1373    def _cancelAcquire(self, d):
1374        """
1375        Remove a deferred d from our waiting list, as the deferred has been
1376        canceled.
1377
1378        Note: We do not need to wrap this in a try/except to catch d not
1379        being in self.waiting because this canceller will not be called if
1380        d has fired. release() pops a deferred out of self.waiting and
1381        calls it, so the canceller will no longer be called.
1382
1383        @param d: The deferred that has been canceled.
1384        """
1385        self.waiting.remove(d)
1386
1387
1388    def acquire(self):
1389        """
1390        Attempt to acquire the token.
1391
1392        @return: a L{Deferred} which fires on token acquisition.
1393        """
1394        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
1395        d = Deferred(canceller=self._cancelAcquire)
1396        if not self.tokens:
1397            self.waiting.append(d)
1398        else:
1399            self.tokens = self.tokens - 1
1400            d.callback(self)
1401        return d
1402
1403
1404    def release(self):
1405        """
1406        Release the token.
1407
1408        Should be called by whoever did the L{acquire}() when the shared
1409        resource is free.
1410        """
1411        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
1412        self.tokens = self.tokens + 1
1413        if self.waiting:
1414            # someone is waiting to acquire token
1415            self.tokens = self.tokens - 1
1416            d = self.waiting.pop(0)
1417            d.callback(self)
1418
1419
1420
1421class QueueOverflow(Exception):
1422    pass
1423
1424
1425
1426class QueueUnderflow(Exception):
1427    pass
1428
1429
1430
1431class DeferredQueue(object):
1432    """
1433    An event driven queue.
1434
1435    Objects may be added as usual to this queue.  When an attempt is
1436    made to retrieve an object when the queue is empty, a L{Deferred} is
1437    returned which will fire when an object becomes available.
1438
1439    @ivar size: The maximum number of objects to allow into the queue
1440    at a time.  When an attempt to add a new object would exceed this
1441    limit, L{QueueOverflow} is raised synchronously.  C{None} for no limit.
1442
1443    @ivar backlog: The maximum number of L{Deferred} gets to allow at
1444    one time.  When an attempt is made to get an object which would
1445    exceed this limit, L{QueueUnderflow} is raised synchronously.  C{None}
1446    for no limit.
1447    """
1448
1449    def __init__(self, size=None, backlog=None):
1450        self.waiting = []
1451        self.pending = []
1452        self.size = size
1453        self.backlog = backlog
1454
1455
1456    def _cancelGet(self, d):
1457        """
1458        Remove a deferred d from our waiting list, as the deferred has been
1459        canceled.
1460
1461        Note: We do not need to wrap this in a try/except to catch d not
1462        being in self.waiting because this canceller will not be called if
1463        d has fired. put() pops a deferred out of self.waiting and calls
1464        it, so the canceller will no longer be called.
1465
1466        @param d: The deferred that has been canceled.
1467        """
1468        self.waiting.remove(d)
1469
1470
1471    def put(self, obj):
1472        """
1473        Add an object to this queue.
1474
1475        @raise QueueOverflow: Too many objects are in this queue.
1476        """
1477        if self.waiting:
1478            self.waiting.pop(0).callback(obj)
1479        elif self.size is None or len(self.pending) < self.size:
1480            self.pending.append(obj)
1481        else:
1482            raise QueueOverflow()
1483
1484
1485    def get(self):
1486        """
1487        Attempt to retrieve and remove an object from the queue.
1488
1489        @return: a L{Deferred} which fires with the next object available in
1490        the queue.
1491
1492        @raise QueueUnderflow: Too many (more than C{backlog})
1493        L{Deferred}s are already waiting for an object from this queue.
1494        """
1495        if self.pending:
1496            return succeed(self.pending.pop(0))
1497        elif self.backlog is None or len(self.waiting) < self.backlog:
1498            d = Deferred(canceller=self._cancelGet)
1499            self.waiting.append(d)
1500            return d
1501        else:
1502            raise QueueUnderflow()
1503
1504
1505
1506class AlreadyTryingToLockError(Exception):
1507    """
1508    Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
1509    single L{DeferredFilesystemLock}.
1510    """
1511
1512
1513
1514class DeferredFilesystemLock(lockfile.FilesystemLock):
1515    """
1516    A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
1517    acquired.
1518
1519    @ivar _scheduler: The object in charge of scheduling retries. In this
1520        implementation this is parameterized for testing.
1521
1522    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1523
1524    @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
1525        the next retry for aquiring the lock.
1526
1527    @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
1528        argument.  This is in charge of timing out our attempt to acquire the
1529        lock.
1530    """
1531    _interval = 1
1532    _tryLockCall = None
1533    _timeoutCall = None
1534
1535
1536    def __init__(self, name, scheduler=None):
1537        """
1538        @param name: The name of the lock to acquire
1539        @param scheduler: An object which provides L{IReactorTime}
1540        """
1541        lockfile.FilesystemLock.__init__(self, name)
1542
1543        if scheduler is None:
1544            from twisted.internet import reactor
1545            scheduler = reactor
1546
1547        self._scheduler = scheduler
1548
1549
1550    def deferUntilLocked(self, timeout=None):
1551        """
1552        Wait until we acquire this lock.  This method is not safe for
1553        concurrent use.
1554
1555        @type timeout: C{float} or C{int}
1556        @param timeout: the number of seconds after which to time out if the
1557            lock has not been acquired.
1558
1559        @return: a L{Deferred} which will callback when the lock is acquired, or
1560            errback with a L{TimeoutError} after timing out or an
1561            L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1562            been called and not successfully locked the file.
1563        """
1564        if self._tryLockCall is not None:
1565            return fail(
1566                AlreadyTryingToLockError(
1567                    "deferUntilLocked isn't safe for concurrent use."))
1568
1569        def _cancelLock(reason):
1570            """
1571            Cancel a L{DeferredFilesystemLock.deferUntilLocked} call.
1572
1573            @type reason: L{failure.Failure}
1574            @param reason: The reason why the call is cancelled.
1575            """
1576            self._tryLockCall.cancel()
1577            self._tryLockCall = None
1578            if self._timeoutCall is not None and self._timeoutCall.active():
1579                self._timeoutCall.cancel()
1580                self._timeoutCall = None
1581
1582            if self.lock():
1583                d.callback(None)
1584            else:
1585                d.errback(reason)
1586
1587        d = Deferred(lambda deferred: _cancelLock(CancelledError()))
1588
1589        def _tryLock():
1590            if self.lock():
1591                if self._timeoutCall is not None:
1592                    self._timeoutCall.cancel()
1593                    self._timeoutCall = None
1594
1595                self._tryLockCall = None
1596
1597                d.callback(None)
1598            else:
1599                if timeout is not None and self._timeoutCall is None:
1600                    reason = failure.Failure(TimeoutError(
1601                        "Timed out aquiring lock: %s after %fs" % (
1602                            self.name,
1603                            timeout)))
1604                    self._timeoutCall = self._scheduler.callLater(
1605                        timeout, _cancelLock, reason)
1606
1607                self._tryLockCall = self._scheduler.callLater(
1608                    self._interval, _tryLock)
1609
1610        _tryLock()
1611
1612        return d
1613
1614
1615
1616__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1617           "AlreadyCalledError", "TimeoutError", "gatherResults",
1618           "maybeDeferred",
1619           "waitForDeferred", "deferredGenerator", "inlineCallbacks",
1620           "returnValue",
1621           "DeferredLock", "DeferredSemaphore", "DeferredQueue",
1622           "DeferredFilesystemLock", "AlreadyTryingToLockError",
1623          ]
1624