1# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Support for results that aren't immediately available. 7 8Maintainer: Glyph Lefkowitz 9 10@var _NO_RESULT: The result used to represent the fact that there is no 11 result. B{Never ever ever use this as an actual result for a Deferred}. You 12 have been warned. 13 14@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred 15 chain. Always accompanied by a Deferred instance in the args tuple pointing 16 at the Deferred which is chained to the Deferred which has this marker. 17""" 18 19from __future__ import division, absolute_import 20 21import traceback 22import types 23import warnings 24from sys import exc_info 25from functools import wraps 26 27# Twisted imports 28from twisted.python.compat import cmp, comparable 29from twisted.python import lockfile, log, failure 30from twisted.python.deprecate import warnAboutFunction 31 32 33 34class AlreadyCalledError(Exception): 35 pass 36 37 38 39class CancelledError(Exception): 40 """ 41 This error is raised by default when a L{Deferred} is cancelled. 42 """ 43 44 45class TimeoutError(Exception): 46 """ 47 This exception is deprecated. It is used only by the deprecated 48 L{Deferred.setTimeout} method. 49 """ 50 51 52 53def logError(err): 54 log.err(err) 55 return err 56 57 58 59def succeed(result): 60 """ 61 Return a L{Deferred} that has already had C{.callback(result)} called. 62 63 This is useful when you're writing synchronous code to an 64 asynchronous interface: i.e., some code is calling you expecting a 65 L{Deferred} result, but you don't actually need to do anything 66 asynchronous. Just return C{defer.succeed(theResult)}. 67 68 See L{fail} for a version of this function that uses a failing 69 L{Deferred} rather than a successful one. 70 71 @param result: The result to give to the Deferred's 'callback' 72 method. 73 74 @rtype: L{Deferred} 75 """ 76 d = Deferred() 77 d.callback(result) 78 return d 79 80 81 82def fail(result=None): 83 """ 84 Return a L{Deferred} that has already had C{.errback(result)} called. 85 86 See L{succeed}'s docstring for rationale. 87 88 @param result: The same argument that L{Deferred.errback} takes. 89 90 @raise NoCurrentExceptionError: If C{result} is C{None} but there is no 91 current exception state. 92 93 @rtype: L{Deferred} 94 """ 95 d = Deferred() 96 d.errback(result) 97 return d 98 99 100 101def execute(callable, *args, **kw): 102 """ 103 Create a L{Deferred} from a callable and arguments. 104 105 Call the given function with the given arguments. Return a L{Deferred} 106 which has been fired with its callback as the result of that invocation 107 or its C{errback} with a L{Failure} for the exception thrown. 108 """ 109 try: 110 result = callable(*args, **kw) 111 except: 112 return fail() 113 else: 114 return succeed(result) 115 116 117 118def maybeDeferred(f, *args, **kw): 119 """ 120 Invoke a function that may or may not return a L{Deferred}. 121 122 Call the given function with the given arguments. If the returned 123 object is a L{Deferred}, return it. If the returned object is a L{Failure}, 124 wrap it with L{fail} and return it. Otherwise, wrap it in L{succeed} and 125 return it. If an exception is raised, convert it to a L{Failure}, wrap it 126 in L{fail}, and then return it. 127 128 @type f: Any callable 129 @param f: The callable to invoke 130 131 @param args: The arguments to pass to C{f} 132 @param kw: The keyword arguments to pass to C{f} 133 134 @rtype: L{Deferred} 135 @return: The result of the function call, wrapped in a L{Deferred} if 136 necessary. 137 """ 138 try: 139 result = f(*args, **kw) 140 except: 141 return fail(failure.Failure(captureVars=Deferred.debug)) 142 143 if isinstance(result, Deferred): 144 return result 145 elif isinstance(result, failure.Failure): 146 return fail(result) 147 else: 148 return succeed(result) 149 150 151 152def timeout(deferred): 153 deferred.errback(failure.Failure(TimeoutError("Callback timed out"))) 154 155 156 157def passthru(arg): 158 return arg 159 160 161 162def setDebugging(on): 163 """ 164 Enable or disable L{Deferred} debugging. 165 166 When debugging is on, the call stacks from creation and invocation are 167 recorded, and added to any L{AlreadyCalledErrors} we raise. 168 """ 169 Deferred.debug=bool(on) 170 171 172 173def getDebugging(): 174 """ 175 Determine whether L{Deferred} debugging is enabled. 176 """ 177 return Deferred.debug 178 179 180# See module docstring. 181_NO_RESULT = object() 182_CONTINUE = object() 183 184 185 186class Deferred: 187 """ 188 This is a callback which will be put off until later. 189 190 Why do we want this? Well, in cases where a function in a threaded 191 program would block until it gets a result, for Twisted it should 192 not block. Instead, it should return a L{Deferred}. 193 194 This can be implemented for protocols that run over the network by 195 writing an asynchronous protocol for L{twisted.internet}. For methods 196 that come from outside packages that are not under our control, we use 197 threads (see for example L{twisted.enterprise.adbapi}). 198 199 For more information about Deferreds, see doc/core/howto/defer.html or 200 U{http://twistedmatrix.com/documents/current/core/howto/defer.html} 201 202 When creating a Deferred, you may provide a canceller function, which 203 will be called by d.cancel() to let you do any clean-up necessary if the 204 user decides not to wait for the deferred to complete. 205 206 @ivar called: A flag which is C{False} until either C{callback} or 207 C{errback} is called and afterwards always C{True}. 208 @type called: C{bool} 209 210 @ivar paused: A counter of how many unmatched C{pause} calls have been made 211 on this instance. 212 @type paused: C{int} 213 214 @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism 215 which is C{True} if the Deferred has no canceller and has been 216 cancelled, C{False} otherwise. If C{True}, it can be expected that 217 C{callback} or C{errback} will eventually be called and the result 218 should be silently discarded. 219 @type _suppressAlreadyCalled: C{bool} 220 221 @ivar _runningCallbacks: A flag which is C{True} while this instance is 222 executing its callback chain, used to stop recursive execution of 223 L{_runCallbacks} 224 @type _runningCallbacks: C{bool} 225 226 @ivar _chainedTo: If this Deferred is waiting for the result of another 227 Deferred, this is a reference to the other Deferred. Otherwise, C{None}. 228 """ 229 230 called = False 231 paused = 0 232 _debugInfo = None 233 _suppressAlreadyCalled = False 234 235 # Are we currently running a user-installed callback? Meant to prevent 236 # recursive running of callbacks when a reentrant call to add a callback is 237 # used. 238 _runningCallbacks = False 239 240 # Keep this class attribute for now, for compatibility with code that 241 # sets it directly. 242 debug = False 243 244 _chainedTo = None 245 246 def __init__(self, canceller=None): 247 """ 248 Initialize a L{Deferred}. 249 250 @param canceller: a callable used to stop the pending operation 251 scheduled by this L{Deferred} when L{Deferred.cancel} is 252 invoked. The canceller will be passed the deferred whose 253 cancelation is requested (i.e., self). 254 255 If a canceller is not given, or does not invoke its argument's 256 C{callback} or C{errback} method, L{Deferred.cancel} will 257 invoke L{Deferred.errback} with a L{CancelledError}. 258 259 Note that if a canceller is not given, C{callback} or 260 C{errback} may still be invoked exactly once, even though 261 defer.py will have already invoked C{errback}, as described 262 above. This allows clients of code which returns a L{Deferred} 263 to cancel it without requiring the L{Deferred} instantiator to 264 provide any specific implementation support for cancellation. 265 New in 10.1. 266 267 @type canceller: a 1-argument callable which takes a L{Deferred}. The 268 return result is ignored. 269 """ 270 self.callbacks = [] 271 self._canceller = canceller 272 if self.debug: 273 self._debugInfo = DebugInfo() 274 self._debugInfo.creator = traceback.format_stack()[:-1] 275 276 277 def addCallbacks(self, callback, errback=None, 278 callbackArgs=None, callbackKeywords=None, 279 errbackArgs=None, errbackKeywords=None): 280 """ 281 Add a pair of callbacks (success and error) to this L{Deferred}. 282 283 These will be executed when the 'master' callback is run. 284 285 @return: C{self}. 286 @rtype: a L{Deferred} 287 """ 288 assert callable(callback) 289 assert errback == None or callable(errback) 290 cbs = ((callback, callbackArgs, callbackKeywords), 291 (errback or (passthru), errbackArgs, errbackKeywords)) 292 self.callbacks.append(cbs) 293 294 if self.called: 295 self._runCallbacks() 296 return self 297 298 299 def addCallback(self, callback, *args, **kw): 300 """ 301 Convenience method for adding just a callback. 302 303 See L{addCallbacks}. 304 """ 305 return self.addCallbacks(callback, callbackArgs=args, 306 callbackKeywords=kw) 307 308 309 def addErrback(self, errback, *args, **kw): 310 """ 311 Convenience method for adding just an errback. 312 313 See L{addCallbacks}. 314 """ 315 return self.addCallbacks(passthru, errback, 316 errbackArgs=args, 317 errbackKeywords=kw) 318 319 320 def addBoth(self, callback, *args, **kw): 321 """ 322 Convenience method for adding a single callable as both a callback 323 and an errback. 324 325 See L{addCallbacks}. 326 """ 327 return self.addCallbacks(callback, callback, 328 callbackArgs=args, errbackArgs=args, 329 callbackKeywords=kw, errbackKeywords=kw) 330 331 332 def chainDeferred(self, d): 333 """ 334 Chain another L{Deferred} to this L{Deferred}. 335 336 This method adds callbacks to this L{Deferred} to call C{d}'s callback 337 or errback, as appropriate. It is merely a shorthand way of performing 338 the following:: 339 340 self.addCallbacks(d.callback, d.errback) 341 342 When you chain a deferred d2 to another deferred d1 with 343 d1.chainDeferred(d2), you are making d2 participate in the callback 344 chain of d1. Thus any event that fires d1 will also fire d2. 345 However, the converse is B{not} true; if d2 is fired d1 will not be 346 affected. 347 348 Note that unlike the case where chaining is caused by a L{Deferred} 349 being returned from a callback, it is possible to cause the call 350 stack size limit to be exceeded by chaining many L{Deferred}s 351 together with C{chainDeferred}. 352 353 @return: C{self}. 354 @rtype: a L{Deferred} 355 """ 356 d._chainedTo = self 357 return self.addCallbacks(d.callback, d.errback) 358 359 360 def callback(self, result): 361 """ 362 Run all success callbacks that have been added to this L{Deferred}. 363 364 Each callback will have its result passed as the first argument to 365 the next; this way, the callbacks act as a 'processing chain'. If 366 the success-callback returns a L{Failure} or raises an L{Exception}, 367 processing will continue on the *error* callback chain. If a 368 callback (or errback) returns another L{Deferred}, this L{Deferred} 369 will be chained to it (and further callbacks will not run until that 370 L{Deferred} has a result). 371 372 An instance of L{Deferred} may only have either L{callback} or 373 L{errback} called on it, and only once. 374 375 @param result: The object which will be passed to the first callback 376 added to this L{Deferred} (via L{addCallback}). 377 378 @raise AlreadyCalledError: If L{callback} or L{errback} has already been 379 called on this L{Deferred}. 380 """ 381 assert not isinstance(result, Deferred) 382 self._startRunCallbacks(result) 383 384 385 def errback(self, fail=None): 386 """ 387 Run all error callbacks that have been added to this L{Deferred}. 388 389 Each callback will have its result passed as the first 390 argument to the next; this way, the callbacks act as a 391 'processing chain'. Also, if the error-callback returns a non-Failure 392 or doesn't raise an L{Exception}, processing will continue on the 393 *success*-callback chain. 394 395 If the argument that's passed to me is not a L{failure.Failure} instance, 396 it will be embedded in one. If no argument is passed, a 397 L{failure.Failure} instance will be created based on the current 398 traceback stack. 399 400 Passing a string as `fail' is deprecated, and will be punished with 401 a warning message. 402 403 An instance of L{Deferred} may only have either L{callback} or 404 L{errback} called on it, and only once. 405 406 @param fail: The L{Failure} object which will be passed to the first 407 errback added to this L{Deferred} (via L{addErrback}). 408 Alternatively, a L{Exception} instance from which a L{Failure} will 409 be constructed (with no traceback) or C{None} to create a L{Failure} 410 instance from the current exception state (with a traceback). 411 412 @raise AlreadyCalledError: If L{callback} or L{errback} has already been 413 called on this L{Deferred}. 414 415 @raise NoCurrentExceptionError: If C{fail} is C{None} but there is 416 no current exception state. 417 """ 418 if fail is None: 419 fail = failure.Failure(captureVars=self.debug) 420 elif not isinstance(fail, failure.Failure): 421 fail = failure.Failure(fail) 422 423 self._startRunCallbacks(fail) 424 425 426 def pause(self): 427 """ 428 Stop processing on a L{Deferred} until L{unpause}() is called. 429 """ 430 self.paused = self.paused + 1 431 432 433 def unpause(self): 434 """ 435 Process all callbacks made since L{pause}() was called. 436 """ 437 self.paused = self.paused - 1 438 if self.paused: 439 return 440 if self.called: 441 self._runCallbacks() 442 443 444 def cancel(self): 445 """ 446 Cancel this L{Deferred}. 447 448 If the L{Deferred} has not yet had its C{errback} or C{callback} method 449 invoked, call the canceller function provided to the constructor. If 450 that function does not invoke C{callback} or C{errback}, or if no 451 canceller function was provided, errback with L{CancelledError}. 452 453 If this L{Deferred} is waiting on another L{Deferred}, forward the 454 cancellation to the other L{Deferred}. 455 """ 456 if not self.called: 457 canceller = self._canceller 458 if canceller: 459 canceller(self) 460 else: 461 # Arrange to eat the callback that will eventually be fired 462 # since there was no real canceller. 463 self._suppressAlreadyCalled = True 464 if not self.called: 465 # There was no canceller, or the canceller didn't call 466 # callback or errback. 467 self.errback(failure.Failure(CancelledError())) 468 elif isinstance(self.result, Deferred): 469 # Waiting for another deferred -- cancel it instead. 470 self.result.cancel() 471 472 473 def _startRunCallbacks(self, result): 474 if self.called: 475 if self._suppressAlreadyCalled: 476 self._suppressAlreadyCalled = False 477 return 478 if self.debug: 479 if self._debugInfo is None: 480 self._debugInfo = DebugInfo() 481 extra = "\n" + self._debugInfo._getDebugTracebacks() 482 raise AlreadyCalledError(extra) 483 raise AlreadyCalledError 484 if self.debug: 485 if self._debugInfo is None: 486 self._debugInfo = DebugInfo() 487 self._debugInfo.invoker = traceback.format_stack()[:-2] 488 self.called = True 489 self.result = result 490 self._runCallbacks() 491 492 493 def _continuation(self): 494 """ 495 Build a tuple of callback and errback with L{_continue} to be used by 496 L{_addContinue} and L{_removeContinue} on another Deferred. 497 """ 498 return ((_CONTINUE, (self,), None), 499 (_CONTINUE, (self,), None)) 500 501 502 def _runCallbacks(self): 503 """ 504 Run the chain of callbacks once a result is available. 505 506 This consists of a simple loop over all of the callbacks, calling each 507 with the current result and making the current result equal to the 508 return value (or raised exception) of that call. 509 510 If C{self._runningCallbacks} is true, this loop won't run at all, since 511 it is already running above us on the call stack. If C{self.paused} is 512 true, the loop also won't run, because that's what it means to be 513 paused. 514 515 The loop will terminate before processing all of the callbacks if a 516 C{Deferred} without a result is encountered. 517 518 If a C{Deferred} I{with} a result is encountered, that result is taken 519 and the loop proceeds. 520 521 @note: The implementation is complicated slightly by the fact that 522 chaining (associating two Deferreds with each other such that one 523 will wait for the result of the other, as happens when a Deferred is 524 returned from a callback on another Deferred) is supported 525 iteratively rather than recursively, to avoid running out of stack 526 frames when processing long chains. 527 """ 528 if self._runningCallbacks: 529 # Don't recursively run callbacks 530 return 531 532 # Keep track of all the Deferreds encountered while propagating results 533 # up a chain. The way a Deferred gets onto this stack is by having 534 # added its _continuation() to the callbacks list of a second Deferred 535 # and then that second Deferred being fired. ie, if ever had _chainedTo 536 # set to something other than None, you might end up on this stack. 537 chain = [self] 538 539 while chain: 540 current = chain[-1] 541 542 if current.paused: 543 # This Deferred isn't going to produce a result at all. All the 544 # Deferreds up the chain waiting on it will just have to... 545 # wait. 546 return 547 548 finished = True 549 current._chainedTo = None 550 while current.callbacks: 551 item = current.callbacks.pop(0) 552 callback, args, kw = item[ 553 isinstance(current.result, failure.Failure)] 554 args = args or () 555 kw = kw or {} 556 557 # Avoid recursion if we can. 558 if callback is _CONTINUE: 559 # Give the waiting Deferred our current result and then 560 # forget about that result ourselves. 561 chainee = args[0] 562 chainee.result = current.result 563 current.result = None 564 # Making sure to update _debugInfo 565 if current._debugInfo is not None: 566 current._debugInfo.failResult = None 567 chainee.paused -= 1 568 chain.append(chainee) 569 # Delay cleaning this Deferred and popping it from the chain 570 # until after we've dealt with chainee. 571 finished = False 572 break 573 574 try: 575 current._runningCallbacks = True 576 try: 577 current.result = callback(current.result, *args, **kw) 578 if current.result is current: 579 warnAboutFunction( 580 callback, 581 "Callback returned the Deferred " 582 "it was attached to; this breaks the " 583 "callback chain and will raise an " 584 "exception in the future.") 585 finally: 586 current._runningCallbacks = False 587 except: 588 # Including full frame information in the Failure is quite 589 # expensive, so we avoid it unless self.debug is set. 590 current.result = failure.Failure(captureVars=self.debug) 591 else: 592 if isinstance(current.result, Deferred): 593 # The result is another Deferred. If it has a result, 594 # we can take it and keep going. 595 resultResult = getattr(current.result, 'result', _NO_RESULT) 596 if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused: 597 # Nope, it didn't. Pause and chain. 598 current.pause() 599 current._chainedTo = current.result 600 # Note: current.result has no result, so it's not 601 # running its callbacks right now. Therefore we can 602 # append to the callbacks list directly instead of 603 # using addCallbacks. 604 current.result.callbacks.append(current._continuation()) 605 break 606 else: 607 # Yep, it did. Steal it. 608 current.result.result = None 609 # Make sure _debugInfo's failure state is updated. 610 if current.result._debugInfo is not None: 611 current.result._debugInfo.failResult = None 612 current.result = resultResult 613 614 if finished: 615 # As much of the callback chain - perhaps all of it - as can be 616 # processed right now has been. The current Deferred is waiting on 617 # another Deferred or for more callbacks. Before finishing with it, 618 # make sure its _debugInfo is in the proper state. 619 if isinstance(current.result, failure.Failure): 620 # Stash the Failure in the _debugInfo for unhandled error 621 # reporting. 622 current.result.cleanFailure() 623 if current._debugInfo is None: 624 current._debugInfo = DebugInfo() 625 current._debugInfo.failResult = current.result 626 else: 627 # Clear out any Failure in the _debugInfo, since the result 628 # is no longer a Failure. 629 if current._debugInfo is not None: 630 current._debugInfo.failResult = None 631 632 # This Deferred is done, pop it from the chain and move back up 633 # to the Deferred which supplied us with our result. 634 chain.pop() 635 636 637 def __str__(self): 638 """ 639 Return a string representation of this C{Deferred}. 640 """ 641 cname = self.__class__.__name__ 642 result = getattr(self, 'result', _NO_RESULT) 643 myID = id(self) 644 if self._chainedTo is not None: 645 result = ' waiting on Deferred at 0x%x' % (id(self._chainedTo),) 646 elif result is _NO_RESULT: 647 result = '' 648 else: 649 result = ' current result: %r' % (result,) 650 return "<%s at 0x%x%s>" % (cname, myID, result) 651 __repr__ = __str__ 652 653 654 655class DebugInfo: 656 """ 657 Deferred debug helper. 658 """ 659 660 failResult = None 661 662 def _getDebugTracebacks(self): 663 info = '' 664 if hasattr(self, "creator"): 665 info += " C: Deferred was created:\n C:" 666 info += "".join(self.creator).rstrip().replace("\n","\n C:") 667 info += "\n" 668 if hasattr(self, "invoker"): 669 info += " I: First Invoker was:\n I:" 670 info += "".join(self.invoker).rstrip().replace("\n","\n I:") 671 info += "\n" 672 return info 673 674 675 def __del__(self): 676 """ 677 Print tracebacks and die. 678 679 If the *last* (and I do mean *last*) callback leaves me in an error 680 state, print a traceback (if said errback is a L{Failure}). 681 """ 682 if self.failResult is not None: 683 log.msg("Unhandled error in Deferred:", isError=True) 684 debugInfo = self._getDebugTracebacks() 685 if debugInfo != '': 686 log.msg("(debug: " + debugInfo + ")", isError=True) 687 log.err(self.failResult) 688 689 690 691@comparable 692class FirstError(Exception): 693 """ 694 First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set. 695 696 @ivar subFailure: The L{Failure} that occurred. 697 @type subFailure: L{Failure} 698 699 @ivar index: The index of the L{Deferred} in the L{DeferredList} where 700 it happened. 701 @type index: C{int} 702 """ 703 def __init__(self, failure, index): 704 Exception.__init__(self, failure, index) 705 self.subFailure = failure 706 self.index = index 707 708 709 def __repr__(self): 710 """ 711 The I{repr} of L{FirstError} instances includes the repr of the 712 wrapped failure's exception and the index of the L{FirstError}. 713 """ 714 return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value) 715 716 717 def __str__(self): 718 """ 719 The I{str} of L{FirstError} instances includes the I{str} of the 720 entire wrapped failure (including its traceback and exception) and 721 the index of the L{FirstError}. 722 """ 723 return 'FirstError[#%d, %s]' % (self.index, self.subFailure) 724 725 726 def __cmp__(self, other): 727 """ 728 Comparison between L{FirstError} and other L{FirstError} instances 729 is defined as the comparison of the index and sub-failure of each 730 instance. L{FirstError} instances don't compare equal to anything 731 that isn't a L{FirstError} instance. 732 733 @since: 8.2 734 """ 735 if isinstance(other, FirstError): 736 return cmp( 737 (self.index, self.subFailure), 738 (other.index, other.subFailure)) 739 return -1 740 741 742 743class DeferredList(Deferred): 744 """ 745 L{DeferredList} is a tool for collecting the results of several Deferreds. 746 747 This tracks a list of L{Deferred}s for their results, and makes a single 748 callback when they have all completed. By default, the ultimate result is a 749 list of (success, result) tuples, 'success' being a boolean. 750 L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and 751 errbacks can be added to it in the same way. 752 753 L{DeferredList} is implemented by adding callbacks and errbacks to each 754 L{Deferred} in the list passed to it. This means callbacks and errbacks 755 added to the Deferreds before they are passed to L{DeferredList} will change 756 the result that L{DeferredList} sees (i.e., L{DeferredList} is not special). 757 Callbacks and errbacks can also be added to the Deferreds after they are 758 passed to L{DeferredList} and L{DeferredList} may change the result that 759 they see. 760 761 See the documentation for the C{__init__} arguments for more information. 762 763 @ivar _deferredList: The C{list} of L{Deferred}s to track. 764 """ 765 766 fireOnOneCallback = False 767 fireOnOneErrback = False 768 769 def __init__(self, deferredList, fireOnOneCallback=False, 770 fireOnOneErrback=False, consumeErrors=False): 771 """ 772 Initialize a DeferredList. 773 774 @param deferredList: The list of deferreds to track. 775 @type deferredList: C{list} of L{Deferred}s 776 777 @param fireOnOneCallback: (keyword param) a flag indicating that this 778 L{DeferredList} will fire when the first L{Deferred} in 779 C{deferredList} fires with a non-failure result without waiting for 780 any of the other Deferreds. When this flag is set, the DeferredList 781 will fire with a two-tuple: the first element is the result of the 782 Deferred which fired; the second element is the index in 783 C{deferredList} of that Deferred. 784 @type fireOnOneCallback: C{bool} 785 786 @param fireOnOneErrback: (keyword param) a flag indicating that this 787 L{DeferredList} will fire when the first L{Deferred} in 788 C{deferredList} fires with a failure result without waiting for any 789 of the other Deferreds. When this flag is set, if a Deferred in the 790 list errbacks, the DeferredList will errback with a L{FirstError} 791 failure wrapping the failure of that Deferred. 792 @type fireOnOneErrback: C{bool} 793 794 @param consumeErrors: (keyword param) a flag indicating that failures in 795 any of the included L{Deferreds} should not be propagated to 796 errbacks added to the individual L{Deferreds} after this 797 L{DeferredList} is constructed. After constructing the 798 L{DeferredList}, any errors in the individual L{Deferred}s will be 799 converted to a callback result of C{None}. This is useful to 800 prevent spurious 'Unhandled error in Deferred' messages from being 801 logged. This does not prevent C{fireOnOneErrback} from working. 802 @type consumeErrors: C{bool} 803 """ 804 self._deferredList = list(deferredList) 805 self.resultList = [None] * len(self._deferredList) 806 Deferred.__init__(self) 807 if len(self._deferredList) == 0 and not fireOnOneCallback: 808 self.callback(self.resultList) 809 810 # These flags need to be set *before* attaching callbacks to the 811 # deferreds, because the callbacks use these flags, and will run 812 # synchronously if any of the deferreds are already fired. 813 self.fireOnOneCallback = fireOnOneCallback 814 self.fireOnOneErrback = fireOnOneErrback 815 self.consumeErrors = consumeErrors 816 self.finishedCount = 0 817 818 index = 0 819 for deferred in self._deferredList: 820 deferred.addCallbacks(self._cbDeferred, self._cbDeferred, 821 callbackArgs=(index,SUCCESS), 822 errbackArgs=(index,FAILURE)) 823 index = index + 1 824 825 826 def _cbDeferred(self, result, index, succeeded): 827 """ 828 (internal) Callback for when one of my deferreds fires. 829 """ 830 self.resultList[index] = (succeeded, result) 831 832 self.finishedCount += 1 833 if not self.called: 834 if succeeded == SUCCESS and self.fireOnOneCallback: 835 self.callback((result, index)) 836 elif succeeded == FAILURE and self.fireOnOneErrback: 837 self.errback(failure.Failure(FirstError(result, index))) 838 elif self.finishedCount == len(self.resultList): 839 self.callback(self.resultList) 840 841 if succeeded == FAILURE and self.consumeErrors: 842 result = None 843 844 return result 845 846 847 def cancel(self): 848 """ 849 Cancel this L{DeferredList}. 850 851 If the L{DeferredList} hasn't fired yet, cancel every L{Deferred} in 852 the list. 853 854 If the L{DeferredList} has fired, including the case where the 855 C{fireOnOneCallback}/C{fireOnOneErrback} flag is set and the 856 L{DeferredList} fires because one L{Deferred} in the list fires with a 857 non-failure/failure result, do nothing in the C{cancel} method. 858 """ 859 if not self.called: 860 for deferred in self._deferredList: 861 try: 862 deferred.cancel() 863 except: 864 log.err( 865 _why="Exception raised from user supplied canceller") 866 867 868def _parseDListResult(l, fireOnOneErrback=False): 869 if __debug__: 870 for success, value in l: 871 assert success 872 return [x[1] for x in l] 873 874 875 876def gatherResults(deferredList, consumeErrors=False): 877 """ 878 Returns, via a L{Deferred}, a list with the results of the given 879 L{Deferred}s - in effect, a "join" of multiple deferred operations. 880 881 The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s 882 have fired, or when any one of them has failed. 883 884 This method can be cancelled by calling the C{cancel} method of the 885 L{Deferred}, all the L{Deferred}s in the list will be cancelled. 886 887 This differs from L{DeferredList} in that you don't need to parse 888 the result for success/failure. 889 890 @type deferredList: C{list} of L{Deferred}s 891 892 @param consumeErrors: (keyword param) a flag, defaulting to False, 893 indicating that failures in any of the given L{Deferreds} should not be 894 propagated to errbacks added to the individual L{Deferreds} after this 895 L{gatherResults} invocation. Any such errors in the individual 896 L{Deferred}s will be converted to a callback result of C{None}. This 897 is useful to prevent spurious 'Unhandled error in Deferred' messages 898 from being logged. This parameter is available since 11.1.0. 899 @type consumeErrors: C{bool} 900 """ 901 d = DeferredList(deferredList, fireOnOneErrback=True, 902 consumeErrors=consumeErrors) 903 d.addCallback(_parseDListResult) 904 return d 905 906 907 908# Constants for use with DeferredList 909 910SUCCESS = True 911FAILURE = False 912 913 914 915## deferredGenerator 916 917class waitForDeferred: 918 """ 919 See L{deferredGenerator}. 920 """ 921 922 def __init__(self, d): 923 if not isinstance(d, Deferred): 924 raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,)) 925 self.d = d 926 927 928 def getResult(self): 929 if isinstance(self.result, failure.Failure): 930 self.result.raiseException() 931 return self.result 932 933 934 935def _deferGenerator(g, deferred): 936 """ 937 See L{deferredGenerator}. 938 """ 939 result = None 940 941 # This function is complicated by the need to prevent unbounded recursion 942 # arising from repeatedly yielding immediately ready deferreds. This while 943 # loop and the waiting variable solve that by manually unfolding the 944 # recursion. 945 946 waiting = [True, # defgen is waiting for result? 947 None] # result 948 949 while 1: 950 try: 951 result = next(g) 952 except StopIteration: 953 deferred.callback(result) 954 return deferred 955 except: 956 deferred.errback() 957 return deferred 958 959 # Deferred.callback(Deferred) raises an error; we catch this case 960 # early here and give a nicer error message to the user in case 961 # they yield a Deferred. 962 if isinstance(result, Deferred): 963 return fail(TypeError("Yield waitForDeferred(d), not d!")) 964 965 if isinstance(result, waitForDeferred): 966 # a waitForDeferred was yielded, get the result. 967 # Pass result in so it don't get changed going around the loop 968 # This isn't a problem for waiting, as it's only reused if 969 # gotResult has already been executed. 970 def gotResult(r, result=result): 971 result.result = r 972 if waiting[0]: 973 waiting[0] = False 974 waiting[1] = r 975 else: 976 _deferGenerator(g, deferred) 977 result.d.addBoth(gotResult) 978 if waiting[0]: 979 # Haven't called back yet, set flag so that we get reinvoked 980 # and return from the loop 981 waiting[0] = False 982 return deferred 983 # Reset waiting to initial values for next loop 984 waiting[0] = True 985 waiting[1] = None 986 987 result = None 988 989 990 991def deferredGenerator(f): 992 """ 993 L{deferredGenerator} and L{waitForDeferred} help you write 994 L{Deferred}-using code that looks like a regular sequential function. 995 Consider the use of L{inlineCallbacks} instead, which can accomplish 996 the same thing in a more concise manner. 997 998 There are two important functions involved: L{waitForDeferred}, and 999 L{deferredGenerator}. They are used together, like this:: 1000 1001 @deferredGenerator 1002 def thingummy(): 1003 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) 1004 yield thing 1005 thing = thing.getResult() 1006 print thing #the result! hoorj! 1007 1008 L{waitForDeferred} returns something that you should immediately yield; when 1009 your generator is resumed, calling C{thing.getResult()} will either give you 1010 the result of the L{Deferred} if it was a success, or raise an exception if it 1011 was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do 1012 not call it, I{your program will not work}. 1013 1014 L{deferredGenerator} takes one of these waitForDeferred-using generator 1015 functions and converts it into a function that returns a L{Deferred}. The 1016 result of the L{Deferred} will be the last value that your generator yielded 1017 unless the last value is a L{waitForDeferred} instance, in which case the 1018 result will be C{None}. If the function raises an unhandled exception, the 1019 L{Deferred} will errback instead. Remember that C{return result} won't work; 1020 use C{yield result; return} in place of that. 1021 1022 Note that not yielding anything from your generator will make the L{Deferred} 1023 result in C{None}. Yielding a L{Deferred} from your generator is also an error 1024 condition; always yield C{waitForDeferred(d)} instead. 1025 1026 The L{Deferred} returned from your deferred generator may also errback if your 1027 generator raised an exception. For example:: 1028 1029 @deferredGenerator 1030 def thingummy(): 1031 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) 1032 yield thing 1033 thing = thing.getResult() 1034 if thing == 'I love Twisted': 1035 # will become the result of the Deferred 1036 yield 'TWISTED IS GREAT!' 1037 return 1038 else: 1039 # will trigger an errback 1040 raise Exception('DESTROY ALL LIFE') 1041 1042 Put succinctly, these functions connect deferred-using code with this 'fake 1043 blocking' style in both directions: L{waitForDeferred} converts from a 1044 L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the 1045 'blocking' style to a L{Deferred}. 1046 """ 1047 @wraps(f) 1048 def unwindGenerator(*args, **kwargs): 1049 return _deferGenerator(f(*args, **kwargs), Deferred()) 1050 return unwindGenerator 1051 1052 1053## inlineCallbacks 1054 1055 1056 1057class _DefGen_Return(BaseException): 1058 def __init__(self, value): 1059 self.value = value 1060 1061 1062 1063def returnValue(val): 1064 """ 1065 Return val from a L{inlineCallbacks} generator. 1066 1067 Note: this is currently implemented by raising an exception 1068 derived from L{BaseException}. You might want to change any 1069 'except:' clauses to an 'except Exception:' clause so as not to 1070 catch this exception. 1071 1072 Also: while this function currently will work when called from 1073 within arbitrary functions called from within the generator, do 1074 not rely upon this behavior. 1075 """ 1076 raise _DefGen_Return(val) 1077 1078 1079 1080def _inlineCallbacks(result, g, deferred): 1081 """ 1082 See L{inlineCallbacks}. 1083 """ 1084 # This function is complicated by the need to prevent unbounded recursion 1085 # arising from repeatedly yielding immediately ready deferreds. This while 1086 # loop and the waiting variable solve that by manually unfolding the 1087 # recursion. 1088 1089 waiting = [True, # waiting for result? 1090 None] # result 1091 1092 while 1: 1093 try: 1094 # Send the last result back as the result of the yield expression. 1095 isFailure = isinstance(result, failure.Failure) 1096 if isFailure: 1097 result = result.throwExceptionIntoGenerator(g) 1098 else: 1099 result = g.send(result) 1100 except StopIteration: 1101 # fell off the end, or "return" statement 1102 deferred.callback(None) 1103 return deferred 1104 except _DefGen_Return as e: 1105 # returnValue() was called; time to give a result to the original 1106 # Deferred. First though, let's try to identify the potentially 1107 # confusing situation which results when returnValue() is 1108 # accidentally invoked from a different function, one that wasn't 1109 # decorated with @inlineCallbacks. 1110 1111 # The traceback starts in this frame (the one for 1112 # _inlineCallbacks); the next one down should be the application 1113 # code. 1114 appCodeTrace = exc_info()[2].tb_next 1115 if isFailure: 1116 # If we invoked this generator frame by throwing an exception 1117 # into it, then throwExceptionIntoGenerator will consume an 1118 # additional stack frame itself, so we need to skip that too. 1119 appCodeTrace = appCodeTrace.tb_next 1120 # Now that we've identified the frame being exited by the 1121 # exception, let's figure out if returnValue was called from it 1122 # directly. returnValue itself consumes a stack frame, so the 1123 # application code will have a tb_next, but it will *not* have a 1124 # second tb_next. 1125 if appCodeTrace.tb_next.tb_next: 1126 # If returnValue was invoked non-local to the frame which it is 1127 # exiting, identify the frame that ultimately invoked 1128 # returnValue so that we can warn the user, as this behavior is 1129 # confusing. 1130 ultimateTrace = appCodeTrace 1131 while ultimateTrace.tb_next.tb_next: 1132 ultimateTrace = ultimateTrace.tb_next 1133 filename = ultimateTrace.tb_frame.f_code.co_filename 1134 lineno = ultimateTrace.tb_lineno 1135 warnings.warn_explicit( 1136 "returnValue() in %r causing %r to exit: " 1137 "returnValue should only be invoked by functions decorated " 1138 "with inlineCallbacks" % ( 1139 ultimateTrace.tb_frame.f_code.co_name, 1140 appCodeTrace.tb_frame.f_code.co_name), 1141 DeprecationWarning, filename, lineno) 1142 deferred.callback(e.value) 1143 return deferred 1144 except: 1145 deferred.errback() 1146 return deferred 1147 1148 if isinstance(result, Deferred): 1149 # a deferred was yielded, get the result. 1150 def gotResult(r): 1151 if waiting[0]: 1152 waiting[0] = False 1153 waiting[1] = r 1154 else: 1155 _inlineCallbacks(r, g, deferred) 1156 1157 result.addBoth(gotResult) 1158 if waiting[0]: 1159 # Haven't called back yet, set flag so that we get reinvoked 1160 # and return from the loop 1161 waiting[0] = False 1162 return deferred 1163 1164 result = waiting[1] 1165 # Reset waiting to initial values for next loop. gotResult uses 1166 # waiting, but this isn't a problem because gotResult is only 1167 # executed once, and if it hasn't been executed yet, the return 1168 # branch above would have been taken. 1169 1170 1171 waiting[0] = True 1172 waiting[1] = None 1173 1174 1175 return deferred 1176 1177 1178 1179def inlineCallbacks(f): 1180 """ 1181 inlineCallbacks helps you write L{Deferred}-using code that looks like a 1182 regular sequential function. For example:: 1183 1184 @inlineCallBacks 1185 def thingummy(): 1186 thing = yield makeSomeRequestResultingInDeferred() 1187 print(thing) # the result! hoorj! 1188 1189 When you call anything that results in a L{Deferred}, you can simply yield it; 1190 your generator will automatically be resumed when the Deferred's result is 1191 available. The generator will be sent the result of the L{Deferred} with the 1192 'send' method on generators, or if the result was a failure, 'throw'. 1193 1194 Things that are not L{Deferred}s may also be yielded, and your generator 1195 will be resumed with the same object sent back. This means C{yield} 1196 performs an operation roughly equivalent to L{maybeDeferred}. 1197 1198 Your inlineCallbacks-enabled generator will return a L{Deferred} object, which 1199 will result in the return value of the generator (or will fail with a 1200 failure object if your generator raises an unhandled exception). Note that 1201 you can't use C{return result} to return a value; use C{returnValue(result)} 1202 instead. Falling off the end of the generator, or simply using C{return} 1203 will cause the L{Deferred} to have a result of C{None}. 1204 1205 Be aware that L{returnValue} will not accept a L{Deferred} as a parameter. 1206 If you believe the thing you'd like to return could be a L{Deferred}, do 1207 this:: 1208 1209 result = yield result 1210 returnValue(result) 1211 1212 The L{Deferred} returned from your deferred generator may errback if your 1213 generator raised an exception:: 1214 1215 @inlineCallbacks 1216 def thingummy(): 1217 thing = yield makeSomeRequestResultingInDeferred() 1218 if thing == 'I love Twisted': 1219 # will become the result of the Deferred 1220 returnValue('TWISTED IS GREAT!') 1221 else: 1222 # will trigger an errback 1223 raise Exception('DESTROY ALL LIFE') 1224 """ 1225 @wraps(f) 1226 def unwindGenerator(*args, **kwargs): 1227 try: 1228 gen = f(*args, **kwargs) 1229 except _DefGen_Return: 1230 raise TypeError( 1231 "inlineCallbacks requires %r to produce a generator; instead" 1232 "caught returnValue being used in a non-generator" % (f,)) 1233 if not isinstance(gen, types.GeneratorType): 1234 raise TypeError( 1235 "inlineCallbacks requires %r to produce a generator; " 1236 "instead got %r" % (f, gen)) 1237 return _inlineCallbacks(None, gen, Deferred()) 1238 return unwindGenerator 1239 1240 1241## DeferredLock/DeferredQueue 1242 1243class _ConcurrencyPrimitive(object): 1244 def __init__(self): 1245 self.waiting = [] 1246 1247 1248 def _releaseAndReturn(self, r): 1249 self.release() 1250 return r 1251 1252 1253 def run(*args, **kwargs): 1254 """ 1255 Acquire, run, release. 1256 1257 This function takes a callable as its first argument and any 1258 number of other positional and keyword arguments. When the 1259 lock or semaphore is acquired, the callable will be invoked 1260 with those arguments. 1261 1262 The callable may return a L{Deferred}; if it does, the lock or 1263 semaphore won't be released until that L{Deferred} fires. 1264 1265 @return: L{Deferred} of function result. 1266 """ 1267 if len(args) < 2: 1268 if not args: 1269 raise TypeError("run() takes at least 2 arguments, none given.") 1270 raise TypeError("%s.run() takes at least 2 arguments, 1 given" % ( 1271 args[0].__class__.__name__,)) 1272 self, f = args[:2] 1273 args = args[2:] 1274 1275 def execute(ignoredResult): 1276 d = maybeDeferred(f, *args, **kwargs) 1277 d.addBoth(self._releaseAndReturn) 1278 return d 1279 1280 d = self.acquire() 1281 d.addCallback(execute) 1282 return d 1283 1284 1285 1286class DeferredLock(_ConcurrencyPrimitive): 1287 """ 1288 A lock for event driven systems. 1289 1290 @ivar locked: C{True} when this Lock has been acquired, false at all other 1291 times. Do not change this value, but it is useful to examine for the 1292 equivalent of a "non-blocking" acquisition. 1293 """ 1294 1295 locked = False 1296 1297 1298 def _cancelAcquire(self, d): 1299 """ 1300 Remove a deferred d from our waiting list, as the deferred has been 1301 canceled. 1302 1303 Note: We do not need to wrap this in a try/except to catch d not 1304 being in self.waiting because this canceller will not be called if 1305 d has fired. release() pops a deferred out of self.waiting and 1306 calls it, so the canceller will no longer be called. 1307 1308 @param d: The deferred that has been canceled. 1309 """ 1310 self.waiting.remove(d) 1311 1312 1313 def acquire(self): 1314 """ 1315 Attempt to acquire the lock. Returns a L{Deferred} that fires on 1316 lock acquisition with the L{DeferredLock} as the value. If the lock 1317 is locked, then the Deferred is placed at the end of a waiting list. 1318 1319 @return: a L{Deferred} which fires on lock acquisition. 1320 @rtype: a L{Deferred} 1321 """ 1322 d = Deferred(canceller=self._cancelAcquire) 1323 if self.locked: 1324 self.waiting.append(d) 1325 else: 1326 self.locked = True 1327 d.callback(self) 1328 return d 1329 1330 1331 def release(self): 1332 """ 1333 Release the lock. If there is a waiting list, then the first 1334 L{Deferred} in that waiting list will be called back. 1335 1336 Should be called by whomever did the L{acquire}() when the shared 1337 resource is free. 1338 """ 1339 assert self.locked, "Tried to release an unlocked lock" 1340 self.locked = False 1341 if self.waiting: 1342 # someone is waiting to acquire lock 1343 self.locked = True 1344 d = self.waiting.pop(0) 1345 d.callback(self) 1346 1347 1348 1349class DeferredSemaphore(_ConcurrencyPrimitive): 1350 """ 1351 A semaphore for event driven systems. 1352 1353 If you are looking into this as a means of limiting parallelism, you might 1354 find L{twisted.internet.task.Cooperator} more useful. 1355 1356 @ivar tokens: At most this many users may acquire this semaphore at 1357 once. 1358 @type tokens: C{int} 1359 1360 @ivar limit: The difference between C{tokens} and the number of users 1361 which have currently acquired this semaphore. 1362 @type limit: C{int} 1363 """ 1364 1365 def __init__(self, tokens): 1366 _ConcurrencyPrimitive.__init__(self) 1367 if tokens < 1: 1368 raise ValueError("DeferredSemaphore requires tokens >= 1") 1369 self.tokens = tokens 1370 self.limit = tokens 1371 1372 1373 def _cancelAcquire(self, d): 1374 """ 1375 Remove a deferred d from our waiting list, as the deferred has been 1376 canceled. 1377 1378 Note: We do not need to wrap this in a try/except to catch d not 1379 being in self.waiting because this canceller will not be called if 1380 d has fired. release() pops a deferred out of self.waiting and 1381 calls it, so the canceller will no longer be called. 1382 1383 @param d: The deferred that has been canceled. 1384 """ 1385 self.waiting.remove(d) 1386 1387 1388 def acquire(self): 1389 """ 1390 Attempt to acquire the token. 1391 1392 @return: a L{Deferred} which fires on token acquisition. 1393 """ 1394 assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative" 1395 d = Deferred(canceller=self._cancelAcquire) 1396 if not self.tokens: 1397 self.waiting.append(d) 1398 else: 1399 self.tokens = self.tokens - 1 1400 d.callback(self) 1401 return d 1402 1403 1404 def release(self): 1405 """ 1406 Release the token. 1407 1408 Should be called by whoever did the L{acquire}() when the shared 1409 resource is free. 1410 """ 1411 assert self.tokens < self.limit, "Someone released me too many times: too many tokens!" 1412 self.tokens = self.tokens + 1 1413 if self.waiting: 1414 # someone is waiting to acquire token 1415 self.tokens = self.tokens - 1 1416 d = self.waiting.pop(0) 1417 d.callback(self) 1418 1419 1420 1421class QueueOverflow(Exception): 1422 pass 1423 1424 1425 1426class QueueUnderflow(Exception): 1427 pass 1428 1429 1430 1431class DeferredQueue(object): 1432 """ 1433 An event driven queue. 1434 1435 Objects may be added as usual to this queue. When an attempt is 1436 made to retrieve an object when the queue is empty, a L{Deferred} is 1437 returned which will fire when an object becomes available. 1438 1439 @ivar size: The maximum number of objects to allow into the queue 1440 at a time. When an attempt to add a new object would exceed this 1441 limit, L{QueueOverflow} is raised synchronously. C{None} for no limit. 1442 1443 @ivar backlog: The maximum number of L{Deferred} gets to allow at 1444 one time. When an attempt is made to get an object which would 1445 exceed this limit, L{QueueUnderflow} is raised synchronously. C{None} 1446 for no limit. 1447 """ 1448 1449 def __init__(self, size=None, backlog=None): 1450 self.waiting = [] 1451 self.pending = [] 1452 self.size = size 1453 self.backlog = backlog 1454 1455 1456 def _cancelGet(self, d): 1457 """ 1458 Remove a deferred d from our waiting list, as the deferred has been 1459 canceled. 1460 1461 Note: We do not need to wrap this in a try/except to catch d not 1462 being in self.waiting because this canceller will not be called if 1463 d has fired. put() pops a deferred out of self.waiting and calls 1464 it, so the canceller will no longer be called. 1465 1466 @param d: The deferred that has been canceled. 1467 """ 1468 self.waiting.remove(d) 1469 1470 1471 def put(self, obj): 1472 """ 1473 Add an object to this queue. 1474 1475 @raise QueueOverflow: Too many objects are in this queue. 1476 """ 1477 if self.waiting: 1478 self.waiting.pop(0).callback(obj) 1479 elif self.size is None or len(self.pending) < self.size: 1480 self.pending.append(obj) 1481 else: 1482 raise QueueOverflow() 1483 1484 1485 def get(self): 1486 """ 1487 Attempt to retrieve and remove an object from the queue. 1488 1489 @return: a L{Deferred} which fires with the next object available in 1490 the queue. 1491 1492 @raise QueueUnderflow: Too many (more than C{backlog}) 1493 L{Deferred}s are already waiting for an object from this queue. 1494 """ 1495 if self.pending: 1496 return succeed(self.pending.pop(0)) 1497 elif self.backlog is None or len(self.waiting) < self.backlog: 1498 d = Deferred(canceller=self._cancelGet) 1499 self.waiting.append(d) 1500 return d 1501 else: 1502 raise QueueUnderflow() 1503 1504 1505 1506class AlreadyTryingToLockError(Exception): 1507 """ 1508 Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a 1509 single L{DeferredFilesystemLock}. 1510 """ 1511 1512 1513 1514class DeferredFilesystemLock(lockfile.FilesystemLock): 1515 """ 1516 A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is 1517 acquired. 1518 1519 @ivar _scheduler: The object in charge of scheduling retries. In this 1520 implementation this is parameterized for testing. 1521 1522 @ivar _interval: The retry interval for an L{IReactorTime} based scheduler. 1523 1524 @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage 1525 the next retry for aquiring the lock. 1526 1527 @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout 1528 argument. This is in charge of timing out our attempt to acquire the 1529 lock. 1530 """ 1531 _interval = 1 1532 _tryLockCall = None 1533 _timeoutCall = None 1534 1535 1536 def __init__(self, name, scheduler=None): 1537 """ 1538 @param name: The name of the lock to acquire 1539 @param scheduler: An object which provides L{IReactorTime} 1540 """ 1541 lockfile.FilesystemLock.__init__(self, name) 1542 1543 if scheduler is None: 1544 from twisted.internet import reactor 1545 scheduler = reactor 1546 1547 self._scheduler = scheduler 1548 1549 1550 def deferUntilLocked(self, timeout=None): 1551 """ 1552 Wait until we acquire this lock. This method is not safe for 1553 concurrent use. 1554 1555 @type timeout: C{float} or C{int} 1556 @param timeout: the number of seconds after which to time out if the 1557 lock has not been acquired. 1558 1559 @return: a L{Deferred} which will callback when the lock is acquired, or 1560 errback with a L{TimeoutError} after timing out or an 1561 L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already 1562 been called and not successfully locked the file. 1563 """ 1564 if self._tryLockCall is not None: 1565 return fail( 1566 AlreadyTryingToLockError( 1567 "deferUntilLocked isn't safe for concurrent use.")) 1568 1569 def _cancelLock(reason): 1570 """ 1571 Cancel a L{DeferredFilesystemLock.deferUntilLocked} call. 1572 1573 @type reason: L{failure.Failure} 1574 @param reason: The reason why the call is cancelled. 1575 """ 1576 self._tryLockCall.cancel() 1577 self._tryLockCall = None 1578 if self._timeoutCall is not None and self._timeoutCall.active(): 1579 self._timeoutCall.cancel() 1580 self._timeoutCall = None 1581 1582 if self.lock(): 1583 d.callback(None) 1584 else: 1585 d.errback(reason) 1586 1587 d = Deferred(lambda deferred: _cancelLock(CancelledError())) 1588 1589 def _tryLock(): 1590 if self.lock(): 1591 if self._timeoutCall is not None: 1592 self._timeoutCall.cancel() 1593 self._timeoutCall = None 1594 1595 self._tryLockCall = None 1596 1597 d.callback(None) 1598 else: 1599 if timeout is not None and self._timeoutCall is None: 1600 reason = failure.Failure(TimeoutError( 1601 "Timed out aquiring lock: %s after %fs" % ( 1602 self.name, 1603 timeout))) 1604 self._timeoutCall = self._scheduler.callLater( 1605 timeout, _cancelLock, reason) 1606 1607 self._tryLockCall = self._scheduler.callLater( 1608 self._interval, _tryLock) 1609 1610 _tryLock() 1611 1612 return d 1613 1614 1615 1616__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS", 1617 "AlreadyCalledError", "TimeoutError", "gatherResults", 1618 "maybeDeferred", 1619 "waitForDeferred", "deferredGenerator", "inlineCallbacks", 1620 "returnValue", 1621 "DeferredLock", "DeferredSemaphore", "DeferredQueue", 1622 "DeferredFilesystemLock", "AlreadyTryingToLockError", 1623 ] 1624