# -*- test-case-name: vertex.test.test_subproducer -*-
# Copyright 2005 Divmod, Inc.  See LICENSE file for details

from twisted.python import log


class SuperProducer:
    """
    I am a mixin which provides support for mixing in several producers to one
    producer.  I act as a consumer for my producers and as a producer for one
    consumer.

    I must be mixed into a protocol, or something else with a 'transport'
    attribute.
    """

    # Class-level default so the flag is readable even before any
    # pause/resume call has set it on the instance.
    producersPaused = False

    def __init__(self):
        # Registered sub-producers are the keys; the value is always 1 and is
        # never read (the dict is used as a set).
        self.producingTransports = {}

    def pauseProducing(self):
        """
        Mark myself paused and forward the pause to every registered
        sub-producer.

        A sub-producer whose C{parentPauseProducing} raises is dropped from
        the registry and the error is logged; the remaining sub-producers
        are still notified.
        """
        self.producersPaused = True
        # Iterate over a snapshot: the registry may be mutated below, or
        # re-entrantly by a sub-producer unregistering itself, and mutating
        # a dict while iterating over it raises RuntimeError on Python 3.
        for transport in list(self.producingTransports):
            try:
                transport.parentPauseProducing()
            except Exception:
                # pop() rather than del: the entry may already be gone if the
                # callback unregistered itself before failing.
                self.producingTransports.pop(transport, None)
                log.err()

    def resumeProducing(self):
        """
        Clear the paused flag and forward the resume to every registered
        sub-producer.

        The notification is sent on every call, not only when I was
        previously paused.  A sub-producer whose C{parentResumeProducing}
        raises is dropped from the registry and the error is logged.
        """
        self.producersPaused = False
        # Snapshot for the same reason as in pauseProducing.
        for transport in list(self.producingTransports):
            try:
                transport.parentResumeProducing()
            except Exception:
                self.producingTransports.pop(transport, None)
                log.err()

    def stopProducing(self):
        """
        Forward the stop to every registered sub-producer, logging (but
        otherwise ignoring) any error, and then forget all of them.
        """
        for transport in list(self.producingTransports):
            try:
                transport.parentStopProducing()
            except Exception:
                log.err()
        self.producingTransports = {}

    def registerProducerFor(self, trans):
        """
        Add C{trans} to the set of sub-producers feeding my transport.

        If I am not currently paused, C{trans} is told to resume first.  When
        C{trans} is the first registered sub-producer, I register myself with
        C{self.transport} as its (non-streaming) producer.

        @param trans: an object providing C{parentResumeProducing},
            C{parentPauseProducing} and C{parentStopProducing}; it must not
            already be registered.
        """
        # NOTE(review): when I am paused, trans is not told so here; it only
        # hears about the pause state on the next pause/resume call.
        if not self.producersPaused:
            trans.parentResumeProducing()
        wasProducing = bool(self.producingTransports)
        assert trans not in self.producingTransports
        self.producingTransports[trans] = 1
        if not wasProducing:
            self.transport.registerProducer(self, False)

    def unregisterProducerFor(self, trans):
        """
        Remove C{trans} from the set of sub-producers.

        Unknown sub-producers are ignored.  When the last registered
        sub-producer is removed, I unregister myself from C{self.transport}.
        """
        if trans in self.producingTransports:
            del self.producingTransports[trans]
            if not self.producingTransports:
                self.transport.unregisterProducer()


class SubProducer:
    """
    I am a mixin that provides upwards-registration of my producer to a
    SuperProducer instance.
    """

    def __init__(self, superproducer):
        """
        @param superproducer: the object my producer is registered with; it
            must provide C{registerProducerFor} and C{unregisterProducerFor}.
        """
        self.superproducer = superproducer
        # The producer registered with me via registerProducer, if any.
        self.producer = None
        # Flow-control state: the producer may run only while both the
        # parent (super-producer side) and the peer (choke/unchoke side)
        # are accepting data.
        self.parentAcceptingData = True
        self.peerAcceptingData = True
        # True once I have called pauseProducing on my producer and not yet
        # resumed it.
        self.producerPaused = False
        # Set permanently by parentStopProducing.
        self.parentStopped = False

    def maybeResumeProducing(self):
        """
        Resume my producer if there is one and both the parent and the peer
        are accepting data.

        A streaming producer is resumed only if I previously paused it; a
        non-streaming producer is resumed on every call.
        """
        if ((self.producer is not None) and
            ((not self.streamingProducer) or
             (self.producerPaused)) and
            (self.peerAcceptingData) and
            (self.parentAcceptingData)):
            self.producerPaused = False
            self.producer.resumeProducing()

    def maybePauseProducing(self):
        """
        Pause my producer if there is one, it is not already paused, and
        either the parent or the peer has stopped accepting data.
        """
        if ((self.producer is not None) and
            ((not self.peerAcceptingData) or
             (not self.parentAcceptingData)) and
            (not self.producerPaused)):
            self.producerPaused = True
            self.producer.pauseProducing()

    def parentResumeProducing(self):
        """
        Record that the parent is accepting data and resume my producer if
        nothing else is holding it back.
        """
        self.parentAcceptingData = True
        self.maybeResumeProducing()

    def parentPauseProducing(self):
        """
        Record that the parent is no longer accepting data and pause my
        producer if it is running.
        """
        self.parentAcceptingData = False
        self.maybePauseProducing()

    def parentStopProducing(self):
        """
        Record that the parent has stopped and stop my producer, if any.
        """
        self.parentStopped = True
        if self.producer is not None:
            self.producer.stopProducing()

    def choke(self):
        """
        Record that the peer is no longer accepting data and pause my
        producer if it is running.
        """
        self.peerAcceptingData = False
        self.maybePauseProducing()

    def unchoke(self):
        """
        Record that the peer is accepting data and resume my producer if
        nothing else is holding it back.
        """
        self.peerAcceptingData = True
        self.maybeResumeProducing()

    def registerProducer(self, producer, streaming):
        """
        Remember C{producer} as my producer and register myself with my
        super-producer.

        If the parent has already stopped, C{producer} is stopped
        immediately and nothing is registered.

        @param producer: an object providing C{resumeProducing},
            C{pauseProducing} and C{stopProducing}.
        @param streaming: true if C{producer} is a streaming producer, i.e.
            one that should only be resumed after having been paused.
        """
        if self.parentStopped:
            producer.stopProducing()
            return
        self.producer = producer
        self.streamingProducer = streaming
        self.superproducer.registerProducerFor(self)

    def unregisterProducer(self):
        """
        Unregister myself from my super-producer and forget my producer.

        Does nothing once the parent has stopped.
        """
        if not self.parentStopped:
            self.superproducer.unregisterProducerFor(self)
            self.producer = None