1# -*- test-case-name: twisted.test.test_kqueuereactor -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6A kqueue()/kevent() based implementation of the Twisted main loop.
7
8To use this reactor, start your application specifying the kqueue reactor::
9
10   twistd --reactor kqueue ...
11
12To install the event loop from code (and you should do this before any
13connections, listeners or connectors are added)::
14
15   from twisted.internet import kqreactor
16   kqreactor.install()
17"""
18
19import errno
20import select
21
22from zope.interface import Attribute, Interface, declarations, implementer
23
24from twisted.internet import main, posixbase
25from twisted.internet.interfaces import IReactorDaemonize, IReactorFDSet
26from twisted.python import failure, log
27
try:
    # The constants are looked up with getattr() to keep mypy from
    # complaining.  We don't use type: ignore[attr-defined] on import, because
    # mypy only complains on some platforms, and then the unused ignore is an
    # issue if the undefined attribute isn't.
    KQ_EV_ADD = getattr(select, "KQ_EV_ADD")
    KQ_EV_DELETE = getattr(select, "KQ_EV_DELETE")
    KQ_EV_EOF = getattr(select, "KQ_EV_EOF")
    KQ_FILTER_READ = getattr(select, "KQ_FILTER_READ")
    KQ_FILTER_WRITE = getattr(select, "KQ_FILTER_WRITE")
except AttributeError as e:
    # The select module on this platform lacks the kqueue constants, so this
    # module cannot be used: report that as an import failure and keep the
    # original AttributeError attached as the explicit cause.
    raise ImportError(e) from e
40
41
class _IKQueue(Interface):
    """
    The pair of attributes a kqueue implementation must expose to be usable
    as the C{_kqueueImpl} argument of L{KQueueReactor}.
    """

    # Called with no arguments by the reactor to obtain a new kqueue object.
    kqueue = Attribute("An implementation of kqueue(2).")
    # Called by the reactor as kevent(fd, filter, op) to describe one
    # registration change.
    kevent = Attribute("An implementation of kevent(2).")
49
50
# The standard library's select module is the default kqueue implementation
# used by KQueueReactor, so declare that the module object itself provides
# _IKQueue.
declarations.directlyProvides(select, _IKQueue)
52
53
@implementer(IReactorFDSet, IReactorDaemonize)
class KQueueReactor(posixbase.PosixReactorBase):
    """
    A reactor that uses kqueue(2)/kevent(2), as exposed by the standard
    library's C{select} module, to wait for I/O readiness.

    @ivar _kq: A C{kqueue} which will be used to check for I/O readiness.
        This is C{None} between calls to L{beforeDaemonize} and
        L{afterDaemonize}.

    @ivar _impl: The implementation of L{_IKQueue} to use.

    @ivar _selectables: A dictionary mapping integer file descriptors to
        instances of L{FileDescriptor} which have been registered with the
        reactor.  All L{FileDescriptor}s which are currently receiving read or
        write readiness notifications will be present as values in this
        dictionary.

    @ivar _reads: A set containing integer file descriptors.  Values in this
        set will be registered with C{_kq} for read readiness notifications
        which will be dispatched to the corresponding L{FileDescriptor}
        instances in C{_selectables}.

    @ivar _writes: A set containing integer file descriptors.  Values in this
        set will be registered with C{_kq} for write readiness notifications
        which will be dispatched to the corresponding L{FileDescriptor}
        instances in C{_selectables}.
    """

    def __init__(self, _kqueueImpl=select):
        """
        Initialize kqueue object, file descriptor tracking dictionaries, and
        the base class.

        See:
            - http://docs.python.org/library/select.html
            - www.freebsd.org/cgi/man.cgi?query=kqueue
            - people.freebsd.org/~jlemon/papers/kqueue.pdf

        @param _kqueueImpl: The implementation of L{_IKQueue} to use. A
            hook for testing.
        """
        self._impl = _kqueueImpl
        self._kq = self._impl.kqueue()
        self._reads = set()
        self._writes = set()
        self._selectables = {}
        # The tracking attributes above are set up before the base class is
        # initialized.
        posixbase.PosixReactorBase.__init__(self)

    def _updateRegistration(self, fd, filter, op):
        """
        Private method for changing kqueue registration on a given FD
        filtering for events given filter/op. This will never block and
        returns nothing.

        @param fd: The integer file descriptor whose registration changes.

        @param filter: The kqueue filter to change; callers in this class
            pass C{KQ_FILTER_READ} or C{KQ_FILTER_WRITE}.

        @param op: The action to apply; callers in this class pass
            C{KQ_EV_ADD} or C{KQ_EV_DELETE}.
        """
        # A single change is submitted while asking for zero events with a
        # zero timeout, so nothing is collected by this call.
        self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0)

    def beforeDaemonize(self):
        """
        Implement L{IReactorDaemonize.beforeDaemonize}.
        """
        # Twisted-internal method called during daemonization (when application
        # is started via twistd). This is called right before the magic double
        # forking done for daemonization. We cleanly close the kqueue() and later
        # recreate it. This is needed since a) kqueue() are not inherited across
        # forks and b) twistd will create the reactor already before daemonization
        # (and will also add at least 1 reader to the reactor, an instance of
        # twisted.internet.posixbase._UnixWaker).
        #
        # See: twisted.scripts._twistd_unix.daemonize()
        self._kq.close()
        self._kq = None

    def afterDaemonize(self):
        """
        Implement L{IReactorDaemonize.afterDaemonize}.
        """
        # Twisted-internal method called during daemonization. This is called right
        # after daemonization and recreates the kqueue() and any readers/writers
        # that were added before. Note that you MUST NOT call any reactor methods
        # in between beforeDaemonize() and afterDaemonize()!
        self._kq = self._impl.kqueue()
        for fd in self._reads:
            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
        for fd in self._writes:
            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)

    def addReader(self, reader):
        """
        Implement L{IReactorFDSet.addReader}.

        @param reader: The selectable to start monitoring for read readiness.
            Adding a selectable whose descriptor is already in C{_reads} is a
            no-op.
        """
        fd = reader.fileno()
        if fd not in self._reads:
            try:
                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
            except OSError:
                # Best effort: a failed kernel registration is ignored.
                pass
            finally:
                # The bookkeeping is updated whether or not the registration
                # succeeded.
                self._selectables[fd] = reader
                self._reads.add(fd)

    def addWriter(self, writer):
        """
        Implement L{IReactorFDSet.addWriter}.

        @param writer: The selectable to start monitoring for write readiness.
            Adding a selectable whose descriptor is already in C{_writes} is a
            no-op.
        """
        fd = writer.fileno()
        if fd not in self._writes:
            try:
                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
            except OSError:
                # Best effort: a failed kernel registration is ignored.
                pass
            finally:
                # The bookkeeping is updated whether or not the registration
                # succeeded.
                self._selectables[fd] = writer
                self._writes.add(fd)

    def _dropSelectable(self, selectable, fdSet, otherFdSet, kqFilter):
        """
        Shared implementation of L{removeReader} and L{removeWriter}.

        @param selectable: The selectable to stop monitoring.

        @param fdSet: The set of descriptors (C{_reads} or C{_writes}) the
            selectable is being removed from.

        @param otherFdSet: The complementary set.  The entry in
            C{_selectables} is only dropped when the descriptor is not in
            this set either.

        @param kqFilter: The kqueue filter to delete for the descriptor.
        """
        wasLost = False
        try:
            fd = selectable.fileno()
        except BaseException:
            # Any failure to obtain the descriptor is treated the same as the
            # selectable reporting -1.
            fd = -1
        if fd == -1:
            # The selectable can no longer report its descriptor; recover it by
            # looking for the very same object among the registered ones.
            for fd, candidate in self._selectables.items():
                if selectable is candidate:
                    wasLost = True
                    break
            else:
                # Not registered with this reactor: nothing to remove.
                return
        if fd not in fdSet:
            return
        fdSet.remove(fd)
        if fd not in otherFdSet:
            del self._selectables[fd]
        if not wasLost:
            # Only ask the kqueue to forget the descriptor when the selectable
            # could still report it.
            try:
                self._updateRegistration(fd, kqFilter, KQ_EV_DELETE)
            except OSError:
                # Best effort: a failed kernel deregistration is ignored.
                pass

    def removeReader(self, reader):
        """
        Implement L{IReactorFDSet.removeReader}.

        @param reader: The selectable to stop monitoring for read readiness.
            Removing a selectable which is not registered is a no-op.
        """
        self._dropSelectable(reader, self._reads, self._writes, KQ_FILTER_READ)

    def removeWriter(self, writer):
        """
        Implement L{IReactorFDSet.removeWriter}.

        @param writer: The selectable to stop monitoring for write readiness.
            Removing a selectable which is not registered is a no-op.
        """
        self._dropSelectable(writer, self._writes, self._reads, KQ_FILTER_WRITE)

    def removeAll(self):
        """
        Implement L{IReactorFDSet.removeAll}.

        @return: Whatever the inherited C{_removeAll} returns for the current
            readers and writers.
        """
        return self._removeAll(
            [self._selectables[fd] for fd in self._reads],
            [self._selectables[fd] for fd in self._writes],
        )

    def getReaders(self):
        """
        Implement L{IReactorFDSet.getReaders}.

        @return: A list of the selectables registered for read readiness.
        """
        return [self._selectables[fd] for fd in self._reads]

    def getWriters(self):
        """
        Implement L{IReactorFDSet.getWriters}.

        @return: A list of the selectables registered for write readiness.
        """
        return [self._selectables[fd] for fd in self._writes]

    def doKEvent(self, timeout):
        """
        Poll the kqueue for new events.

        @param timeout: The maximum time, in seconds, to wait for events.
            C{None} is replaced by a one second wait.
        """
        if timeout is None:
            timeout = 1

        # kevent(2) returns immediately, ignoring the timeout, when asked for
        # zero events.  Always ask for at least one so that a reactor with no
        # registered selectables still waits instead of spinning.
        maxEvents = max(len(self._selectables), 1)

        try:
            events = self._kq.control([], maxEvents, timeout)
        except OSError as e:
            # Since this command blocks for potentially a while, it's possible
            # EINTR can be raised for various reasons (for example, if the user
            # hits ^C).
            if e.errno == errno.EINTR:
                return
            raise

        handle = self._doWriteOrRead
        for event in events:
            fd = event.ident
            try:
                selectable = self._selectables[fd]
            except KeyError:
                # Handles the infrequent case where one selectable's
                # handler disconnects another.
                continue
            log.callWithLogger(selectable, handle, selectable, fd, event)

    def _doWriteOrRead(self, selectable, fd, event):
        """
        Private method called when a FD is ready for reading, writing or was
        lost. Do the work and raise errors where necessary.

        @param selectable: The selectable registered for C{fd}.

        @param fd: The integer file descriptor the event was reported for.

        @param event: The kevent describing what happened; its C{filter},
            C{flags}, C{data} and C{fflags} attributes are consulted.
        """
        why = None
        inRead = False
        kqFilter, flags, data, fflags = (
            event.filter,
            event.flags,
            event.data,
            event.fflags,
        )

        if flags & KQ_EV_EOF and data and fflags:
            # EOF reported together with non-zero data and fflags: treat the
            # connection as lost without calling into the selectable.
            why = main.CONNECTION_LOST
        else:
            try:
                if selectable.fileno() == -1:
                    why = posixbase._NO_FILEDESC
                elif kqFilter == KQ_FILTER_READ:
                    # Set before calling doRead() so that a failure raised by
                    # it is still reported as a read-side disconnect.
                    inRead = True
                    why = selectable.doRead()
                elif kqFilter == KQ_FILTER_WRITE:
                    why = selectable.doWrite()
            except BaseException:
                # Any exception from application code gets logged and will
                # cause us to disconnect the selectable.
                why = failure.Failure()
                log.err(
                    why,
                    "An exception was raised from application code"
                    " while processing a reactor selectable",
                )

        if why:
            self._disconnectSelectable(selectable, why, inRead)

    doIteration = doKEvent
312
313
def install():
    """
    Create a L{KQueueReactor} and pass it to
    L{twisted.internet.main.installReactor}.
    """
    from twisted.internet.main import installReactor

    installReactor(KQueueReactor())
322
323
# Names exported by ``from twisted.internet.kqreactor import *``.
__all__ = [
    "KQueueReactor",
    "install",
]
325