1# -*- test-case-name: twisted.test.test_abstract -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Support for generic select()able objects.
7"""
8
9from __future__ import division, absolute_import
10
11from socket import AF_INET6, inet_pton, error
12
13from zope.interface import implementer
14
15# Twisted Imports
16from twisted.python.compat import _PY3, unicode, lazyByteSlice
17from twisted.python import reflect, failure
18from twisted.internet import interfaces, main
19
if not _PY3:
    def _concatenate(bObj, offset, bArray):
        """
        Build the next outgoing buffer on Python 2.

        @param bObj: The bytes currently being sent.
        @param offset: Index into C{bObj} of the first byte to keep.
        @param bArray: An iterable of byte strings to append.

        @return: The part of C{bObj} from C{offset} onwards, followed by all
            of the strings in C{bArray} joined together.
        """
        # A buffer limits what is included in the result without first
        # slicing (and therefore copying) the string.
        unsent = buffer(bObj, offset)
        return unsent + b"".join(bArray)
else:
    def _concatenate(bObj, offset, bArray):
        """
        Build the next outgoing buffer on Python 3.

        @param bObj: The bytes currently being sent.
        @param offset: Index into C{bObj} of the first byte to keep.
        @param bArray: An iterable of byte strings to append.

        @return: The part of C{bObj} from C{offset} onwards, followed by all
            of the strings in C{bArray} joined together.
        """
        # There is no buffer() builtin on Python 3 and the other primitives do
        # not help here, so simply copy.  Perhaps later these buffers can be
        # joined and FileDescriptor can use writev(), or perhaps bytearrays
        # would help.
        unsent = bObj[offset:]
        return unsent + b"".join(bArray)
32
33
34class _ConsumerMixin(object):
35    """
36    L{IConsumer} implementations can mix this in to get C{registerProducer} and
37    C{unregisterProducer} methods which take care of keeping track of a
38    producer's state.
39
40    Subclasses must provide three attributes which L{_ConsumerMixin} will read
41    but not write:
42
43      - connected: A C{bool} which is C{True} as long as the the consumer has
44        someplace to send bytes (for example, a TCP connection), and then
45        C{False} when it no longer does.
46
47      - disconnecting: A C{bool} which is C{False} until something like
48        L{ITransport.loseConnection} is called, indicating that the send buffer
49        should be flushed and the connection lost afterwards.  Afterwards,
50        C{True}.
51
52      - disconnected: A C{bool} which is C{False} until the consumer no longer
53        has a place to send bytes, then C{True}.
54
55    Subclasses must also override the C{startWriting} method.
56
57    @ivar producer: C{None} if no producer is registered, otherwise the
58        registered producer.
59
60    @ivar producerPaused: A flag indicating whether the producer is currently
61        paused.
62    @type producerPaused: L{bool}
63
64    @ivar streamingProducer: A flag indicating whether the producer was
65        registered as a streaming (ie push) producer or not (ie a pull
66        producer).  This will determine whether the consumer may ever need to
67        pause and resume it, or if it can merely call C{resumeProducing} on it
68        when buffer space is available.
69    @ivar streamingProducer: C{bool} or C{int}
70
71    """
72    producer = None
73    producerPaused = False
74    streamingProducer = False
75
76    def startWriting(self):
77        """
78        Override in a subclass to cause the reactor to monitor this selectable
79        for write events.  This will be called once in C{unregisterProducer} if
80        C{loseConnection} has previously been called, so that the connection can
81        actually close.
82        """
83        raise NotImplementedError("%r did not implement startWriting")
84
85
86    def registerProducer(self, producer, streaming):
87        """
88        Register to receive data from a producer.
89
90        This sets this selectable to be a consumer for a producer.  When this
91        selectable runs out of data on a write() call, it will ask the producer
92        to resumeProducing(). When the FileDescriptor's internal data buffer is
93        filled, it will ask the producer to pauseProducing(). If the connection
94        is lost, FileDescriptor calls producer's stopProducing() method.
95
96        If streaming is true, the producer should provide the IPushProducer
97        interface. Otherwise, it is assumed that producer provides the
98        IPullProducer interface. In this case, the producer won't be asked to
99        pauseProducing(), but it has to be careful to write() data only when its
100        resumeProducing() method is called.
101        """
102        if self.producer is not None:
103            raise RuntimeError(
104                "Cannot register producer %s, because producer %s was never "
105                "unregistered." % (producer, self.producer))
106        if self.disconnected:
107            producer.stopProducing()
108        else:
109            self.producer = producer
110            self.streamingProducer = streaming
111            if not streaming:
112                producer.resumeProducing()
113
114
115    def unregisterProducer(self):
116        """
117        Stop consuming data from a producer, without disconnecting.
118        """
119        self.producer = None
120        if self.connected and self.disconnecting:
121            self.startWriting()
122
123
124
@implementer(interfaces.ILoggingContext)
class _LogOwner(object):
    """
    A mixin for transports that wrap a protocol and want that protocol's log
    prefix to show up in their own L{interfaces.ILoggingContext} log prefix.
    """

    def _getLogPrefix(self, applicationObject):
        """
        Work out which log prefix to use for messages about
        C{applicationObject}.

        @param applicationObject: Any object; it may or may not provide
            L{interfaces.ILoggingContext}.

        @return: The result of C{applicationObject.logPrefix()} if the object
            provides L{interfaces.ILoggingContext}, otherwise the name of its
            class.
        """
        providesContext = interfaces.ILoggingContext.providedBy(
            applicationObject)
        if not providesContext:
            return applicationObject.__class__.__name__
        return applicationObject.logPrefix()


    def logPrefix(self):
        """
        Return the string placed in front of every log line.

        Override this to customise logging.  It may be called more often than
        there are lines of output.
        """
        return "-"
153
154
155
@implementer(
    interfaces.IPushProducer, interfaces.IReadWriteDescriptor,
    interfaces.IConsumer, interfaces.ITransport,
    interfaces.IHalfCloseableDescriptor)
class FileDescriptor(_ConsumerMixin, _LogOwner):
    """
    An object which can be operated on by select().

    This is an abstract superclass of all objects which may be notified when
    they are readable or writable; e.g. they have a file-descriptor that is
    valid to be passed to select(2).

    @ivar connected: True while connected; cleared by C{connectionLost}.
    @ivar disconnected: Set to a true value by C{connectionLost}.
    @ivar disconnecting: Set to a true value by C{loseConnection} while the
        write buffer is still being flushed.
    @ivar _writeDisconnecting: Set by C{loseWriteConnection} to request a
        half-close once the write buffer has been flushed.
    @ivar _writeDisconnected: Set by C{doWrite} once that flush has completed;
        from then on C{write} and C{writeSequence} discard their data.
    @ivar dataBuffer: The bytes currently being handed to C{writeSomeData}.
    @ivar offset: Index into C{dataBuffer} of the first byte not yet written.
    @ivar _tempDataBuffer: Chunks queued by C{write} and C{writeSequence}
        which have not yet been merged into C{dataBuffer}.
    @ivar _tempDataLen: Total length of the chunks in C{_tempDataBuffer}.
    @cvar SEND_LIMIT: C{doWrite} merges the queued chunks into C{dataBuffer}
        whenever fewer than this many bytes of it remain unsent.
    @cvar bufferSize: The amount of buffered data above which
        C{_isSendBufferFull} reports the buffer as full.
    """
    connected = 0
    disconnected = 0
    disconnecting = 0
    _writeDisconnecting = False
    _writeDisconnected = False
    dataBuffer = b""
    offset = 0

    SEND_LIMIT = 128*1024

    def __init__(self, reactor=None):
        """
        @param reactor: An L{IReactorFDSet} provider which this descriptor will
            use to get readable and writeable event notifications.  If no value
            is given, the global reactor will be used.
        """
        if not reactor:
            # Fall back to the global reactor, imported only when needed.
            from twisted.internet import reactor
        self.reactor = reactor
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
        self._tempDataLen = 0


    def connectionLost(self, reason):
        """The connection was lost.

        This is called when the connection on a selectable object has been
        lost.  It will be called whether the connection was closed explicitly,
        an exception occurred in an event handler, or the other end of the
        connection closed it first.

        Clean up state here, but make sure to call back up to FileDescriptor.

        @param reason: Why the connection was lost.  It is not used by this
            implementation.
        """
        self.disconnected = 1
        self.connected = 0
        if self.producer is not None:
            # Nothing more can be consumed: stop and forget the producer.
            self.producer.stopProducing()
            self.producer = None
        self.stopReading()
        self.stopWriting()


    def writeSomeData(self, data):
        """
        Write as much as possible of the given data, immediately.

        This is called to invoke the lower-level writing functionality, such
        as a socket's send() method, or a file's write(); this method
        returns an integer or an exception.  If an integer, it is the number
        of bytes written (possibly zero); if an exception, it indicates the
        connection was lost.

        @param data: The bytes to write.

        @raise NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("%s does not implement writeSomeData" %
                                  reflect.qual(self.__class__))


    def doRead(self):
        """
        Called when data is available for reading.

        Subclasses must override this method. The result will be interpreted
        in the same way as a result of doWrite().

        @raise NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("%s does not implement doRead" %
                                  reflect.qual(self.__class__))


    def doWrite(self):
        """
        Called when data can be written.

        @return: C{None} on success, an exception or a negative integer on
            failure.

        @see: L{twisted.internet.interfaces.IWriteDescriptor.doWrite}.
        """
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
            # If there is currently less than SEND_LIMIT bytes left to send
            # in the string, extend it with the array data.
            self.dataBuffer = _concatenate(
                self.dataBuffer, self.offset, self._tempDataBuffer)
            self.offset = 0
            self._tempDataBuffer = []
            self._tempDataLen = 0

        # Send as much data as you can.
        if self.offset:
            written = self.writeSomeData(
                lazyByteSlice(self.dataBuffer, self.offset))
        else:
            written = self.writeSomeData(self.dataBuffer)

        # There is no writeSomeData implementation in Twisted which returns
        # < 0, but the documentation for writeSomeData used to claim negative
        # integers meant connection lost.  Keep supporting this here,
        # although it may be worth deprecating and removing at some point.
        if isinstance(written, Exception) or written < 0:
            return written
        self.offset += written
        # If there is nothing left to send,
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
            self.dataBuffer = b""
            self.offset = 0
            # stop writing.
            self.stopWriting()
            # If I've got a producer who is supposed to supply me with data,
            if self.producer is not None and ((not self.streamingProducer)
                                              or self.producerPaused):
                # tell them to supply some more.
                self.producerPaused = False
                self.producer.resumeProducing()
            elif self.disconnecting:
                # But if I was previously asked to let the connection die, do
                # so.
                return self._postLoseConnection()
            elif self._writeDisconnecting:
                # I was previously asked to half-close the connection.  We
                # set _writeDisconnected before calling handler, in case the
                # handler calls loseConnection(), which will want to check for
                # this attribute.
                self._writeDisconnected = True
                result = self._closeWriteConnection()
                return result
        return None


    def _postLoseConnection(self):
        """Called after a loseConnection(), when all data has been written.

        Whatever this returns is then returned by doWrite.
        """
        # default implementation, telling reactor we're finished
        return main.CONNECTION_DONE


    def _closeWriteConnection(self):
        """
        Hook called by C{doWrite} once the buffer has been flushed after
        C{loseWriteConnection}.  Whatever this returns is then returned by
        C{doWrite}.  Override in subclasses; the default does nothing.
        """
        # override in subclasses
        pass


    def writeConnectionLost(self, reason):
        """
        Treat the loss of the write side as the loss of the whole connection.

        @param reason: Passed on to C{connectionLost}.
        """
        # in current code should never be called
        self.connectionLost(reason)


    def readConnectionLost(self, reason):
        """
        Treat the loss of the read side as the loss of the whole connection.
        Override in subclasses.

        @param reason: Passed on to C{connectionLost}.
        """
        # override in subclasses
        self.connectionLost(reason)


    def _isSendBufferFull(self):
        """
        Determine whether the user-space send buffer for this transport is full
        or not.

        When the buffer contains more than C{self.bufferSize} bytes, it is
        considered full.  This might be improved by considering the size of the
        kernel send buffer and how much of it is free.

        @return: C{True} if it is full, C{False} otherwise.
        """
        return len(self.dataBuffer) + self._tempDataLen > self.bufferSize


    def _maybePauseProducer(self):
        """
        Possibly pause a producer, if there is one and the send buffer is full.
        """
        # If we are responsible for pausing our producer,
        if self.producer is not None and self.streamingProducer:
            # and our buffer is full,
            if self._isSendBufferFull():
                # pause it.
                self.producerPaused = True
                self.producer.pauseProducing()


    def write(self, data):
        """Reliably write some data.

        The data is buffered until the underlying file descriptor is ready
        for writing. If there is more than C{self.bufferSize} data in the
        buffer and this descriptor has a registered streaming producer, its
        C{pauseProducing()} method will be called.

        @param data: The bytes to write.  Silently discarded if this
            descriptor is not connected or its write side has been closed.

        @raise TypeError: If C{data} is unicode.
        """
        if isinstance(data, unicode): # no, really, I mean it
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            return
        if data:
            self._tempDataBuffer.append(data)
            self._tempDataLen += len(data)
            self._maybePauseProducer()
            self.startWriting()


    def writeSequence(self, iovec):
        """
        Reliably write a sequence of data.

        Currently, this is a convenience method roughly equivalent to::

            for chunk in iovec:
                fd.write(chunk)

        It may have a more efficient implementation at a later time or in a
        different reactor.

        As with the C{write()} method, if a buffer size limit is reached and a
        streaming producer is registered, it will be paused until the buffered
        data is written to the underlying file descriptor.

        @param iovec: An iterable of byte strings.  Silently discarded if this
            descriptor is not connected or its write side has been closed.

        @raise TypeError: If any element of C{iovec} is unicode.
        """
        # Take a single snapshot of the input.  It is examined several times
        # below, and a one-shot iterable such as a generator would otherwise
        # be exhausted by the type check, silently losing all of its data.
        chunks = list(iovec)
        for chunk in chunks:
            if isinstance(chunk, unicode): # no, really, I mean it
                raise TypeError("Data must not be unicode")
        if not self.connected or not chunks or self._writeDisconnected:
            return
        # Work out the total before touching any state, so that an element
        # without a length cannot leave the buffer and its recorded size out
        # of step with each other.
        addedLength = sum(len(chunk) for chunk in chunks)
        self._tempDataBuffer.extend(chunks)
        self._tempDataLen += addedLength
        self._maybePauseProducer()
        self.startWriting()


    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        """Close the connection at the next available opportunity.

        Call this to cause this FileDescriptor to lose its connection.  It will
        first write any data that it has buffered.

        If there is data buffered yet to be written, this method will cause the
        transport to lose its connection as soon as it's done flushing its
        write buffer.  If you have a producer registered, the connection won't
        be closed until the producer is finished. Therefore, make sure you
        unregister your producer when it's finished, or the connection will
        never close.

        @param _connDone: The reason passed to C{connectionLost} when the
            write side has already been closed and the connection is therefore
            dropped immediately.
        """

        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                # doWrite won't trigger the connection close anymore
                self.stopReading()
                self.stopWriting()
                self.connectionLost(_connDone)
            else:
                self.stopReading()
                self.startWriting()
                self.disconnecting = 1


    def loseWriteConnection(self):
        """
        Request a half-close: note that the write side should be closed and
        start watching for writability, so that C{doWrite} can close it once
        the write buffer has been flushed.
        """
        self._writeDisconnecting = True
        self.startWriting()


    def stopReading(self):
        """Stop waiting for read availability.

        Call this to remove this selectable from being notified when it is
        ready for reading.
        """
        self.reactor.removeReader(self)


    def stopWriting(self):
        """Stop waiting for write availability.

        Call this to remove this selectable from being notified when it is ready
        for writing.
        """
        self.reactor.removeWriter(self)


    def startReading(self):
        """Start waiting for read availability.
        """
        self.reactor.addReader(self)


    def startWriting(self):
        """Start waiting for write availability.

        Call this to have this FileDescriptor be notified whenever it is ready for
        writing.
        """
        self.reactor.addWriter(self)

    # Producer/consumer implementation

    # first, the consumer stuff.  This requires no additional work, as
    # any object you can write to can be a consumer, really.

    producer = None
    bufferSize = 2 ** 16  # 65536 bytes


    def stopConsuming(self):
        """Stop consuming data.

        This is called when a producer has lost its connection, to tell the
        consumer to go lose its connection (and break potential circular
        references).
        """
        self.unregisterProducer()
        self.loseConnection()

    # producer interface implementation

    def resumeProducing(self):
        """
        Start reading again, unless this descriptor is no longer connected or
        is in the middle of disconnecting.
        """
        if self.connected and not self.disconnecting:
            self.startReading()


    def pauseProducing(self):
        """
        Stop reading until C{resumeProducing} is called.
        """
        self.stopReading()


    def stopProducing(self):
        """
        Stop producing for good by losing the connection.
        """
        self.loseConnection()


    def fileno(self):
        """File Descriptor number for select().

        This method must be overridden or assigned in subclasses to
        indicate a valid file descriptor for the operating system.

        @return: C{-1} in this base implementation.
        """
        return -1
482
483
def isIPAddress(addr):
    """
    Determine whether the given string represents an IPv4 address.

    Only the strict dotted-decimal form is accepted: exactly four parts
    separated by dots, each made up solely of the ASCII digits C{0}-C{9} and
    having a value of at most 255.  Leading zeros are allowed.

    @type addr: C{str}
    @param addr: A string which may or may not be the decimal dotted
    representation of an IPv4 address.

    @rtype: C{bool}
    @return: C{True} if C{addr} represents an IPv4 address, C{False}
    otherwise.
    """
    dottedParts = addr.split('.')
    if len(dottedParts) != 4:
        return False
    for octet in dottedParts:
        # int() alone is too lenient: it also accepts surrounding whitespace,
        # a leading sign, underscores and non-ASCII digits, none of which
        # belong in an IPv4 address.  Insist on plain ASCII digits first.
        if not octet or any(ch not in "0123456789" for ch in octet):
            return False
        try:
            value = int(octet)
        except ValueError:
            # For example, an absurdly long run of digits which the
            # interpreter refuses to convert.
            return False
        if value > 255:
            return False
    return True
508
509
def isIPv6Address(addr):
    """
    Report whether a string is the textual form of an IPv6 address.

    Anything from the first C{'%'} onwards is ignored, so only the part in
    front of it is checked.

    @param addr: A string which may or may not be the hex
        representation of an IPv6 address.
    @type addr: C{str}

    @return: C{True} if C{addr} represents an IPv6 address, C{False}
        otherwise.
    @rtype: C{bool}
    """
    # Keep only what precedes the first '%'; when there is none this is the
    # whole string.
    host = addr.partition('%')[0]
    if host:
        try:
            # This might be a native implementation or the one from
            # twisted.python.compat.
            inet_pton(AF_INET6, host)
        except (ValueError, error):
            pass
        else:
            return True
    return False
533
534
# The public names of this module.
__all__ = [
    "FileDescriptor",
    "isIPAddress",
    "isIPv6Address",
]
536