1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4
5"""
6A win32event based implementation of the Twisted main loop.
7
8This requires pywin32 (formerly win32all) or ActivePython to be installed.
9
10To install the event loop (and you should do this before any connections,
11listeners or connectors are added)::
12
13    from twisted.internet import win32eventreactor
14    win32eventreactor.install()
15
16LIMITATIONS:
17 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
18 2. Process running has some problems (see L{twisted.internet.process} docstring).
19
20
21TODO:
22 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
23    Switch to doing it the correct way, whatever that means (see below).
24 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
25 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
26
27
28ALTERNATIVE SOLUTIONS:
29 - IIRC, sockets can only be registered once. So we switch to a structure
30   like the poll() reactor, thus allowing us to deal with write events in
31   a decent fashion. This should allow us to pass tests, but we're still
32   limited to 64 events.
33
34Or:
35
36 - Instead of doing a reactor, we make this an addon to the select reactor.
37   The WFMO event loop runs in a separate thread. This means no need to maintain
38   separate code for networking, 64 event limit doesn't apply to sockets,
39   we can run processes and other win32 stuff in default event loop. The
40   only problem is that we're stuck with the icky socket based waker.
41   Another benefit is that this could be extended to support >64 events
42   in a simpler manner than the previous solution.
43
44The 2nd solution is probably what will get implemented.
45"""
46
47import sys
48
49# System imports
50import time
51from threading import Thread
52from weakref import WeakKeyDictionary
53
54from zope.interface import implementer
55
56# Win32 imports
57from win32file import (  # type: ignore[import]
58    FD_ACCEPT,
59    FD_CLOSE,
60    FD_CONNECT,
61    FD_READ,
62    WSAEventSelect,
63)
64
65try:
66    # WSAEnumNetworkEvents was added in pywin32 215
67    from win32file import WSAEnumNetworkEvents
68except ImportError:
69    import warnings
70
71    warnings.warn(
72        "Reliable disconnection notification requires pywin32 215 or later",
73        category=UserWarning,
74    )
75
76    def WSAEnumNetworkEvents(fd, event):
77        return {FD_READ}
78
79
80import win32gui  # type: ignore[import]
81from win32event import (  # type: ignore[import]
82    QS_ALLINPUT,
83    WAIT_OBJECT_0,
84    WAIT_TIMEOUT,
85    CreateEvent,
86    MsgWaitForMultipleObjects,
87)
88
89# Twisted imports
90from twisted.internet import posixbase
91from twisted.internet.interfaces import IReactorFDSet, IReactorWin32Events
92from twisted.internet.threads import blockingCallFromThread
93from twisted.python import failure, log, threadable
94
95
96@implementer(IReactorFDSet, IReactorWin32Events)
97class Win32Reactor(posixbase.PosixReactorBase):
98    """
99    Reactor that uses Win32 event APIs.
100
101    @ivar _reads: A dictionary mapping L{FileDescriptor} instances to a
102        win32 event object used to check for read events for that descriptor.
103
104    @ivar _writes: A dictionary mapping L{FileDescriptor} instances to a
105        arbitrary value.  Keys in this dictionary will be given a chance to
106        write out their data.
107
108    @ivar _events: A dictionary mapping win32 event object to tuples of
109        L{FileDescriptor} instances and event masks.
110
111    @ivar _closedAndReading: Along with C{_closedAndNotReading}, keeps track of
112        descriptors which have had close notification delivered from the OS but
113        which we have not finished reading data from.  MsgWaitForMultipleObjects
114        will only deliver close notification to us once, so we remember it in
115        these two dictionaries until we're ready to act on it.  The OS has
116        delivered close notification for each descriptor in this dictionary, and
117        the descriptors are marked as allowed to handle read events in the
118        reactor, so they can be processed.  When a descriptor is marked as not
119        allowed to handle read events in the reactor (ie, it is passed to
120        L{IReactorFDSet.removeReader}), it is moved out of this dictionary and
121        into C{_closedAndNotReading}.  The descriptors are keys in this
122        dictionary.  The values are arbitrary.
123    @type _closedAndReading: C{dict}
124
125    @ivar _closedAndNotReading: These descriptors have had close notification
126        delivered from the OS, but are not marked as allowed to handle read
127        events in the reactor.  They are saved here to record their closed
128        state, but not processed at all.  When one of these descriptors is
129        passed to L{IReactorFDSet.addReader}, it is moved out of this dictionary
130        and into C{_closedAndReading}.  The descriptors are keys in this
131        dictionary.  The values are arbitrary.  This is a weak key dictionary so
132        that if an application tells the reactor to stop reading from a
133        descriptor and then forgets about that descriptor itself, the reactor
134        will also forget about it.
135    @type _closedAndNotReading: C{WeakKeyDictionary}
136    """
137
138    dummyEvent = CreateEvent(None, 0, 0, None)
139
140    def __init__(self):
141        self._reads = {}
142        self._writes = {}
143        self._events = {}
144        self._closedAndReading = {}
145        self._closedAndNotReading = WeakKeyDictionary()
146        posixbase.PosixReactorBase.__init__(self)
147
148    def _makeSocketEvent(self, fd, action, why):
149        """
150        Make a win32 event object for a socket.
151        """
152        event = CreateEvent(None, 0, 0, None)
153        WSAEventSelect(fd, event, why)
154        self._events[event] = (fd, action)
155        return event
156
157    def addEvent(self, event, fd, action):
158        """
159        Add a new win32 event to the event loop.
160        """
161        self._events[event] = (fd, action)
162
163    def removeEvent(self, event):
164        """
165        Remove an event.
166        """
167        del self._events[event]
168
169    def addReader(self, reader):
170        """
171        Add a socket FileDescriptor for notification of data available to read.
172        """
173        if reader not in self._reads:
174            self._reads[reader] = self._makeSocketEvent(
175                reader, "doRead", FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE
176            )
177            # If the reader is closed, move it over to the dictionary of reading
178            # descriptors.
179            if reader in self._closedAndNotReading:
180                self._closedAndReading[reader] = True
181                del self._closedAndNotReading[reader]
182
183    def addWriter(self, writer):
184        """
185        Add a socket FileDescriptor for notification of data available to write.
186        """
187        if writer not in self._writes:
188            self._writes[writer] = 1
189
190    def removeReader(self, reader):
191        """Remove a Selectable for notification of data available to read."""
192        if reader in self._reads:
193            del self._events[self._reads[reader]]
194            del self._reads[reader]
195
196            # If the descriptor is closed, move it out of the dictionary of
197            # reading descriptors into the dictionary of waiting descriptors.
198            if reader in self._closedAndReading:
199                self._closedAndNotReading[reader] = True
200                del self._closedAndReading[reader]
201
202    def removeWriter(self, writer):
203        """Remove a Selectable for notification of data available to write."""
204        if writer in self._writes:
205            del self._writes[writer]
206
207    def removeAll(self):
208        """
209        Remove all selectables, and return a list of them.
210        """
211        return self._removeAll(self._reads, self._writes)
212
213    def getReaders(self):
214        return list(self._reads.keys())
215
216    def getWriters(self):
217        return list(self._writes.keys())
218
219    def doWaitForMultipleEvents(self, timeout):
220        log.msg(channel="system", event="iteration", reactor=self)
221        if timeout is None:
222            timeout = 100
223
224        # Keep track of whether we run any application code before we get to the
225        # MsgWaitForMultipleObjects.  If so, there's a chance it will schedule a
226        # new timed call or stop the reactor or do something else that means we
227        # shouldn't block in MsgWaitForMultipleObjects for the full timeout.
228        ranUserCode = False
229
230        # If any descriptors are trying to close, try to get them out of the way
231        # first.
232        for reader in list(self._closedAndReading.keys()):
233            ranUserCode = True
234            self._runAction("doRead", reader)
235
236        for fd in list(self._writes.keys()):
237            ranUserCode = True
238            log.callWithLogger(fd, self._runWrite, fd)
239
240        if ranUserCode:
241            # If application code *might* have scheduled an event, assume it
242            # did.  If we're wrong, we'll get back here shortly anyway.  If
243            # we're right, we'll be sure to handle the event (including reactor
244            # shutdown) in a timely manner.
245            timeout = 0
246
247        if not (self._events or self._writes):
248            # sleep so we don't suck up CPU time
249            time.sleep(timeout)
250            return
251
252        handles = list(self._events.keys()) or [self.dummyEvent]
253        timeout = int(timeout * 1000)
254        val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT)
255        if val == WAIT_TIMEOUT:
256            return
257        elif val == WAIT_OBJECT_0 + len(handles):
258            exit = win32gui.PumpWaitingMessages()
259            if exit:
260                self.callLater(0, self.stop)
261                return
262        elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
263            event = handles[val - WAIT_OBJECT_0]
264            fd, action = self._events[event]
265
266            if fd in self._reads:
267                # Before anything, make sure it's still a valid file descriptor.
268                fileno = fd.fileno()
269                if fileno == -1:
270                    self._disconnectSelectable(fd, posixbase._NO_FILEDESC, False)
271                    return
272
273                # Since it's a socket (not another arbitrary event added via
274                # addEvent) and we asked for FD_READ | FD_CLOSE, check to see if
275                # we actually got FD_CLOSE.  This needs a special check because
276                # it only gets delivered once.  If we miss it, it's gone forever
277                # and we'll never know that the connection is closed.
278                events = WSAEnumNetworkEvents(fileno, event)
279                if FD_CLOSE in events:
280                    self._closedAndReading[fd] = True
281            log.callWithLogger(fd, self._runAction, action, fd)
282
283    def _runWrite(self, fd):
284        closed = 0
285        try:
286            closed = fd.doWrite()
287        except BaseException:
288            closed = sys.exc_info()[1]
289            log.deferr()
290
291        if closed:
292            self.removeReader(fd)
293            self.removeWriter(fd)
294            try:
295                fd.connectionLost(failure.Failure(closed))
296            except BaseException:
297                log.deferr()
298        elif closed is None:
299            return 1
300
301    def _runAction(self, action, fd):
302        try:
303            closed = getattr(fd, action)()
304        except BaseException:
305            closed = sys.exc_info()[1]
306            log.deferr()
307        if closed:
308            self._disconnectSelectable(fd, closed, action == "doRead")
309
310    doIteration = doWaitForMultipleEvents
311
312
313class _ThreadFDWrapper:
314    """
315    This wraps an event handler and translates notification in the helper
316    L{Win32Reactor} thread into a notification in the primary reactor thread.
317
318    @ivar _reactor: The primary reactor, the one to which event notification
319        will be sent.
320
321    @ivar _fd: The L{FileDescriptor} to which the event will be dispatched.
322
323    @ivar _action: A C{str} giving the method of C{_fd} which handles the event.
324
325    @ivar _logPrefix: The pre-fetched log prefix string for C{_fd}, so that
326        C{_fd.logPrefix} does not need to be called in a non-main thread.
327    """
328
329    def __init__(self, reactor, fd, action, logPrefix):
330        self._reactor = reactor
331        self._fd = fd
332        self._action = action
333        self._logPrefix = logPrefix
334
335    def logPrefix(self):
336        """
337        Return the original handler's log prefix, as it was given to
338        C{__init__}.
339        """
340        return self._logPrefix
341
342    def _execute(self):
343        """
344        Callback fired when the associated event is set.  Run the C{action}
345        callback on the wrapped descriptor in the main reactor thread and raise
346        or return whatever it raises or returns to cause this event handler to
347        be removed from C{self._reactor} if appropriate.
348        """
349        return blockingCallFromThread(
350            self._reactor, lambda: getattr(self._fd, self._action)()
351        )
352
353    def connectionLost(self, reason):
354        """
355        Pass through to the wrapped descriptor, but in the main reactor thread
356        instead of the helper C{Win32Reactor} thread.
357        """
358        self._reactor.callFromThread(self._fd.connectionLost, reason)
359
360
361@implementer(IReactorWin32Events)
362class _ThreadedWin32EventsMixin:
363    """
364    This mixin implements L{IReactorWin32Events} for another reactor by running
365    a L{Win32Reactor} in a separate thread and dispatching work to it.
366
367    @ivar _reactor: The L{Win32Reactor} running in the other thread.  This is
368        L{None} until it is actually needed.
369
370    @ivar _reactorThread: The L{threading.Thread} which is running the
371        L{Win32Reactor}.  This is L{None} until it is actually needed.
372    """
373
374    _reactor = None
375    _reactorThread = None
376
377    def _unmakeHelperReactor(self):
378        """
379        Stop and discard the reactor started by C{_makeHelperReactor}.
380        """
381        self._reactor.callFromThread(self._reactor.stop)
382        self._reactor = None
383
384    def _makeHelperReactor(self):
385        """
386        Create and (in a new thread) start a L{Win32Reactor} instance to use for
387        the implementation of L{IReactorWin32Events}.
388        """
389        self._reactor = Win32Reactor()
390        # This is a helper reactor, it is not the global reactor and its thread
391        # is not "the" I/O thread.  Prevent it from registering it as such.
392        self._reactor._registerAsIOThread = False
393        self._reactorThread = Thread(target=self._reactor.run, args=(False,))
394        self.addSystemEventTrigger("after", "shutdown", self._unmakeHelperReactor)
395        self._reactorThread.start()
396
397    def addEvent(self, event, fd, action):
398        """
399        @see: L{IReactorWin32Events}
400        """
401        if self._reactor is None:
402            self._makeHelperReactor()
403        self._reactor.callFromThread(
404            self._reactor.addEvent,
405            event,
406            _ThreadFDWrapper(self, fd, action, fd.logPrefix()),
407            "_execute",
408        )
409
410    def removeEvent(self, event):
411        """
412        @see: L{IReactorWin32Events}
413        """
414        self._reactor.callFromThread(self._reactor.removeEvent, event)
415
416
417def install():
418    threadable.init(1)
419    r = Win32Reactor()
420    from . import main
421
422    main.installReactor(r)
423
424
425__all__ = ["Win32Reactor", "install"]
426