# -*- test-case-name: twisted.test.test_abstract -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Support for generic select()able objects.
"""

from __future__ import division, absolute_import

from socket import AF_INET6, inet_pton, error

from zope.interface import implementer

# Twisted Imports
from twisted.python.compat import _PY3, unicode, lazyByteSlice
from twisted.python import reflect, failure
from twisted.internet import interfaces, main

if _PY3:
    def _concatenate(bObj, offset, bArray):
        """
        Return the bytes of C{bObj} from C{offset} onwards, followed by every
        chunk in C{bArray}, as a single byte string.
        """
        # Python 3 lacks the buffer() builtin and the other primitives don't
        # help in this case.  Just do the copy.  Perhaps later these buffers
        # can be joined and FileDescriptor can use writev().  Or perhaps
        # bytearrays would help.
        return bObj[offset:] + b"".join(bArray)
else:
    def _concatenate(bObj, offset, bArray):
        """
        Return the bytes of C{bObj} from C{offset} onwards, followed by every
        chunk in C{bArray}, as a single byte string.
        """
        # Avoid one extra string copy by using a buffer to limit what we
        # include in the result.
        return buffer(bObj, offset) + b"".join(bArray)


# The only characters allowed in an octet of a dotted-quad IPv4 address.
_ASCII_DIGITS = frozenset("0123456789")


class _ConsumerMixin(object):
    """
    L{IConsumer} implementations can mix this in to get C{registerProducer} and
    C{unregisterProducer} methods which take care of keeping track of a
    producer's state.

    Subclasses must provide three attributes which L{_ConsumerMixin} will read
    but not write:

      - connected: A C{bool} which is C{True} as long as the consumer has
        someplace to send bytes (for example, a TCP connection), and then
        C{False} when it no longer does.

      - disconnecting: A C{bool} which is C{False} until something like
        L{ITransport.loseConnection} is called, indicating that the send buffer
        should be flushed and the connection lost afterwards.  Afterwards,
        C{True}.

      - disconnected: A C{bool} which is C{False} until the consumer no longer
        has a place to send bytes, then C{True}.

    Subclasses must also override the C{startWriting} method.

    @ivar producer: C{None} if no producer is registered, otherwise the
        registered producer.

    @ivar producerPaused: A flag indicating whether the producer is currently
        paused.
    @type producerPaused: L{bool}

    @ivar streamingProducer: A flag indicating whether the producer was
        registered as a streaming (ie push) producer or not (ie a pull
        producer).  This will determine whether the consumer may ever need to
        pause and resume it, or if it can merely call C{resumeProducing} on it
        when buffer space is available.
    @type streamingProducer: C{bool} or C{int}
    """
    producer = None
    producerPaused = False
    streamingProducer = False

    def startWriting(self):
        """
        Override in a subclass to cause the reactor to monitor this selectable
        for write events.  This will be called once in C{unregisterProducer} if
        C{loseConnection} has previously been called, so that the connection
        can actually close.

        @raise NotImplementedError: Always, unless overridden.
        """
        # The message used to be raised without interpolation, leaving a
        # literal "%r" in it; name the offending object instead.
        raise NotImplementedError(
            "%r did not implement startWriting" % (self,))


    def registerProducer(self, producer, streaming):
        """
        Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing().  When the FileDescriptor's internal data buffer
        is filled, it will ask the producer to pauseProducing().  If the
        connection is lost, FileDescriptor calls producer's stopProducing()
        method.

        If streaming is true, the producer should provide the IPushProducer
        interface.  Otherwise, it is assumed that producer provides the
        IPullProducer interface.  In this case, the producer won't be asked to
        pauseProducing(), but it has to be careful to write() data only when
        its resumeProducing() method is called.

        @param producer: The producer to register.
        @param streaming: True for a push producer, false for a pull producer.

        @raise RuntimeError: If another producer is still registered.
        """
        if self.producer is not None:
            raise RuntimeError(
                "Cannot register producer %s, because producer %s was never "
                "unregistered." % (producer, self.producer))
        if self.disconnected:
            # Nowhere to send bytes any more: tell the producer to give up
            # straight away instead of remembering it.
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                # A pull producer only produces when asked, so ask once now.
                producer.resumeProducing()


    def unregisterProducer(self):
        """
        Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None
        if self.connected and self.disconnecting:
            # A pending loseConnection() was waiting on the producer; wake the
            # writer up so the connection can actually be closed.
            self.startWriting()



@implementer(interfaces.ILoggingContext)
class _LogOwner(object):
    """
    Mixin to help implement L{interfaces.ILoggingContext} for transports which
    have a protocol, the log prefix of which should also appear in the
    transport's log prefix.
    """

    def _getLogPrefix(self, applicationObject):
        """
        Determine the log prefix to use for messages related to
        C{applicationObject}, which may or may not be an
        L{interfaces.ILoggingContext} provider.

        @param applicationObject: The object to derive a prefix from.  If it
            does not provide L{interfaces.ILoggingContext}, the name of its
            class is used.

        @return: A C{str} giving the log prefix to use.
        """
        if interfaces.ILoggingContext.providedBy(applicationObject):
            return applicationObject.logPrefix()
        return applicationObject.__class__.__name__


    def logPrefix(self):
        """
        Override this method to insert custom logging behavior.  Its
        return value will be inserted in front of every line.  It may
        be called more times than the number of output lines.
        """
        return "-"



@implementer(
    interfaces.IPushProducer, interfaces.IReadWriteDescriptor,
    interfaces.IConsumer, interfaces.ITransport,
    interfaces.IHalfCloseableDescriptor)
class FileDescriptor(_ConsumerMixin, _LogOwner):
    """
    An object which can be operated on by select().

    This is an abstract superclass of all objects which may be notified when
    they are readable or writable; e.g. they have a file-descriptor that is
    valid to be passed to select(2).

    @ivar dataBuffer: The byte string currently being handed to
        C{writeSomeData}.
    @ivar offset: How many bytes at the start of C{dataBuffer} have already
        been written.
    @ivar SEND_LIMIT: When fewer than this many unsent bytes remain in
        C{dataBuffer}, C{doWrite} merges the chunks queued by C{write} and
        C{writeSequence} into it.
    @ivar bufferSize: The buffered byte count above which a streaming producer
        is paused.
    """
    connected = 0
    disconnected = 0
    disconnecting = 0
    _writeDisconnecting = False
    _writeDisconnected = False
    dataBuffer = b""
    offset = 0

    SEND_LIMIT = 128*1024

    def __init__(self, reactor=None):
        """
        @param reactor: An L{IReactorFDSet} provider which this descriptor will
            use to get readable and writeable event notifications.  If no value
            is given, the global reactor will be used.
        """
        # Compare against None rather than testing truthiness, so a supplied
        # reactor object that happens to be falsy is still honoured.
        if reactor is None:
            from twisted.internet import reactor
        self.reactor = reactor
        self._tempDataBuffer = []  # will be added to dataBuffer in doWrite
        self._tempDataLen = 0


    def connectionLost(self, reason):
        """The connection was lost.

        This is called when the connection on a selectable object has been
        lost.  It will be called whether the connection was closed explicitly,
        an exception occurred in an event handler, or the other end of the
        connection closed it first.

        Clean up state here, but make sure to call back up to FileDescriptor.

        @param reason: Why the connection was lost.  It is not used here.
        """
        self.disconnected = 1
        self.connected = 0
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.stopReading()
        self.stopWriting()


    def writeSomeData(self, data):
        """
        Write as much as possible of the given data, immediately.

        This is called to invoke the lower-level writing functionality, such
        as a socket's send() method, or a file's write(); this method
        returns an integer or an exception.  If an integer, it is the number
        of bytes written (possibly zero); if an exception, it indicates the
        connection was lost.

        @raise NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("%s does not implement writeSomeData" %
                                  reflect.qual(self.__class__))


    def doRead(self):
        """
        Called when data is available for reading.

        Subclasses must override this method.  The result will be interpreted
        in the same way as a result of doWrite().

        @raise NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("%s does not implement doRead" %
                                  reflect.qual(self.__class__))

    def doWrite(self):
        """
        Called when data can be written.

        @return: C{None} on success, an exception or a negative integer on
            failure.

        @see: L{twisted.internet.interfaces.IWriteDescriptor.doWrite}.
        """
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
            # If there is currently less than SEND_LIMIT bytes left to send
            # in the string, extend it with the array data.
            self.dataBuffer = _concatenate(
                self.dataBuffer, self.offset, self._tempDataBuffer)
            self.offset = 0
            self._tempDataBuffer = []
            self._tempDataLen = 0

        # Send as much data as you can.
        if self.offset:
            written = self.writeSomeData(
                lazyByteSlice(self.dataBuffer, self.offset))
        else:
            written = self.writeSomeData(self.dataBuffer)

        # There is no writeSomeData implementation in Twisted which returns
        # < 0, but the documentation for writeSomeData used to claim negative
        # integers meant connection lost.  Keep supporting this here,
        # although it may be worth deprecating and removing at some point.
        if isinstance(written, Exception) or written < 0:
            return written
        self.offset += written
        # If there is nothing left to send,
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
            self.dataBuffer = b""
            self.offset = 0
            # stop writing.
            self.stopWriting()
            # If I've got a producer who is supposed to supply me with data,
            if self.producer is not None and ((not self.streamingProducer)
                                              or self.producerPaused):
                # tell them to supply some more.
                self.producerPaused = False
                self.producer.resumeProducing()
            elif self.disconnecting:
                # But if I was previously asked to let the connection die, do
                # so.
                return self._postLoseConnection()
            elif self._writeDisconnecting:
                # I was previously asked to half-close the connection.  We
                # set _writeDisconnected before calling handler, in case the
                # handler calls loseConnection(), which will want to check for
                # this attribute.
                self._writeDisconnected = True
                result = self._closeWriteConnection()
                return result
        return None

    def _postLoseConnection(self):
        """Called after a loseConnection(), when all data has been written.

        Whatever this returns is then returned by doWrite.
        """
        # default implementation, telling reactor we're finished
        return main.CONNECTION_DONE

    def _closeWriteConnection(self):
        """
        Called by doWrite once the buffer has drained after
        loseWriteConnection().  Whatever this returns is then returned by
        doWrite.  The default implementation does nothing.
        """
        # override in subclasses
        pass

    def writeConnectionLost(self, reason):
        """
        Treat the loss of the write side as the loss of the whole connection.
        """
        # in current code should never be called
        self.connectionLost(reason)

    def readConnectionLost(self, reason):
        """
        Treat the loss of the read side as the loss of the whole connection.
        """
        # override in subclasses
        self.connectionLost(reason)


    def _isSendBufferFull(self):
        """
        Determine whether the user-space send buffer for this transport is full
        or not.

        When the buffer contains more than C{self.bufferSize} bytes, it is
        considered full.  This might be improved by considering the size of the
        kernel send buffer and how much of it is free.

        @return: C{True} if it is full, C{False} otherwise.
        """
        return len(self.dataBuffer) + self._tempDataLen > self.bufferSize


    def _maybePauseProducer(self):
        """
        Possibly pause a producer, if there is one and the send buffer is full.
        """
        # If we are responsible for pausing our producer,
        if self.producer is not None and self.streamingProducer:
            # and our buffer is full,
            if self._isSendBufferFull():
                # pause it.
                self.producerPaused = True
                self.producer.pauseProducing()


    def write(self, data):
        """Reliably write some data.

        The data is buffered until the underlying file descriptor is ready
        for writing.  If there is more than C{self.bufferSize} data in the
        buffer and this descriptor has a registered streaming producer, its
        C{pauseProducing()} method will be called.

        Data is silently discarded if this descriptor is not connected or its
        write side has already been closed.

        @param data: The bytes to send.

        @raise TypeError: If C{data} is unicode.
        """
        if isinstance(data, unicode):  # no, really, I mean it
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            return
        if data:
            self._tempDataBuffer.append(data)
            self._tempDataLen += len(data)
            self._maybePauseProducer()
            self.startWriting()


    def writeSequence(self, iovec):
        """
        Reliably write a sequence of data.

        Currently, this is a convenience method roughly equivalent to::

            for chunk in iovec:
                fd.write(chunk)

        It may have a more efficient implementation at a later time or in a
        different reactor.

        As with the C{write()} method, if a buffer size limit is reached and a
        streaming producer is registered, it will be paused until the buffered
        data is written to the underlying file descriptor.

        @param iovec: An iterable of byte strings.  It is consumed exactly
            once, so one-shot iterables such as generators are supported.

        @raise TypeError: If any element of C{iovec} is unicode; in that case
            nothing is buffered.
        """
        # The chunks are walked several times below (validation, buffering,
        # length accounting).  Materialise them first: otherwise a one-shot
        # iterable would be exhausted by the validation pass and its data
        # silently dropped.
        iovec = list(iovec)
        for chunk in iovec:
            if isinstance(chunk, unicode):  # no, really, I mean it
                raise TypeError("Data must not be unicode")
        if not self.connected or not iovec or self._writeDisconnected:
            return
        self._tempDataBuffer.extend(iovec)
        self._tempDataLen += sum(len(chunk) for chunk in iovec)
        self._maybePauseProducer()
        self.startWriting()


    # NOTE: the default Failure is built once, when this method is defined,
    # and shared by every call that does not pass _connDone.
    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        """Close the connection at the next available opportunity.

        Call this to cause this FileDescriptor to lose its connection.  It will
        first write any data that it has buffered.

        If there is data buffered yet to be written, this method will cause the
        transport to lose its connection as soon as it's done flushing its
        write buffer.  If you have a producer registered, the connection won't
        be closed until the producer is finished.  Therefore, make sure you
        unregister your producer when it's finished, or the connection will
        never close.

        @param _connDone: The reason passed to C{connectionLost} when the
            write side is already closed and the connection is dropped
            immediately.
        """

        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                # doWrite won't trigger the connection close anymore
                self.stopReading()
                self.stopWriting()
                self.connectionLost(_connDone)
            else:
                self.stopReading()
                self.startWriting()
                self.disconnecting = 1

    def loseWriteConnection(self):
        """
        Request a half-close of the write side.

        This only records the request and starts watching for writability;
        C{doWrite} calls C{_closeWriteConnection} once the buffer has drained.
        """
        self._writeDisconnecting = True
        self.startWriting()

    def stopReading(self):
        """Stop waiting for read availability.

        Call this to remove this selectable from being notified when it is
        ready for reading.
        """
        self.reactor.removeReader(self)

    def stopWriting(self):
        """Stop waiting for write availability.

        Call this to remove this selectable from being notified when it is
        ready for writing.
        """
        self.reactor.removeWriter(self)

    def startReading(self):
        """Start waiting for read availability.
        """
        self.reactor.addReader(self)

    def startWriting(self):
        """Start waiting for write availability.

        Call this to have this FileDescriptor be notified whenever it is ready
        for writing.
        """
        self.reactor.addWriter(self)

    # Producer/consumer implementation

    # first, the consumer stuff.  This requires no additional work, as
    # any object you can write to can be a consumer, really.

    producer = None
    # 2**2**2**2 == 2**16 == 65536 bytes.
    bufferSize = 2**2**2**2

    def stopConsuming(self):
        """Stop consuming data.

        This is called when a producer has lost its connection, to tell the
        consumer to go lose its connection (and break potential circular
        references).
        """
        self.unregisterProducer()
        self.loseConnection()

    # producer interface implementation

    def resumeProducing(self):
        """
        Start reading again, unless this descriptor is not connected or is in
        the middle of disconnecting.
        """
        if self.connected and not self.disconnecting:
            self.startReading()

    def pauseProducing(self):
        """
        Stop reading until C{resumeProducing} is called.
        """
        self.stopReading()

    def stopProducing(self):
        """
        Stop producing for good by closing the connection.
        """
        self.loseConnection()


    def fileno(self):
        """File Descriptor number for select().

        This method must be overridden or assigned in subclasses to
        indicate a valid file descriptor for the operating system.
        """
        return -1


def isIPAddress(addr):
    """
    Determine whether the given string represents an IPv4 address.

    Only the strict dotted-quad form is accepted: exactly four non-empty
    groups of ASCII decimal digits, each with a value from 0 to 255.

    @type addr: C{str}
    @param addr: A string which may or may not be the decimal dotted
        representation of an IPv4 address.

    @rtype: C{bool}
    @return: C{True} if C{addr} represents an IPv4 address, C{False}
        otherwise.
    """
    dottedParts = addr.split('.')
    if len(dottedParts) != 4:
        return False
    for octet in dottedParts:
        # int() on its own is too forgiving: it also accepts a sign and
        # surrounding whitespace (so "+1", " 1" and "-0" would all pass),
        # none of which belong in an address.  Insist on plain digits.
        if not octet or not _ASCII_DIGITS.issuperset(octet):
            return False
        try:
            value = int(octet)
        except ValueError:
            # Safety net in case int() refuses a digit string anyway, for
            # example because it is unreasonably long.
            return False
        if value > 255:
            return False
    return True


def isIPv6Address(addr):
    """
    Determine whether the given string represents an IPv6 address.

    Anything from the first C{'%'} onwards (presumably a scope identifier) is
    ignored when validating.

    @param addr: A string which may or may not be the hex
        representation of an IPv6 address.
    @type addr: C{str}

    @return: C{True} if C{addr} represents an IPv6 address, C{False}
        otherwise.
    @rtype: C{bool}
    """
    if '%' in addr:
        addr = addr.split('%', 1)[0]
    if not addr:
        return False
    try:
        # Malformed input is reported as either ValueError or socket.error;
        # both simply mean "not an IPv6 address".
        inet_pton(AF_INET6, addr)
    except (ValueError, error):
        return False
    return True


__all__ = ["FileDescriptor", "isIPAddress", "isIPv6Address"]