1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5An epoll() based implementation of the twisted main loop. 6 7To install the event loop (and you should do this before any connections, 8listeners or connectors are added):: 9 10 from twisted.internet import epollreactor 11 epollreactor.install() 12""" 13 14from __future__ import division, absolute_import 15 16from select import epoll, EPOLLHUP, EPOLLERR, EPOLLIN, EPOLLOUT 17import errno 18 19from zope.interface import implementer 20 21from twisted.internet.interfaces import IReactorFDSet 22 23from twisted.python import log 24from twisted.internet import posixbase 25 26 27 28@implementer(IReactorFDSet) 29class _ContinuousPolling(posixbase._PollLikeMixin, 30 posixbase._DisconnectSelectableMixin): 31 """ 32 Schedule reads and writes based on the passage of time, rather than 33 notification. 34 35 This is useful for supporting polling filesystem files, which C{epoll(7)} 36 does not support. 37 38 The implementation uses L{posixbase._PollLikeMixin}, which is a bit hacky, 39 but re-implementing and testing the relevant code yet again is 40 unappealing. 41 42 @ivar _reactor: The L{EPollReactor} that is using this instance. 43 44 @ivar _loop: A C{LoopingCall} that drives the polling, or C{None}. 45 46 @ivar _readers: A C{set} of C{FileDescriptor} objects that should be read 47 from. 48 49 @ivar _writers: A C{set} of C{FileDescriptor} objects that should be 50 written to. 51 """ 52 53 # Attributes for _PollLikeMixin 54 _POLL_DISCONNECTED = 1 55 _POLL_IN = 2 56 _POLL_OUT = 4 57 58 59 def __init__(self, reactor): 60 self._reactor = reactor 61 self._loop = None 62 self._readers = set() 63 self._writers = set() 64 65 66 def _checkLoop(self): 67 """ 68 Start or stop a C{LoopingCall} based on whether there are readers and 69 writers. 70 """ 71 if self._readers or self._writers: 72 if self._loop is None: 73 from twisted.internet.task import LoopingCall, _EPSILON 74 self._loop = LoopingCall(self.iterate) 75 self._loop.clock = self._reactor 76 # LoopingCall seems unhappy with timeout of 0, so use very 77 # small number: 78 self._loop.start(_EPSILON, now=False) 79 elif self._loop: 80 self._loop.stop() 81 self._loop = None 82 83 84 def iterate(self): 85 """ 86 Call C{doRead} and C{doWrite} on all readers and writers respectively. 87 """ 88 for reader in list(self._readers): 89 self._doReadOrWrite(reader, reader, self._POLL_IN) 90 for reader in list(self._writers): 91 self._doReadOrWrite(reader, reader, self._POLL_OUT) 92 93 94 def addReader(self, reader): 95 """ 96 Add a C{FileDescriptor} for notification of data available to read. 97 """ 98 self._readers.add(reader) 99 self._checkLoop() 100 101 102 def addWriter(self, writer): 103 """ 104 Add a C{FileDescriptor} for notification of data available to write. 105 """ 106 self._writers.add(writer) 107 self._checkLoop() 108 109 110 def removeReader(self, reader): 111 """ 112 Remove a C{FileDescriptor} from notification of data available to read. 113 """ 114 try: 115 self._readers.remove(reader) 116 except KeyError: 117 return 118 self._checkLoop() 119 120 121 def removeWriter(self, writer): 122 """ 123 Remove a C{FileDescriptor} from notification of data available to 124 write. 125 """ 126 try: 127 self._writers.remove(writer) 128 except KeyError: 129 return 130 self._checkLoop() 131 132 133 def removeAll(self): 134 """ 135 Remove all readers and writers. 136 """ 137 result = list(self._readers | self._writers) 138 # Don't reset to new value, since self.isWriting and .isReading refer 139 # to the existing instance: 140 self._readers.clear() 141 self._writers.clear() 142 return result 143 144 145 def getReaders(self): 146 """ 147 Return a list of the readers. 148 """ 149 return list(self._readers) 150 151 152 def getWriters(self): 153 """ 154 Return a list of the writers. 155 """ 156 return list(self._writers) 157 158 159 def isReading(self, fd): 160 """ 161 Checks if the file descriptor is currently being observed for read 162 readiness. 163 164 @param fd: The file descriptor being checked. 165 @type fd: L{twisted.internet.abstract.FileDescriptor} 166 @return: C{True} if the file descriptor is being observed for read 167 readiness, C{False} otherwise. 168 @rtype: C{bool} 169 """ 170 return fd in self._readers 171 172 173 def isWriting(self, fd): 174 """ 175 Checks if the file descriptor is currently being observed for write 176 readiness. 177 178 @param fd: The file descriptor being checked. 179 @type fd: L{twisted.internet.abstract.FileDescriptor} 180 @return: C{True} if the file descriptor is being observed for write 181 readiness, C{False} otherwise. 182 @rtype: C{bool} 183 """ 184 return fd in self._writers 185 186 187 188@implementer(IReactorFDSet) 189class EPollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin): 190 """ 191 A reactor that uses epoll(7). 192 193 @ivar _poller: A C{epoll} which will be used to check for I/O 194 readiness. 195 196 @ivar _selectables: A dictionary mapping integer file descriptors to 197 instances of C{FileDescriptor} which have been registered with the 198 reactor. All C{FileDescriptors} which are currently receiving read or 199 write readiness notifications will be present as values in this 200 dictionary. 201 202 @ivar _reads: A set containing integer file descriptors. Values in this 203 set will be registered with C{_poller} for read readiness notifications 204 which will be dispatched to the corresponding C{FileDescriptor} 205 instances in C{_selectables}. 206 207 @ivar _writes: A set containing integer file descriptors. Values in this 208 set will be registered with C{_poller} for write readiness 209 notifications which will be dispatched to the corresponding 210 C{FileDescriptor} instances in C{_selectables}. 211 212 @ivar _continuousPolling: A L{_ContinuousPolling} instance, used to handle 213 file descriptors (e.g. filesytem files) that are not supported by 214 C{epoll(7)}. 215 """ 216 217 # Attributes for _PollLikeMixin 218 _POLL_DISCONNECTED = (EPOLLHUP | EPOLLERR) 219 _POLL_IN = EPOLLIN 220 _POLL_OUT = EPOLLOUT 221 222 def __init__(self): 223 """ 224 Initialize epoll object, file descriptor tracking dictionaries, and the 225 base class. 226 """ 227 # Create the poller we're going to use. The 1024 here is just a hint 228 # to the kernel, it is not a hard maximum. After Linux 2.6.8, the size 229 # argument is completely ignored. 230 self._poller = epoll(1024) 231 self._reads = set() 232 self._writes = set() 233 self._selectables = {} 234 self._continuousPolling = _ContinuousPolling(self) 235 posixbase.PosixReactorBase.__init__(self) 236 237 238 def _add(self, xer, primary, other, selectables, event, antievent): 239 """ 240 Private method for adding a descriptor from the event loop. 241 242 It takes care of adding it if new or modifying it if already added 243 for another state (read -> read/write for example). 244 """ 245 fd = xer.fileno() 246 if fd not in primary: 247 flags = event 248 # epoll_ctl can raise all kinds of IOErrors, and every one 249 # indicates a bug either in the reactor or application-code. 250 # Let them all through so someone sees a traceback and fixes 251 # something. We'll do the same thing for every other call to 252 # this method in this file. 253 if fd in other: 254 flags |= antievent 255 self._poller.modify(fd, flags) 256 else: 257 self._poller.register(fd, flags) 258 259 # Update our own tracking state *only* after the epoll call has 260 # succeeded. Otherwise we may get out of sync. 261 primary.add(fd) 262 selectables[fd] = xer 263 264 265 def addReader(self, reader): 266 """ 267 Add a FileDescriptor for notification of data available to read. 268 """ 269 try: 270 self._add(reader, self._reads, self._writes, self._selectables, 271 EPOLLIN, EPOLLOUT) 272 except IOError as e: 273 if e.errno == errno.EPERM: 274 # epoll(7) doesn't support certain file descriptors, 275 # e.g. filesystem files, so for those we just poll 276 # continuously: 277 self._continuousPolling.addReader(reader) 278 else: 279 raise 280 281 282 def addWriter(self, writer): 283 """ 284 Add a FileDescriptor for notification of data available to write. 285 """ 286 try: 287 self._add(writer, self._writes, self._reads, self._selectables, 288 EPOLLOUT, EPOLLIN) 289 except IOError as e: 290 if e.errno == errno.EPERM: 291 # epoll(7) doesn't support certain file descriptors, 292 # e.g. filesystem files, so for those we just poll 293 # continuously: 294 self._continuousPolling.addWriter(writer) 295 else: 296 raise 297 298 299 def _remove(self, xer, primary, other, selectables, event, antievent): 300 """ 301 Private method for removing a descriptor from the event loop. 302 303 It does the inverse job of _add, and also add a check in case of the fd 304 has gone away. 305 """ 306 fd = xer.fileno() 307 if fd == -1: 308 for fd, fdes in selectables.items(): 309 if xer is fdes: 310 break 311 else: 312 return 313 if fd in primary: 314 if fd in other: 315 flags = antievent 316 # See comment above modify call in _add. 317 self._poller.modify(fd, flags) 318 else: 319 del selectables[fd] 320 # See comment above _control call in _add. 321 self._poller.unregister(fd) 322 primary.remove(fd) 323 324 325 def removeReader(self, reader): 326 """ 327 Remove a Selectable for notification of data available to read. 328 """ 329 if self._continuousPolling.isReading(reader): 330 self._continuousPolling.removeReader(reader) 331 return 332 self._remove(reader, self._reads, self._writes, self._selectables, 333 EPOLLIN, EPOLLOUT) 334 335 336 def removeWriter(self, writer): 337 """ 338 Remove a Selectable for notification of data available to write. 339 """ 340 if self._continuousPolling.isWriting(writer): 341 self._continuousPolling.removeWriter(writer) 342 return 343 self._remove(writer, self._writes, self._reads, self._selectables, 344 EPOLLOUT, EPOLLIN) 345 346 347 def removeAll(self): 348 """ 349 Remove all selectables, and return a list of them. 350 """ 351 return (self._removeAll( 352 [self._selectables[fd] for fd in self._reads], 353 [self._selectables[fd] for fd in self._writes]) + 354 self._continuousPolling.removeAll()) 355 356 357 def getReaders(self): 358 return ([self._selectables[fd] for fd in self._reads] + 359 self._continuousPolling.getReaders()) 360 361 362 def getWriters(self): 363 return ([self._selectables[fd] for fd in self._writes] + 364 self._continuousPolling.getWriters()) 365 366 367 def doPoll(self, timeout): 368 """ 369 Poll the poller for new events. 370 """ 371 if timeout is None: 372 timeout = -1 # Wait indefinitely. 373 374 try: 375 # Limit the number of events to the number of io objects we're 376 # currently tracking (because that's maybe a good heuristic) and 377 # the amount of time we block to the value specified by our 378 # caller. 379 l = self._poller.poll(timeout, len(self._selectables)) 380 except IOError as err: 381 if err.errno == errno.EINTR: 382 return 383 # See epoll_wait(2) for documentation on the other conditions 384 # under which this can fail. They can only be due to a serious 385 # programming error on our part, so let's just announce them 386 # loudly. 387 raise 388 389 _drdw = self._doReadOrWrite 390 for fd, event in l: 391 try: 392 selectable = self._selectables[fd] 393 except KeyError: 394 pass 395 else: 396 log.callWithLogger(selectable, _drdw, selectable, fd, event) 397 398 doIteration = doPoll 399 400 401def install(): 402 """ 403 Install the epoll() reactor. 404 """ 405 p = EPollReactor() 406 from twisted.internet.main import installReactor 407 installReactor(p) 408 409 410__all__ = ["EPollReactor", "install"] 411