1# -*- test-case-name: twisted.internet.test.test_core -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6A reactor for integrating with U{CFRunLoop<http://bit.ly/cfrunloop>}, the 7CoreFoundation main loop used by MacOS X. 8 9This is useful for integrating Twisted with U{PyObjC<http://pyobjc.sf.net/>} 10applications. 11""" 12 13__all__ = [ 14 'install', 15 'CFReactor' 16] 17 18import sys 19 20from zope.interface import implements 21 22from twisted.internet.interfaces import IReactorFDSet 23from twisted.internet.posixbase import PosixReactorBase, _Waker 24from twisted.internet.posixbase import _NO_FILEDESC 25 26from twisted.python import log 27 28from CoreFoundation import ( 29 CFRunLoopAddSource, CFRunLoopRemoveSource, CFRunLoopGetMain, CFRunLoopRun, 30 CFRunLoopStop, CFRunLoopTimerCreate, CFRunLoopAddTimer, 31 CFRunLoopTimerInvalidate, kCFAllocatorDefault, kCFRunLoopCommonModes, 32 CFAbsoluteTimeGetCurrent) 33 34from CFNetwork import ( 35 CFSocketCreateWithNative, CFSocketSetSocketFlags, CFSocketEnableCallBacks, 36 CFSocketCreateRunLoopSource, CFSocketDisableCallBacks, CFSocketInvalidate, 37 kCFSocketWriteCallBack, kCFSocketReadCallBack, kCFSocketConnectCallBack, 38 kCFSocketAutomaticallyReenableReadCallBack, 39 kCFSocketAutomaticallyReenableWriteCallBack) 40 41 42_READ = 0 43_WRITE = 1 44_preserveSOError = 1 << 6 45 46 47class _WakerPlus(_Waker): 48 """ 49 The normal Twisted waker will simply wake up the main loop, which causes an 50 iteration to run, which in turn causes L{PosixReactorBase.runUntilCurrent} 51 to get invoked. 52 53 L{CFReactor} has a slightly different model of iteration, though: rather 54 than have each iteration process the thread queue, then timed calls, then 55 file descriptors, each callback is run as it is dispatched by the CFRunLoop 56 observer which triggered it. 57 58 So this waker needs to not only unblock the loop, but also make sure the 59 work gets done; so, it reschedules the invocation of C{runUntilCurrent} to 60 be immediate (0 seconds from now) even if there is no timed call work to 61 do. 62 """ 63 64 def doRead(self): 65 """ 66 Wake up the loop and force C{runUntilCurrent} to run immediately in the 67 next timed iteration. 68 """ 69 result = _Waker.doRead(self) 70 self.reactor._scheduleSimulate(True) 71 return result 72 73 74 75class CFReactor(PosixReactorBase): 76 """ 77 The CoreFoundation reactor. 78 79 You probably want to use this via the L{install} API. 80 81 @ivar _fdmap: a dictionary, mapping an integer (a file descriptor) to a 82 4-tuple of: 83 84 - source: a C{CFRunLoopSource}; the source associated with this 85 socket. 86 - socket: a C{CFSocket} wrapping the file descriptor. 87 - descriptor: an L{IReadDescriptor} and/or L{IWriteDescriptor} 88 provider. 89 - read-write: a 2-C{list} of booleans: respectively, whether this 90 descriptor is currently registered for reading or registered for 91 writing. 92 93 @ivar _idmap: a dictionary, mapping the id() of an L{IReadDescriptor} or 94 L{IWriteDescriptor} to a C{fd} in L{_fdmap}. Implemented in this 95 manner so that we don't have to rely (even more) on the hashability of 96 L{IReadDescriptor} providers, and we know that they won't be collected 97 since these are kept in sync with C{_fdmap}. Necessary because the 98 .fileno() of a file descriptor may change at will, so we need to be 99 able to look up what its file descriptor I{used} to be, so that we can 100 look it up in C{_fdmap} 101 102 @ivar _cfrunloop: the L{CFRunLoop} pyobjc object wrapped by this reactor. 103 104 @ivar _inCFLoop: Is L{CFRunLoopRun} currently running? 105 106 @type _inCFLoop: C{bool} 107 108 @ivar _currentSimulator: if a CFTimer is currently scheduled with the CF 109 run loop to run Twisted callLater calls, this is a reference to it. 110 Otherwise, it is C{None} 111 """ 112 113 implements(IReactorFDSet) 114 115 def __init__(self, runLoop=None, runner=None): 116 self._fdmap = {} 117 self._idmap = {} 118 if runner is None: 119 runner = CFRunLoopRun 120 self._runner = runner 121 122 if runLoop is None: 123 runLoop = CFRunLoopGetMain() 124 self._cfrunloop = runLoop 125 PosixReactorBase.__init__(self) 126 127 128 def installWaker(self): 129 """ 130 Override C{installWaker} in order to use L{_WakerPlus}; otherwise this 131 should be exactly the same as the parent implementation. 132 """ 133 if not self.waker: 134 self.waker = _WakerPlus(self) 135 self._internalReaders.add(self.waker) 136 self.addReader(self.waker) 137 138 139 def _socketCallback(self, cfSocket, callbackType, 140 ignoredAddress, ignoredData, context): 141 """ 142 The socket callback issued by CFRunLoop. This will issue C{doRead} or 143 C{doWrite} calls to the L{IReadDescriptor} and L{IWriteDescriptor} 144 registered with the file descriptor that we are being notified of. 145 146 @param cfSocket: The L{CFSocket} which has got some activity. 147 148 @param callbackType: The type of activity that we are being notified 149 of. Either L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}. 150 151 @param ignoredAddress: Unused, because this is not used for either of 152 the callback types we register for. 153 154 @param ignoredData: Unused, because this is not used for either of the 155 callback types we register for. 156 157 @param context: The data associated with this callback by 158 L{CFSocketCreateWithNative} (in L{CFReactor._watchFD}). A 2-tuple 159 of C{(int, CFRunLoopSource)}. 160 """ 161 (fd, smugglesrc) = context 162 if fd not in self._fdmap: 163 # Spurious notifications seem to be generated sometimes if you 164 # CFSocketDisableCallBacks in the middle of an event. I don't know 165 # about this FD, any more, so let's get rid of it. 166 CFRunLoopRemoveSource( 167 self._cfrunloop, smugglesrc, kCFRunLoopCommonModes 168 ) 169 return 170 171 why = None 172 isRead = False 173 src, skt, readWriteDescriptor, rw = self._fdmap[fd] 174 try: 175 if readWriteDescriptor.fileno() == -1: 176 why = _NO_FILEDESC 177 else: 178 isRead = callbackType == kCFSocketReadCallBack 179 # CFSocket seems to deliver duplicate read/write notifications 180 # sometimes, especially a duplicate writability notification 181 # when first registering the socket. This bears further 182 # investigation, since I may have been mis-interpreting the 183 # behavior I was seeing. (Running the full Twisted test suite, 184 # while thorough, is not always entirely clear.) Until this has 185 # been more thoroughly investigated , we consult our own 186 # reading/writing state flags to determine whether we should 187 # actually attempt a doRead/doWrite first. -glyph 188 if isRead: 189 if rw[_READ]: 190 why = log.callWithLogger( 191 readWriteDescriptor, readWriteDescriptor.doRead) 192 else: 193 if rw[_WRITE]: 194 why = log.callWithLogger( 195 readWriteDescriptor, readWriteDescriptor.doWrite) 196 except: 197 why = sys.exc_info()[1] 198 log.err() 199 if why: 200 self._disconnectSelectable(readWriteDescriptor, why, isRead) 201 202 203 def _watchFD(self, fd, descr, flag): 204 """ 205 Register a file descriptor with the L{CFRunLoop}, or modify its state 206 so that it's listening for both notifications (read and write) rather 207 than just one; used to implement C{addReader} and C{addWriter}. 208 209 @param fd: The file descriptor. 210 211 @type fd: C{int} 212 213 @param descr: the L{IReadDescriptor} or L{IWriteDescriptor} 214 215 @param flag: the flag to register for callbacks on, either 216 L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack} 217 """ 218 if fd == -1: 219 raise RuntimeError("Invalid file descriptor.") 220 if fd in self._fdmap: 221 src, cfs, gotdescr, rw = self._fdmap[fd] 222 # do I need to verify that it's the same descr? 223 else: 224 ctx = [] 225 ctx.append(fd) 226 cfs = CFSocketCreateWithNative( 227 kCFAllocatorDefault, fd, 228 kCFSocketReadCallBack | kCFSocketWriteCallBack | 229 kCFSocketConnectCallBack, 230 self._socketCallback, ctx 231 ) 232 CFSocketSetSocketFlags( 233 cfs, 234 kCFSocketAutomaticallyReenableReadCallBack | 235 kCFSocketAutomaticallyReenableWriteCallBack | 236 237 # This extra flag is to ensure that CF doesn't (destructively, 238 # because destructively is the only way to do it) retrieve 239 # SO_ERROR and thereby break twisted.internet.tcp.BaseClient, 240 # which needs SO_ERROR to tell it whether or not it needs to 241 # call connect_ex a second time. 242 _preserveSOError 243 ) 244 src = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cfs, 0) 245 ctx.append(src) 246 CFRunLoopAddSource(self._cfrunloop, src, kCFRunLoopCommonModes) 247 CFSocketDisableCallBacks( 248 cfs, 249 kCFSocketReadCallBack | kCFSocketWriteCallBack | 250 kCFSocketConnectCallBack 251 ) 252 rw = [False, False] 253 self._idmap[id(descr)] = fd 254 self._fdmap[fd] = src, cfs, descr, rw 255 rw[self._flag2idx(flag)] = True 256 CFSocketEnableCallBacks(cfs, flag) 257 258 259 def _flag2idx(self, flag): 260 """ 261 Convert a C{kCFSocket...} constant to an index into the read/write 262 state list (C{_READ} or C{_WRITE}) (the 4th element of the value of 263 C{self._fdmap}). 264 265 @param flag: C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack} 266 267 @return: C{_READ} or C{_WRITE} 268 """ 269 return {kCFSocketReadCallBack: _READ, 270 kCFSocketWriteCallBack: _WRITE}[flag] 271 272 273 def _unwatchFD(self, fd, descr, flag): 274 """ 275 Unregister a file descriptor with the L{CFRunLoop}, or modify its state 276 so that it's listening for only one notification (read or write) as 277 opposed to both; used to implement C{removeReader} and C{removeWriter}. 278 279 @param fd: a file descriptor 280 281 @type fd: C{int} 282 283 @param descr: an L{IReadDescriptor} or L{IWriteDescriptor} 284 285 @param flag: L{kCFSocketWriteCallBack} L{kCFSocketReadCallBack} 286 """ 287 if id(descr) not in self._idmap: 288 return 289 if fd == -1: 290 # need to deal with it in this case, I think. 291 realfd = self._idmap[id(descr)] 292 else: 293 realfd = fd 294 src, cfs, descr, rw = self._fdmap[realfd] 295 CFSocketDisableCallBacks(cfs, flag) 296 rw[self._flag2idx(flag)] = False 297 if not rw[_READ] and not rw[_WRITE]: 298 del self._idmap[id(descr)] 299 del self._fdmap[realfd] 300 CFRunLoopRemoveSource(self._cfrunloop, src, kCFRunLoopCommonModes) 301 CFSocketInvalidate(cfs) 302 303 304 def addReader(self, reader): 305 """ 306 Implement L{IReactorFDSet.addReader}. 307 """ 308 self._watchFD(reader.fileno(), reader, kCFSocketReadCallBack) 309 310 311 def addWriter(self, writer): 312 """ 313 Implement L{IReactorFDSet.addWriter}. 314 """ 315 self._watchFD(writer.fileno(), writer, kCFSocketWriteCallBack) 316 317 318 def removeReader(self, reader): 319 """ 320 Implement L{IReactorFDSet.removeReader}. 321 """ 322 self._unwatchFD(reader.fileno(), reader, kCFSocketReadCallBack) 323 324 325 def removeWriter(self, writer): 326 """ 327 Implement L{IReactorFDSet.removeWriter}. 328 """ 329 self._unwatchFD(writer.fileno(), writer, kCFSocketWriteCallBack) 330 331 332 def removeAll(self): 333 """ 334 Implement L{IReactorFDSet.removeAll}. 335 """ 336 allDesc = set([descr for src, cfs, descr, rw in self._fdmap.values()]) 337 allDesc -= set(self._internalReaders) 338 for desc in allDesc: 339 self.removeReader(desc) 340 self.removeWriter(desc) 341 return list(allDesc) 342 343 344 def getReaders(self): 345 """ 346 Implement L{IReactorFDSet.getReaders}. 347 """ 348 return [descr for src, cfs, descr, rw in self._fdmap.values() 349 if rw[_READ]] 350 351 352 def getWriters(self): 353 """ 354 Implement L{IReactorFDSet.getWriters}. 355 """ 356 return [descr for src, cfs, descr, rw in self._fdmap.values() 357 if rw[_WRITE]] 358 359 360 def _moveCallLaterSooner(self, tple): 361 """ 362 Override L{PosixReactorBase}'s implementation of L{IDelayedCall.reset} 363 so that it will immediately reschedule. Normally 364 C{_moveCallLaterSooner} depends on the fact that C{runUntilCurrent} is 365 always run before the mainloop goes back to sleep, so this forces it to 366 immediately recompute how long the loop needs to stay asleep. 367 """ 368 result = PosixReactorBase._moveCallLaterSooner(self, tple) 369 self._scheduleSimulate() 370 return result 371 372 373 _inCFLoop = False 374 375 def mainLoop(self): 376 """ 377 Run the runner (L{CFRunLoopRun} or something that calls it), which runs 378 the run loop until C{crash()} is called. 379 """ 380 self._inCFLoop = True 381 try: 382 self._runner() 383 finally: 384 self._inCFLoop = False 385 386 387 _currentSimulator = None 388 389 def _scheduleSimulate(self, force=False): 390 """ 391 Schedule a call to C{self.runUntilCurrent}. This will cancel the 392 currently scheduled call if it is already scheduled. 393 394 @param force: Even if there are no timed calls, make sure that 395 C{runUntilCurrent} runs immediately (in a 0-seconds-from-now 396 {CFRunLoopTimer}). This is necessary for calls which need to 397 trigger behavior of C{runUntilCurrent} other than running timed 398 calls, such as draining the thread call queue or calling C{crash()} 399 when the appropriate flags are set. 400 401 @type force: C{bool} 402 """ 403 if self._currentSimulator is not None: 404 CFRunLoopTimerInvalidate(self._currentSimulator) 405 self._currentSimulator = None 406 timeout = self.timeout() 407 if force: 408 timeout = 0.0 409 if timeout is not None: 410 fireDate = (CFAbsoluteTimeGetCurrent() + timeout) 411 def simulate(cftimer, extra): 412 self._currentSimulator = None 413 self.runUntilCurrent() 414 self._scheduleSimulate() 415 c = self._currentSimulator = CFRunLoopTimerCreate( 416 kCFAllocatorDefault, fireDate, 417 0, 0, 0, simulate, None 418 ) 419 CFRunLoopAddTimer(self._cfrunloop, c, kCFRunLoopCommonModes) 420 421 422 def callLater(self, _seconds, _f, *args, **kw): 423 """ 424 Implement L{IReactorTime.callLater}. 425 """ 426 delayedCall = PosixReactorBase.callLater( 427 self, _seconds, _f, *args, **kw 428 ) 429 self._scheduleSimulate() 430 return delayedCall 431 432 433 def stop(self): 434 """ 435 Implement L{IReactorCore.stop}. 436 """ 437 PosixReactorBase.stop(self) 438 self._scheduleSimulate(True) 439 440 441 def crash(self): 442 """ 443 Implement L{IReactorCore.crash} 444 """ 445 wasStarted = self._started 446 PosixReactorBase.crash(self) 447 if self._inCFLoop: 448 self._stopNow() 449 else: 450 if wasStarted: 451 self.callLater(0, self._stopNow) 452 453 454 def _stopNow(self): 455 """ 456 Immediately stop the CFRunLoop (which must be running!). 457 """ 458 CFRunLoopStop(self._cfrunloop) 459 460 461 def iterate(self, delay=0): 462 """ 463 Emulate the behavior of C{iterate()} for things that want to call it, 464 by letting the loop run for a little while and then scheduling a timed 465 call to exit it. 466 """ 467 self.callLater(delay, self._stopNow) 468 self.mainLoop() 469 470 471 472def install(runLoop=None, runner=None): 473 """ 474 Configure the twisted mainloop to be run inside CFRunLoop. 475 476 @param runLoop: the run loop to use. 477 478 @param runner: the function to call in order to actually invoke the main 479 loop. This will default to L{CFRunLoopRun} if not specified. However, 480 this is not an appropriate choice for GUI applications, as you need to 481 run NSApplicationMain (or something like it). For example, to run the 482 Twisted mainloop in a PyObjC application, your C{main.py} should look 483 something like this:: 484 485 from PyObjCTools import AppHelper 486 from twisted.internet.cfreactor import install 487 install(runner=AppHelper.runEventLoop) 488 # initialize your application 489 reactor.run() 490 491 @return: The installed reactor. 492 493 @rtype: L{CFReactor} 494 """ 495 496 reactor = CFReactor(runLoop=runLoop, runner=runner) 497 from twisted.internet.main import installReactor 498 installReactor(reactor) 499 return reactor 500 501 502