1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5An epoll() based implementation of the twisted main loop.
6
7To install the event loop (and you should do this before any connections,
8listeners or connectors are added)::
9
10    from twisted.internet import epollreactor
11    epollreactor.install()
12"""
13
14from __future__ import division, absolute_import
15
16from select import epoll, EPOLLHUP, EPOLLERR, EPOLLIN, EPOLLOUT
17import errno
18
19from zope.interface import implementer
20
21from twisted.internet.interfaces import IReactorFDSet
22
23from twisted.python import log
24from twisted.internet import posixbase
25
26
27
28@implementer(IReactorFDSet)
29class _ContinuousPolling(posixbase._PollLikeMixin,
30                         posixbase._DisconnectSelectableMixin):
31    """
32    Schedule reads and writes based on the passage of time, rather than
33    notification.
34
35    This is useful for supporting polling filesystem files, which C{epoll(7)}
36    does not support.
37
38    The implementation uses L{posixbase._PollLikeMixin}, which is a bit hacky,
39    but re-implementing and testing the relevant code yet again is
40    unappealing.
41
42    @ivar _reactor: The L{EPollReactor} that is using this instance.
43
44    @ivar _loop: A C{LoopingCall} that drives the polling, or C{None}.
45
46    @ivar _readers: A C{set} of C{FileDescriptor} objects that should be read
47        from.
48
49    @ivar _writers: A C{set} of C{FileDescriptor} objects that should be
50        written to.
51    """
52
53    # Attributes for _PollLikeMixin
54    _POLL_DISCONNECTED = 1
55    _POLL_IN = 2
56    _POLL_OUT = 4
57
58
59    def __init__(self, reactor):
60        self._reactor = reactor
61        self._loop = None
62        self._readers = set()
63        self._writers = set()
64
65
66    def _checkLoop(self):
67        """
68        Start or stop a C{LoopingCall} based on whether there are readers and
69        writers.
70        """
71        if self._readers or self._writers:
72            if self._loop is None:
73                from twisted.internet.task import LoopingCall, _EPSILON
74                self._loop = LoopingCall(self.iterate)
75                self._loop.clock = self._reactor
76                # LoopingCall seems unhappy with timeout of 0, so use very
77                # small number:
78                self._loop.start(_EPSILON, now=False)
79        elif self._loop:
80            self._loop.stop()
81            self._loop = None
82
83
84    def iterate(self):
85        """
86        Call C{doRead} and C{doWrite} on all readers and writers respectively.
87        """
88        for reader in list(self._readers):
89            self._doReadOrWrite(reader, reader, self._POLL_IN)
90        for reader in list(self._writers):
91            self._doReadOrWrite(reader, reader, self._POLL_OUT)
92
93
94    def addReader(self, reader):
95        """
96        Add a C{FileDescriptor} for notification of data available to read.
97        """
98        self._readers.add(reader)
99        self._checkLoop()
100
101
102    def addWriter(self, writer):
103        """
104        Add a C{FileDescriptor} for notification of data available to write.
105        """
106        self._writers.add(writer)
107        self._checkLoop()
108
109
110    def removeReader(self, reader):
111        """
112        Remove a C{FileDescriptor} from notification of data available to read.
113        """
114        try:
115            self._readers.remove(reader)
116        except KeyError:
117            return
118        self._checkLoop()
119
120
121    def removeWriter(self, writer):
122        """
123        Remove a C{FileDescriptor} from notification of data available to
124        write.
125        """
126        try:
127            self._writers.remove(writer)
128        except KeyError:
129            return
130        self._checkLoop()
131
132
133    def removeAll(self):
134        """
135        Remove all readers and writers.
136        """
137        result = list(self._readers | self._writers)
138        # Don't reset to new value, since self.isWriting and .isReading refer
139        # to the existing instance:
140        self._readers.clear()
141        self._writers.clear()
142        return result
143
144
145    def getReaders(self):
146        """
147        Return a list of the readers.
148        """
149        return list(self._readers)
150
151
152    def getWriters(self):
153        """
154        Return a list of the writers.
155        """
156        return list(self._writers)
157
158
159    def isReading(self, fd):
160        """
161        Checks if the file descriptor is currently being observed for read
162        readiness.
163
164        @param fd: The file descriptor being checked.
165        @type fd: L{twisted.internet.abstract.FileDescriptor}
166        @return: C{True} if the file descriptor is being observed for read
167            readiness, C{False} otherwise.
168        @rtype: C{bool}
169        """
170        return fd in self._readers
171
172
173    def isWriting(self, fd):
174        """
175        Checks if the file descriptor is currently being observed for write
176        readiness.
177
178        @param fd: The file descriptor being checked.
179        @type fd: L{twisted.internet.abstract.FileDescriptor}
180        @return: C{True} if the file descriptor is being observed for write
181            readiness, C{False} otherwise.
182        @rtype: C{bool}
183        """
184        return fd in self._writers
185
186
187
188@implementer(IReactorFDSet)
189class EPollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
190    """
191    A reactor that uses epoll(7).
192
193    @ivar _poller: A C{epoll} which will be used to check for I/O
194        readiness.
195
196    @ivar _selectables: A dictionary mapping integer file descriptors to
197        instances of C{FileDescriptor} which have been registered with the
198        reactor.  All C{FileDescriptors} which are currently receiving read or
199        write readiness notifications will be present as values in this
200        dictionary.
201
202    @ivar _reads: A set containing integer file descriptors.  Values in this
203        set will be registered with C{_poller} for read readiness notifications
204        which will be dispatched to the corresponding C{FileDescriptor}
205        instances in C{_selectables}.
206
207    @ivar _writes: A set containing integer file descriptors.  Values in this
208        set will be registered with C{_poller} for write readiness
209        notifications which will be dispatched to the corresponding
210        C{FileDescriptor} instances in C{_selectables}.
211
212    @ivar _continuousPolling: A L{_ContinuousPolling} instance, used to handle
213        file descriptors (e.g. filesytem files) that are not supported by
214        C{epoll(7)}.
215    """
216
217    # Attributes for _PollLikeMixin
218    _POLL_DISCONNECTED = (EPOLLHUP | EPOLLERR)
219    _POLL_IN = EPOLLIN
220    _POLL_OUT = EPOLLOUT
221
222    def __init__(self):
223        """
224        Initialize epoll object, file descriptor tracking dictionaries, and the
225        base class.
226        """
227        # Create the poller we're going to use.  The 1024 here is just a hint
228        # to the kernel, it is not a hard maximum.  After Linux 2.6.8, the size
229        # argument is completely ignored.
230        self._poller = epoll(1024)
231        self._reads = set()
232        self._writes = set()
233        self._selectables = {}
234        self._continuousPolling = _ContinuousPolling(self)
235        posixbase.PosixReactorBase.__init__(self)
236
237
238    def _add(self, xer, primary, other, selectables, event, antievent):
239        """
240        Private method for adding a descriptor from the event loop.
241
242        It takes care of adding it if  new or modifying it if already added
243        for another state (read -> read/write for example).
244        """
245        fd = xer.fileno()
246        if fd not in primary:
247            flags = event
248            # epoll_ctl can raise all kinds of IOErrors, and every one
249            # indicates a bug either in the reactor or application-code.
250            # Let them all through so someone sees a traceback and fixes
251            # something.  We'll do the same thing for every other call to
252            # this method in this file.
253            if fd in other:
254                flags |= antievent
255                self._poller.modify(fd, flags)
256            else:
257                self._poller.register(fd, flags)
258
259            # Update our own tracking state *only* after the epoll call has
260            # succeeded.  Otherwise we may get out of sync.
261            primary.add(fd)
262            selectables[fd] = xer
263
264
265    def addReader(self, reader):
266        """
267        Add a FileDescriptor for notification of data available to read.
268        """
269        try:
270            self._add(reader, self._reads, self._writes, self._selectables,
271                      EPOLLIN, EPOLLOUT)
272        except IOError as e:
273            if e.errno == errno.EPERM:
274                # epoll(7) doesn't support certain file descriptors,
275                # e.g. filesystem files, so for those we just poll
276                # continuously:
277                self._continuousPolling.addReader(reader)
278            else:
279                raise
280
281
282    def addWriter(self, writer):
283        """
284        Add a FileDescriptor for notification of data available to write.
285        """
286        try:
287            self._add(writer, self._writes, self._reads, self._selectables,
288                      EPOLLOUT, EPOLLIN)
289        except IOError as e:
290            if e.errno == errno.EPERM:
291                # epoll(7) doesn't support certain file descriptors,
292                # e.g. filesystem files, so for those we just poll
293                # continuously:
294                self._continuousPolling.addWriter(writer)
295            else:
296                raise
297
298
299    def _remove(self, xer, primary, other, selectables, event, antievent):
300        """
301        Private method for removing a descriptor from the event loop.
302
303        It does the inverse job of _add, and also add a check in case of the fd
304        has gone away.
305        """
306        fd = xer.fileno()
307        if fd == -1:
308            for fd, fdes in selectables.items():
309                if xer is fdes:
310                    break
311            else:
312                return
313        if fd in primary:
314            if fd in other:
315                flags = antievent
316                # See comment above modify call in _add.
317                self._poller.modify(fd, flags)
318            else:
319                del selectables[fd]
320                # See comment above _control call in _add.
321                self._poller.unregister(fd)
322            primary.remove(fd)
323
324
325    def removeReader(self, reader):
326        """
327        Remove a Selectable for notification of data available to read.
328        """
329        if self._continuousPolling.isReading(reader):
330            self._continuousPolling.removeReader(reader)
331            return
332        self._remove(reader, self._reads, self._writes, self._selectables,
333                     EPOLLIN, EPOLLOUT)
334
335
336    def removeWriter(self, writer):
337        """
338        Remove a Selectable for notification of data available to write.
339        """
340        if self._continuousPolling.isWriting(writer):
341            self._continuousPolling.removeWriter(writer)
342            return
343        self._remove(writer, self._writes, self._reads, self._selectables,
344                     EPOLLOUT, EPOLLIN)
345
346
347    def removeAll(self):
348        """
349        Remove all selectables, and return a list of them.
350        """
351        return (self._removeAll(
352                [self._selectables[fd] for fd in self._reads],
353                [self._selectables[fd] for fd in self._writes]) +
354                self._continuousPolling.removeAll())
355
356
357    def getReaders(self):
358        return ([self._selectables[fd] for fd in self._reads] +
359                self._continuousPolling.getReaders())
360
361
362    def getWriters(self):
363        return ([self._selectables[fd] for fd in self._writes] +
364                self._continuousPolling.getWriters())
365
366
367    def doPoll(self, timeout):
368        """
369        Poll the poller for new events.
370        """
371        if timeout is None:
372            timeout = -1  # Wait indefinitely.
373
374        try:
375            # Limit the number of events to the number of io objects we're
376            # currently tracking (because that's maybe a good heuristic) and
377            # the amount of time we block to the value specified by our
378            # caller.
379            l = self._poller.poll(timeout, len(self._selectables))
380        except IOError as err:
381            if err.errno == errno.EINTR:
382                return
383            # See epoll_wait(2) for documentation on the other conditions
384            # under which this can fail.  They can only be due to a serious
385            # programming error on our part, so let's just announce them
386            # loudly.
387            raise
388
389        _drdw = self._doReadOrWrite
390        for fd, event in l:
391            try:
392                selectable = self._selectables[fd]
393            except KeyError:
394                pass
395            else:
396                log.callWithLogger(selectable, _drdw, selectable, fd, event)
397
398    doIteration = doPoll
399
400
401def install():
402    """
403    Install the epoll() reactor.
404    """
405    p = EPollReactor()
406    from twisted.internet.main import installReactor
407    installReactor(p)
408
409
410__all__ = ["EPollReactor", "install"]
411