# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
A win32event based implementation of the Twisted main loop.

This requires pywin32 (formerly win32all) or ActivePython to be installed.

To install the event loop (and you should do this before any connections,
listeners or connectors are added)::

    from twisted.internet import win32eventreactor
    win32eventreactor.install()

LIMITATIONS:
 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
 2. Process running has some problems (see L{twisted.internet.process}
    docstring).


TODO:
 1. Event loop handling of writes is *very* problematic (this is causing
    failed tests).  Switch to doing it the correct way, whatever that means
    (see below).
 2. Replace icky socket loopback waker with event based waker (use
    dummyEvent object).
 3. Switch everyone to using Free Software so we don't have to deal with
    proprietary APIs.


ALTERNATIVE SOLUTIONS:
 - IIRC, sockets can only be registered once.  So we switch to a structure
   like the poll() reactor, thus allowing us to deal with write events in
   a decent fashion.  This should allow us to pass tests, but we're still
   limited to 64 events.

Or:

 - Instead of doing a reactor, we make this an addon to the select reactor.
   The WFMO event loop runs in a separate thread.  This means no need to
   maintain separate code for networking, 64 event limit doesn't apply to
   sockets, we can run processes and other win32 stuff in default event
   loop.  The only problem is that we're stuck with the icky socket based
   waker.  Another benefit is that this could be extended to support >64
   events in a simpler manner than the previous solution.

The 2nd solution is probably what will get implemented.
"""

import sys

# System imports
import time
from threading import Thread
from weakref import WeakKeyDictionary

from zope.interface import implementer

# Win32 imports
from win32file import (  # type: ignore[import]
    FD_ACCEPT,
    FD_CLOSE,
    FD_CONNECT,
    FD_READ,
    WSAEventSelect,
)

try:
    # WSAEnumNetworkEvents was added in pywin32 215
    from win32file import WSAEnumNetworkEvents
except ImportError:
    import warnings

    warnings.warn(
        "Reliable disconnection notification requires pywin32 215 or later",
        category=UserWarning,
    )

    def WSAEnumNetworkEvents(fd, event):
        """
        Stand-in used when pywin32 does not provide C{WSAEnumNetworkEvents}.

        It ignores both arguments and always reports C{FD_READ} only, so
        C{FD_CLOSE} is never reported through this path.
        """
        return {FD_READ}


import win32gui  # type: ignore[import]
from win32event import (  # type: ignore[import]
    QS_ALLINPUT,
    WAIT_OBJECT_0,
    WAIT_TIMEOUT,
    CreateEvent,
    MsgWaitForMultipleObjects,
)

# Twisted imports
from twisted.internet import posixbase
from twisted.internet.interfaces import IReactorFDSet, IReactorWin32Events
from twisted.internet.threads import blockingCallFromThread
from twisted.python import failure, log, threadable


@implementer(IReactorFDSet, IReactorWin32Events)
class Win32Reactor(posixbase.PosixReactorBase):
    """
    Reactor that uses Win32 event APIs.

    @ivar _reads: A dictionary mapping L{FileDescriptor} instances to the
        win32 event object used to check for read events for that
        descriptor.

    @ivar _writes: A dictionary whose keys are L{FileDescriptor} instances
        and whose values are arbitrary.  Every key is given a chance to
        write out its data on each iteration.

    @ivar _events: A dictionary mapping win32 event objects to
        C{(descriptor, action)} tuples, where C{action} is the name of the
        method to call on the descriptor when the event is signalled.

    @ivar _closedAndReading: Along with C{_closedAndNotReading}, keeps track
        of descriptors which have had close notification delivered from the
        OS but which we have not finished reading data from.
        MsgWaitForMultipleObjects will only deliver close notification to us
        once, so we remember it in these two dictionaries until we're ready
        to act on it.  The OS has delivered close notification for each
        descriptor in this dictionary, and the descriptors are marked as
        allowed to handle read events in the reactor, so they can be
        processed.  When a descriptor is marked as not allowed to handle
        read events in the reactor (ie, it is passed to
        L{IReactorFDSet.removeReader}), it is moved out of this dictionary
        and into C{_closedAndNotReading}.  The descriptors are keys in this
        dictionary.  The values are arbitrary.
    @type _closedAndReading: C{dict}

    @ivar _closedAndNotReading: These descriptors have had close
        notification delivered from the OS, but are not marked as allowed to
        handle read events in the reactor.  They are saved here to record
        their closed state, but not processed at all.  When one of these
        descriptors is passed to L{IReactorFDSet.addReader}, it is moved out
        of this dictionary and into C{_closedAndReading}.  The descriptors
        are keys in this dictionary.  The values are arbitrary.  This is a
        weak key dictionary so that if an application tells the reactor to
        stop reading from a descriptor and then forgets about that
        descriptor itself, the reactor will also forget about it.
    @type _closedAndNotReading: C{WeakKeyDictionary}
    """

    # Placeholder handle waited on when there are writers but no registered
    # events, so that the wait call always receives at least one handle.
    dummyEvent = CreateEvent(None, 0, 0, None)

    def __init__(self):
        # The bookkeeping dictionaries are created before the base class
        # initializer runs.
        self._reads = {}
        self._writes = {}
        self._events = {}
        self._closedAndReading = {}
        self._closedAndNotReading = WeakKeyDictionary()
        posixbase.PosixReactorBase.__init__(self)

    def _makeSocketEvent(self, fd, action, why):
        """
        Create a win32 event object for a socket and register it.

        @param fd: The descriptor to associate with the new event.
        @param action: Name of the method of C{fd} to call when the event is
            signalled.
        @param why: The network event mask passed to C{WSAEventSelect}.

        @return: The newly created event object.
        """
        socketEvent = CreateEvent(None, 0, 0, None)
        WSAEventSelect(fd, socketEvent, why)
        self._events[socketEvent] = (fd, action)
        return socketEvent

    def addEvent(self, event, fd, action):
        """
        Register a win32 event with the event loop.

        @param event: The win32 event object to wait on.
        @param fd: The object that handles the event.
        @param action: Name of the method of C{fd} to call when C{event} is
            signalled.
        """
        self._events[event] = (fd, action)

    def removeEvent(self, event):
        """
        Unregister a win32 event.

        @raise KeyError: If C{event} is not currently registered.
        """
        del self._events[event]

    def addReader(self, reader):
        """
        Start watching a socket FileDescriptor for data available to read.

        Adding a descriptor which is already being watched does nothing.
        """
        if reader in self._reads:
            return

        self._reads[reader] = self._makeSocketEvent(
            reader, "doRead", FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE
        )

        # A descriptor that was already reported closed while it was not
        # reading becomes eligible for processing again.
        if reader in self._closedAndNotReading:
            self._closedAndReading[reader] = True
            del self._closedAndNotReading[reader]

    def addWriter(self, writer):
        """
        Start giving a socket FileDescriptor the chance to write its data.
        """
        self._writes.setdefault(writer, 1)

    def removeReader(self, reader):
        """
        Stop watching a Selectable for data available to read.

        Removing a descriptor which is not being watched does nothing.
        """
        if reader not in self._reads:
            return

        del self._events[self._reads[reader]]
        del self._reads[reader]

        # Remember the close notification for a closed descriptor, but park
        # it where it will not be processed.
        if reader in self._closedAndReading:
            self._closedAndNotReading[reader] = True
            del self._closedAndReading[reader]

    def removeWriter(self, writer):
        """
        Stop giving a Selectable the chance to write its data.
        """
        if writer in self._writes:
            del self._writes[writer]

    def removeAll(self):
        """
        Remove all selectables, and return a list of them.
        """
        return self._removeAll(self._reads, self._writes)

    def getReaders(self):
        """
        Return a new list of the descriptors currently watched for reading.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Return a new list of the descriptors currently registered as writers.
        """
        return list(self._writes)

    def doWaitForMultipleEvents(self, timeout):
        """
        Run one iteration of the event loop.

        Descriptors with a pending close notification are read first, then
        every writer gets a chance to write, and finally the registered
        events are waited on and at most one signalled event is dispatched.

        @param timeout: Maximum time to block, in seconds.  L{None} is
            treated as 100 seconds.
        """
        log.msg(channel="system", event="iteration", reactor=self)
        if timeout is None:
            timeout = 100

        # If any descriptors are trying to close, try to get them out of the
        # way first.
        pendingClose = list(self._closedAndReading)
        for reader in pendingClose:
            self._runAction("doRead", reader)

        # Snapshot the writers only now: the reads above may have changed
        # the set of writers.
        pendingWrites = list(self._writes)
        for writer in pendingWrites:
            log.callWithLogger(writer, self._runWrite, writer)

        if pendingClose or pendingWrites:
            # Application code ran, so it *might* have scheduled a timed
            # call, stopped the reactor or otherwise made blocking for the
            # full timeout wrong.  Assume it did: if that is mistaken we get
            # back here shortly anyway, and if it is right the event
            # (including reactor shutdown) is handled in a timely manner.
            timeout = 0

        if not self._events and not self._writes:
            # sleep so we don't suck up CPU time
            time.sleep(timeout)
            return

        waitHandles = list(self._events) or [self.dummyEvent]
        # The timeout is in seconds here; the wait call takes milliseconds.
        result = MsgWaitForMultipleObjects(
            waitHandles, 0, int(timeout * 1000), QS_ALLINPUT
        )
        if result == WAIT_TIMEOUT:
            return

        # One past the last handle index: this result is handled by pumping
        # the window message queue rather than dispatching an event.
        messageResult = WAIT_OBJECT_0 + len(waitHandles)
        if result == messageResult:
            quitRequested = win32gui.PumpWaitingMessages()
            if quitRequested:
                self.callLater(0, self.stop)
            return

        if not (WAIT_OBJECT_0 <= result < messageResult):
            return

        signalled = waitHandles[result - WAIT_OBJECT_0]
        fd, action = self._events[signalled]

        if fd in self._reads:
            # Before anything, make sure it's still a valid file descriptor.
            fileno = fd.fileno()
            if fileno == -1:
                self._disconnectSelectable(fd, posixbase._NO_FILEDESC, False)
                return

            # Since it's a socket (not another arbitrary event added via
            # addEvent) and we asked for FD_READ | FD_CLOSE, check to see if
            # we actually got FD_CLOSE.  This needs a special check because
            # it only gets delivered once.  If we miss it, it's gone forever
            # and we'll never know that the connection is closed.
            if FD_CLOSE in WSAEnumNetworkEvents(fileno, signalled):
                self._closedAndReading[fd] = True

        log.callWithLogger(fd, self._runAction, action, fd)

    def _runWrite(self, fd):
        """
        Let C{fd} write, and disconnect it if the write reports a problem.

        @return: C{1} if C{fd.doWrite()} returned L{None}, otherwise
            L{None}.
        """
        try:
            outcome = fd.doWrite()
        except BaseException:
            outcome = sys.exc_info()[1]
            log.deferr()

        if outcome:
            # A true result (or a raised exception) means the descriptor is
            # finished: stop watching it entirely and tell it why.
            self.removeReader(fd)
            self.removeWriter(fd)
            try:
                fd.connectionLost(failure.Failure(outcome))
            except BaseException:
                log.deferr()
            return None

        if outcome is None:
            return 1
        return None

    def _runAction(self, action, fd):
        """
        Call the method of C{fd} named by C{action} and disconnect C{fd} if
        the call raises or returns a true value.
        """
        try:
            outcome = getattr(fd, action)()
        except BaseException:
            outcome = sys.exc_info()[1]
            log.deferr()

        if outcome:
            self._disconnectSelectable(fd, outcome, action == "doRead")

    doIteration = doWaitForMultipleEvents


class _ThreadFDWrapper:
    """
    This wraps an event handler and translates notification in the helper
    L{Win32Reactor} thread into a notification in the primary reactor thread.

    @ivar _reactor: The primary reactor, the one to which event notification
        will be sent.

    @ivar _fd: The L{FileDescriptor} to which the event will be dispatched.

    @ivar _action: A C{str} giving the method of C{_fd} which handles the
        event.

    @ivar _logPrefix: The pre-fetched log prefix string for C{_fd}, so that
        C{_fd.logPrefix} does not need to be called in a non-main thread.
    """

    def __init__(self, reactor, fd, action, logPrefix):
        self._reactor = reactor
        self._fd = fd
        self._action = action
        self._logPrefix = logPrefix

    def logPrefix(self):
        """
        Return the original handler's log prefix, as it was given to
        C{__init__}.
        """
        return self._logPrefix

    def _execute(self):
        """
        Callback fired when the associated event is set.  Run the C{action}
        callback on the wrapped descriptor in the main reactor thread and
        raise or return whatever it raises or returns to cause this event
        handler to be removed from C{self._reactor} if appropriate.
        """

        def runWrappedAction():
            # The method is looked up only when this runs, not when
            # _execute is called.
            return getattr(self._fd, self._action)()

        return blockingCallFromThread(self._reactor, runWrappedAction)

    def connectionLost(self, reason):
        """
        Pass through to the wrapped descriptor, but in the main reactor
        thread instead of the helper C{Win32Reactor} thread.
        """
        self._reactor.callFromThread(self._fd.connectionLost, reason)


@implementer(IReactorWin32Events)
class _ThreadedWin32EventsMixin:
    """
    This mixin implements L{IReactorWin32Events} for another reactor by
    running a L{Win32Reactor} in a separate thread and dispatching work to
    it.

    @ivar _reactor: The L{Win32Reactor} running in the other thread.  This is
        L{None} until it is actually needed.

    @ivar _reactorThread: The L{threading.Thread} which is running the
        L{Win32Reactor}.  This is L{None} until it is actually needed.
    """

    _reactor = None
    _reactorThread = None

    def _unmakeHelperReactor(self):
        """
        Stop and discard the reactor started by C{_makeHelperReactor}.
        """
        helper = self._reactor
        helper.callFromThread(helper.stop)
        self._reactor = None

    def _makeHelperReactor(self):
        """
        Create and (in a new thread) start a L{Win32Reactor} instance to use
        for the implementation of L{IReactorWin32Events}.
        """
        self._reactor = helper = Win32Reactor()
        # This is a helper reactor, it is not the global reactor and its
        # thread is not "the" I/O thread.  Prevent it from registering it as
        # such.
        helper._registerAsIOThread = False
        self._reactorThread = Thread(target=helper.run, args=(False,))
        # Arrange for the helper to be stopped after this reactor shuts down.
        self.addSystemEventTrigger("after", "shutdown", self._unmakeHelperReactor)
        self._reactorThread.start()

    def addEvent(self, event, fd, action):
        """
        @see: L{IReactorWin32Events}
        """
        if self._reactor is None:
            self._makeHelperReactor()

        # The helper reactor calls "_execute" on the wrapper, which forwards
        # the real action to this reactor's thread.
        wrapper = _ThreadFDWrapper(self, fd, action, fd.logPrefix())
        self._reactor.callFromThread(
            self._reactor.addEvent, event, wrapper, "_execute"
        )

    def removeEvent(self, event):
        """
        @see: L{IReactorWin32Events}
        """
        helper = self._reactor
        helper.callFromThread(helper.removeEvent, event)


def install():
    """
    Create a L{Win32Reactor} and install it as the reactor.
    """
    threadable.init(1)
    reactor = Win32Reactor()
    from . import main

    main.installReactor(reactor)


__all__ = ["Win32Reactor", "install"]