# -*- test-case-name: twisted.test.test_kqueuereactor -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
A kqueue()/kevent() based implementation of the Twisted main loop.

To use this reactor, start your application specifying the kqueue reactor::

    twistd --reactor kqueue ...

To install the event loop from code (and you should do this before any
connections, listeners or connectors are added)::

    from twisted.internet import kqreactor
    kqreactor.install()
"""

import errno
import select

from zope.interface import Attribute, Interface, declarations, implementer

from twisted.internet import main, posixbase
from twisted.internet.interfaces import IReactorDaemonize, IReactorFDSet
from twisted.python import failure, log

try:
    # This is to keep mypy from complaining
    # We don't use type: ignore[attr-defined] on import, because mypy only
    # complains on some platforms, and then the unused ignore is an issue if
    # the undefined attribute isn't.
    KQ_EV_ADD = getattr(select, "KQ_EV_ADD")
    KQ_EV_DELETE = getattr(select, "KQ_EV_DELETE")
    KQ_EV_EOF = getattr(select, "KQ_EV_EOF")
    KQ_FILTER_READ = getattr(select, "KQ_FILTER_READ")
    KQ_FILTER_WRITE = getattr(select, "KQ_FILTER_WRITE")
except AttributeError as e:
    # The select module lacks the kqueue constants: report the module as
    # unimportable, keeping the original error as the explicit cause.
    raise ImportError(e) from e


class _IKQueue(Interface):
    """
    An interface for KQueue implementations.
    """

    kqueue = Attribute("An implementation of kqueue(2).")
    kevent = Attribute("An implementation of kevent(2).")


declarations.directlyProvides(select, _IKQueue)


@implementer(IReactorFDSet, IReactorDaemonize)
class KQueueReactor(posixbase.PosixReactorBase):
    """
    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
    which has built in support for kqueue in the select module.

    @ivar _kq: A C{kqueue} which will be used to check for I/O readiness.

    @ivar _impl: The implementation of L{_IKQueue} to use.

    @ivar _selectables: A dictionary mapping integer file descriptors to
        instances of L{FileDescriptor} which have been registered with the
        reactor.  All L{FileDescriptor}s which are currently receiving read or
        write readiness notifications will be present as values in this
        dictionary.

    @ivar _reads: A set containing integer file descriptors.  Values in this
        set will be registered with C{_kq} for read readiness notifications
        which will be dispatched to the corresponding L{FileDescriptor}
        instances in C{_selectables}.

    @ivar _writes: A set containing integer file descriptors.  Values in this
        set will be registered with C{_kq} for write readiness notifications
        which will be dispatched to the corresponding L{FileDescriptor}
        instances in C{_selectables}.
    """

    def __init__(self, _kqueueImpl=select):
        """
        Initialize kqueue object, file descriptor tracking dictionaries, and
        the base class.

        See:
            - http://docs.python.org/library/select.html
            - www.freebsd.org/cgi/man.cgi?query=kqueue
            - people.freebsd.org/~jlemon/papers/kqueue.pdf

        @param _kqueueImpl: The implementation of L{_IKQueue} to use. A
            hook for testing.
        """
        self._impl = _kqueueImpl
        self._kq = self._impl.kqueue()
        self._reads = set()
        self._writes = set()
        self._selectables = {}
        # The tracking containers are created before the base class is
        # initialized, matching the original ordering.
        posixbase.PosixReactorBase.__init__(self)

    def _updateRegistration(self, fd, filter, op):
        """
        Private method for changing kqueue registration on a given FD
        filtering for events given filter/op. This will never block and
        returns nothing.

        @param fd: The integer file descriptor whose registration changes.
        @param filter: The kqueue filter (C{KQ_FILTER_READ} or
            C{KQ_FILTER_WRITE}).
        @param op: The kqueue operation flag (C{KQ_EV_ADD} or
            C{KQ_EV_DELETE}).
        """
        # max_events=0 and timeout=0: only submit the change, fetch nothing.
        self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0)

    def beforeDaemonize(self):
        """
        Implement L{IReactorDaemonize.beforeDaemonize}.
        """
        # Twisted-internal method called during daemonization (when application
        # is started via twistd). This is called right before the magic double
        # forking done for daemonization. We cleanly close the kqueue() and later
        # recreate it. This is needed since a) kqueue() are not inherited across
        # forks and b) twistd will create the reactor already before daemonization
        # (and will also add at least 1 reader to the reactor, an instance of
        # twisted.internet.posixbase._UnixWaker).
        #
        # See: twisted.scripts._twistd_unix.daemonize()
        self._kq.close()
        self._kq = None

    def afterDaemonize(self):
        """
        Implement L{IReactorDaemonize.afterDaemonize}.
        """
        # Twisted-internal method called during daemonization. This is called right
        # after daemonization and recreates the kqueue() and any readers/writers
        # that were added before. Note that you MUST NOT call any reactor methods
        # in between beforeDaemonize() and afterDaemonize()!
        self._kq = self._impl.kqueue()
        for fd in self._reads:
            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
        for fd in self._writes:
            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)

    def addReader(self, reader):
        """
        Implement L{IReactorFDSet.addReader}.

        @param reader: The selectable to register for read readiness; its
            C{fileno()} is used as the kqueue identifier.
        """
        fd = reader.fileno()
        if fd not in self._reads:
            try:
                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
            except OSError:
                # Registration failures are deliberately ignored here.
                pass
            finally:
                # Track the reader whether or not the registration succeeded.
                self._selectables[fd] = reader
                self._reads.add(fd)

    def addWriter(self, writer):
        """
        Implement L{IReactorFDSet.addWriter}.

        @param writer: The selectable to register for write readiness; its
            C{fileno()} is used as the kqueue identifier.
        """
        fd = writer.fileno()
        if fd not in self._writes:
            try:
                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
            except OSError:
                # Registration failures are deliberately ignored here.
                pass
            finally:
                # Track the writer whether or not the registration succeeded.
                self._selectables[fd] = writer
                self._writes.add(fd)

    def removeReader(self, reader):
        """
        Implement L{IReactorFDSet.removeReader}.

        @param reader: The selectable to stop monitoring for read readiness.
        """
        wasLost = False
        try:
            fd = reader.fileno()
        except BaseException:
            fd = -1
        if fd == -1:
            # The selectable no longer reports a usable descriptor: look up
            # the descriptor it was registered under by identity instead.
            for fd, fdes in self._selectables.items():
                if reader is fdes:
                    wasLost = True
                    break
            else:
                # Not tracked by this reactor at all; nothing to remove.
                return
        if fd in self._reads:
            self._reads.remove(fd)
            if fd not in self._writes:
                # Neither direction is monitored any more; forget the fd.
                del self._selectables[fd]
            if not wasLost:
                # Only deregister from the kqueue when the descriptor came
                # from fileno(); a lost descriptor is skipped.
                try:
                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
                except OSError:
                    pass

    def removeWriter(self, writer):
        """
        Implement L{IReactorFDSet.removeWriter}.

        @param writer: The selectable to stop monitoring for write readiness.
        """
        wasLost = False
        try:
            fd = writer.fileno()
        except BaseException:
            fd = -1
        if fd == -1:
            # The selectable no longer reports a usable descriptor: look up
            # the descriptor it was registered under by identity instead.
            for fd, fdes in self._selectables.items():
                if writer is fdes:
                    wasLost = True
                    break
            else:
                # Not tracked by this reactor at all; nothing to remove.
                return
        if fd in self._writes:
            self._writes.remove(fd)
            if fd not in self._reads:
                # Neither direction is monitored any more; forget the fd.
                del self._selectables[fd]
            if not wasLost:
                # Only deregister from the kqueue when the descriptor came
                # from fileno(); a lost descriptor is skipped.
                try:
                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
                except OSError:
                    pass

    def removeAll(self):
        """
        Implement L{IReactorFDSet.removeAll}.

        @return: Whatever C{_removeAll} returns for the currently registered
            readers and writers.
        """
        return self._removeAll(
            [self._selectables[fd] for fd in self._reads],
            [self._selectables[fd] for fd in self._writes],
        )

    def getReaders(self):
        """
        Implement L{IReactorFDSet.getReaders}.

        @return: A list of the selectables registered for read readiness.
        """
        return [self._selectables[fd] for fd in self._reads]

    def getWriters(self):
        """
        Implement L{IReactorFDSet.getWriters}.

        @return: A list of the selectables registered for write readiness.
        """
        return [self._selectables[fd] for fd in self._writes]

    def doKEvent(self, timeout):
        """
        Poll the kqueue for new events.

        @param timeout: The maximum time to wait for events, passed to
            C{kqueue.control}. C{None} is replaced by C{1}.
        """
        if timeout is None:
            timeout = 1

        # kevent(2) returns immediately, ignoring the timeout, when asked for
        # zero events.  Always ask for at least one so that an iteration with
        # no registered selectables still waits instead of spinning.
        maxEvents = max(len(self._selectables), 1)

        try:
            events = self._kq.control([], maxEvents, timeout)
        except OSError as e:
            # Since this command blocks for potentially a while, it's possible
            # EINTR can be raised for various reasons (for example, if the user
            # hits ^C).
            if e.errno == errno.EINTR:
                return
            else:
                raise

        _drdw = self._doWriteOrRead
        for event in events:
            fd = event.ident
            try:
                selectable = self._selectables[fd]
            except KeyError:
                # Handles the infrequent case where one selectable's
                # handler disconnects another.
                continue
            else:
                log.callWithLogger(selectable, _drdw, selectable, fd, event)

    def _doWriteOrRead(self, selectable, fd, event):
        """
        Private method called when a FD is ready for reading, writing or was
        lost. Do the work and raise errors where necessary.

        @param selectable: The selectable registered for C{fd}.
        @param fd: The integer file descriptor the event was reported for.
        @param event: The kevent describing what happened; its C{filter},
            C{flags}, C{data} and C{fflags} attributes are consulted.
        """
        why = None
        inRead = False
        (filter, flags, data, fflags) = (
            event.filter,
            event.flags,
            event.data,
            event.fflags,
        )

        if flags & KQ_EV_EOF and data and fflags:
            # EOF reported together with non-zero data and fflags is treated
            # as a lost connection without calling into the selectable.
            why = main.CONNECTION_LOST
        else:
            try:
                if selectable.fileno() == -1:
                    why = posixbase._NO_FILEDESC
                elif filter == KQ_FILTER_READ:
                    inRead = True
                    why = selectable.doRead()
                elif filter == KQ_FILTER_WRITE:
                    why = selectable.doWrite()
            except BaseException:
                # Any exception from application code gets logged and will
                # cause us to disconnect the selectable.
                why = failure.Failure()
                log.err(
                    why,
                    "An exception was raised from application code"
                    " while processing a reactor selectable",
                )

        if why:
            self._disconnectSelectable(selectable, why, inRead)

    doIteration = doKEvent


def install():
    """
    Install the kqueue() reactor.
    """
    p = KQueueReactor()
    from twisted.internet.main import installReactor

    installReactor(p)


__all__ = ["KQueueReactor", "install"]