# -*- test-case-name: twisted.test.test_internet -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Select reactor
"""


import select
import sys
from errno import EBADF, EINTR
from time import sleep
from typing import Type

from zope.interface import implementer

from twisted.internet import posixbase
from twisted.internet.interfaces import IReactorFDSet
from twisted.python import log
from twisted.python.runtime import platformType


def win32select(r, w, e, timeout=None):
    """
    Win32 select wrapper.

    @param r: The selectables to check for readability.

    @param w: The selectables to check for writability.

    @param e: Accepted for signature compatibility with C{select.select};
        this wrapper never reads it.

    @param timeout: The maximum number of seconds to wait, or L{None}.
        When there is something to select on, the wait is capped at 0.5
        seconds.

    @return: A 3-tuple C{(readable, writable, [])}.  Selectables reported
        in the exceptional set are appended to the writable list, and the
        third element is always an empty list.
    """
    if not (r or w):
        # windows select() exits immediately when no sockets
        if timeout is None:
            timeout = 0.01
        else:
            timeout = min(timeout, 0.001)
        sleep(timeout)
        return [], [], []
    # windows doesn't process 'signals' inside select(), so we set a max
    # time or ctrl-c will never be recognized
    if timeout is None or timeout > 0.5:
        timeout = 0.5
    # The writers are deliberately passed as the exceptional set as well, and
    # whatever comes back in that set is reported as writable.
    # NOTE(review): presumably because Windows signals a failed connection
    # attempt through the exceptional set -- confirm.
    r, w, e = select.select(r, w, w, timeout)
    return r, w + e, []


if platformType == "win32":
    _select = win32select
else:
    _select = select.select


try:
    from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
except ImportError:
    _extraBase: Type[object] = object
else:
    _extraBase = _ThreadedWin32EventsMixin


@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):  # type: ignore[misc,valid-type]
    """
    A select() based reactor - runs on all POSIX platforms and on Win32.

    @ivar _reads: A set containing L{FileDescriptor} instances which will be
        checked for read events.

    @ivar _writes: A set containing L{FileDescriptor} instances which will be
        checked for writability.
    """

    def __init__(self):
        """
        Initialize file descriptor tracking sets and the base class.
        """
        self._reads = set()
        self._writes = set()
        posixbase.PosixReactorBase.__init__(self)

    def _preenDescriptors(self):
        """
        Probe every tracked selectable and drop the ones select() rejects.

        Both tracking sets are emptied, then each selectable is passed on its
        own to C{select.select} with a zero timeout.  A selectable for which
        that call raises is logged and disconnected; every other selectable
        is put back into the set it came from.
        """
        log.msg("Malformed file descriptor found. Preening lists.")
        readers = list(self._reads)
        writers = list(self._writes)
        self._reads.clear()
        self._writes.clear()
        for selSet, selList in ((self._reads, readers), (self._writes, writers)):
            for selectable in selList:
                try:
                    select.select([selectable], [selectable], [selectable], 0)
                except Exception as e:
                    # Wrap in a 1-tuple so a selectable that is itself a
                    # tuple is formatted as a single value rather than being
                    # unpacked as the format arguments.
                    log.msg("bad descriptor %s" % (selectable,))
                    self._disconnectSelectable(selectable, e, False)
                else:
                    selSet.add(selectable)

    def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.

        @param timeout: The maximum number of seconds to wait for readiness,
            passed through to the platform's select implementation.

        @raise OSError: If select fails with an error code that is not
            handled here.
        """
        try:
            r, w, ignored = _select(self._reads, self._writes, [], timeout)
        except ValueError:
            # Possibly a file descriptor has gone negative?
            self._preenDescriptors()
            return
        except TypeError:
            # Something *totally* invalid (object w/o fileno, non-integral
            # result) was passed
            log.err()
            self._preenDescriptors()
            return
        except OSError as se:
            # select(2) encountered an error, perhaps while calling the
            # fileno() method of a socket.  (On Python 3, socket.error and
            # select.error are aliases of OSError.)
            #
            # An OSError raised without arguments has an empty args tuple;
            # treat that as an unknown code so the original error is
            # re-raised below instead of being masked by an IndexError.
            code = se.args[0] if se.args else None
            if code in (0, 2):
                # windows does this if it got an empty list
                if (not self._reads) and (not self._writes):
                    return
                else:
                    raise
            elif code == EINTR:
                return
            elif code == EBADF:
                self._preenDescriptors()
                return
            else:
                # OK, I really don't know what's going on.  Blow up.
                raise

        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, fdset in (
            (r, "doRead", self._reads),
            (w, "doWrite", self._writes),
        ):
            for selectable in selectables:
                # if this was disconnected in another thread, kill it.
                # ^^^^ --- what the !@#*?  serious!  -exarkun
                if selectable not in fdset:
                    continue
                # This for pausing input when we're not ready for more.
                _logrun(selectable, _drdw, selectable, method)

    doIteration = doSelect

    def _doReadOrWrite(self, selectable, method):
        """
        Call the named I/O method on a selectable and disconnect it on failure.

        @param selectable: The object whose method is invoked.

        @param method: The name of the method to call; C{doSelect} passes
            C{"doRead"} or C{"doWrite"}.

        If the call raises, the exception is logged and used as the reason.
        If the call returns a true value, that value is the reason.  Either
        way a true reason causes the selectable to be disconnected.
        """
        try:
            why = getattr(selectable, method)()
        except BaseException:
            why = sys.exc_info()[1]
            log.err()
        if why:
            # The final argument tells _disconnectSelectable whether the
            # failure happened on the read side.
            self._disconnectSelectable(selectable, why, method == "doRead")

    def addReader(self, reader):
        """
        Add a FileDescriptor for notification of data available to read.
        """
        self._reads.add(reader)

    def addWriter(self, writer):
        """
        Add a FileDescriptor for notification of data available to write.
        """
        self._writes.add(writer)

    def removeReader(self, reader):
        """
        Remove a Selectable for notification of data available to read.
        """
        self._reads.discard(reader)

    def removeWriter(self, writer):
        """
        Remove a Selectable for notification of data available to write.
        """
        self._writes.discard(writer)

    def removeAll(self):
        """
        Remove all tracked readers and writers.

        @return: Whatever C{self._removeAll} returns when given the read and
            write sets.
        """
        return self._removeAll(self._reads, self._writes)

    def getReaders(self):
        """
        Return a new list of the selectables currently watched for reading.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Return a new list of the selectables currently watched for writing.
        """
        return list(self._writes)


def install():
    """Configure the twisted mainloop to be run using the select() reactor."""
    reactor = SelectReactor()
    from twisted.internet.main import installReactor

    installReactor(reactor)


__all__ = ["install"]