1# -*- test-case-name: twisted.test.test_internet -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Select reactor
7"""
8
9
10import select
11import sys
12from errno import EBADF, EINTR
13from time import sleep
14from typing import Type
15
16from zope.interface import implementer
17
18from twisted.internet import posixbase
19from twisted.internet.interfaces import IReactorFDSet
20from twisted.python import log
21from twisted.python.runtime import platformType
22
23
def win32select(r, w, e, timeout=None):
    """
    Win32 select wrapper.

    @param r: Objects to check for readability.
    @param w: Objects to check for writability.
    @param e: Ignored.  C{w} is handed to select() as the exceptional set
        instead (presumably because Windows reports failed connection
        attempts there -- confirm against the Winsock documentation).
    @param timeout: Maximum time to wait, or L{None}.  Whatever is given,
        the actual wait never exceeds 0.5 (or 0.001 when there is nothing
        to watch and a timeout was supplied).

    @return: A 3-tuple C{(readable, writable, [])} where C{writable} is the
        writable list followed by whatever select() reported in the
        exceptional set.
    """
    if r or w:
        # Windows does not process 'signals' while inside select(), so the
        # wait is capped; otherwise ctrl-c would never be noticed.
        if timeout is None or timeout > 0.5:
            timeout = 0.5
        readable, writable, exceptional = select.select(r, w, w, timeout)
        return readable, writable + exceptional, []

    # Windows select() returns immediately when given no sockets, so sleep
    # briefly here instead of calling it.
    sleep(0.01 if timeout is None else min(timeout, 0.001))
    return [], [], []
40
41
# Pick the select() implementation once at import time: the win32select
# wrapper on Windows, the plain select.select everywhere else.
_select = win32select if platformType == "win32" else select.select
46
47
# _extraBase is the second base class of SelectReactor below.  It is
# _ThreadedWin32EventsMixin when win32eventreactor can be imported and plain
# ``object`` otherwise, so the class statement works in either case.
try:
    from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
except ImportError:
    # presumably the import only succeeds on Windows -- TODO confirm
    _extraBase: Type[object] = object
else:
    _extraBase = _ThreadedWin32EventsMixin
54
55
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):  # type: ignore[misc,valid-type]
    """
    A reactor built on select(); it runs on all POSIX platforms and on Win32.

    @ivar _reads: The set of L{FileDescriptor} instances currently watched
        for read events.

    @ivar _writes: The set of L{FileDescriptor} instances currently watched
        for writability.
    """

    def __init__(self):
        """
        Create the (initially empty) reader and writer sets, then initialize
        the base class.
        """
        # The sets are created before the base initializer runs, in the same
        # order as they are declared on the class docstring.
        self._reads = set()
        self._writes = set()
        posixbase.PosixReactorBase.__init__(self)

    def _preenDescriptors(self):
        """
        Re-validate every tracked selectable one at a time.

        Both sets are emptied; each former member is then probed with a
        zero-timeout select().  Members that pass are put back into the set
        they came from, members that raise are logged and handed to
        C{_disconnectSelectable}.
        """
        log.msg("Malformed file descriptor found.  Preening lists.")
        # Snapshot the members before clearing so each can be re-checked.
        formerReaders = list(self._reads)
        formerWriters = list(self._writes)
        self._reads.clear()
        self._writes.clear()
        for survivors, candidates in (
            (self._reads, formerReaders),
            (self._writes, formerWriters),
        ):
            for candidate in candidates:
                try:
                    select.select([candidate], [candidate], [candidate], 0)
                except Exception as reason:
                    log.msg("bad descriptor %s" % candidate)
                    self._disconnectSelectable(candidate, reason, False)
                    continue
                survivors.add(candidate)

    def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        Every selectable reported ready for input gets its C{doRead} called
        and every selectable reported ready for output gets its C{doWrite}
        called, each under its own logging context.

        @param timeout: The timeout value passed straight to the underlying
            select call.
        """
        try:
            readable, writable, _unused = _select(
                self._reads, self._writes, [], timeout
            )
        except ValueError:
            # Possibly a file descriptor has gone negative?
            self._preenDescriptors()
            return
        except TypeError:
            # Something *totally* invalid (object w/o fileno, non-integral
            # result) was passed.
            log.err()
            self._preenDescriptors()
            return
        except OSError as se:
            # select(2) encountered an error, perhaps while calling the
            # fileno() method of a socket.
            code = se.args[0]
            if code in (0, 2):
                # Windows does this if it got an empty list; anything else
                # with these codes is unexpected and propagates.
                if self._reads or self._writes:
                    raise
                return
            if code == EINTR:
                return
            if code == EBADF:
                self._preenDescriptors()
                return
            # OK, I really don't know what's going on.  Blow up.
            raise

        dispatch = self._doReadOrWrite
        runWithLogger = log.callWithLogger
        for ready, methodName, tracked in (
            (readable, "doRead", self._reads),
            (writable, "doWrite", self._writes),
        ):
            for selectable in ready:
                # Only dispatch to selectables that are still registered in
                # the corresponding set at this point.
                if selectable in tracked:
                    runWithLogger(selectable, dispatch, selectable, methodName)

    doIteration = doSelect

    def _doReadOrWrite(self, selectable, method):
        """
        Invoke the named event method on C{selectable} and disconnect it if
        the call fails or returns a true value.

        @param selectable: The object whose event method is called.
        @param method: The name of the method to call; C{"doRead"} marks the
            event as a read when disconnecting.
        """
        try:
            handler = getattr(selectable, method)
            why = handler()
        except BaseException as failure:
            # The exception itself becomes the disconnect reason; it is also
            # logged.
            why = failure
            log.err()
        if not why:
            return
        self._disconnectSelectable(selectable, why, method == "doRead")

    def addReader(self, reader):
        """
        Start watching C{reader} for data available to read.
        """
        self._reads.add(reader)

    def addWriter(self, writer):
        """
        Start watching C{writer} for the ability to write data.
        """
        self._writes.add(writer)

    def removeReader(self, reader):
        """
        Stop watching C{reader} for data available to read.  Nothing happens
        if it was not being watched.
        """
        self._reads.discard(reader)

    def removeWriter(self, writer):
        """
        Stop watching C{writer} for the ability to write data.  Nothing
        happens if it was not being watched.
        """
        self._writes.discard(writer)

    def removeAll(self):
        """
        Hand the reader and writer sets to C{_removeAll} and return whatever
        it returns.
        """
        return self._removeAll(self._reads, self._writes)

    def getReaders(self):
        """
        Return a new list of everything currently watched for reading.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Return a new list of everything currently watched for writing.
        """
        return list(self._writes)
187
188
def install():
    """
    Configure the twisted mainloop to be run using the select() reactor.

    A new L{SelectReactor} is created and passed to C{installReactor}.
    """
    selectReactor = SelectReactor()
    # Imported here, after the reactor exists, rather than at module level.
    from twisted.internet.main import installReactor

    installReactor(selectReactor)
195
196
# Star-imports of this module expose only install().
__all__ = ["install"]
198