1# -*- test-case-name: twisted.internet.test.test_core -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6A reactor for integrating with U{CFRunLoop<http://bit.ly/cfrunloop>}, the
7CoreFoundation main loop used by MacOS X.
8
9This is useful for integrating Twisted with U{PyObjC<http://pyobjc.sf.net/>}
10applications.
11"""
12
# Public names exported by this module: the reactor class and the helper that
# installs an instance of it as the global reactor.
__all__ = [
    'install',
    'CFReactor'
]
17
18import sys
19
20from zope.interface import implements
21
22from twisted.internet.interfaces import IReactorFDSet
23from twisted.internet.posixbase import PosixReactorBase, _Waker
24from twisted.internet.posixbase import _NO_FILEDESC
25
26from twisted.python import log
27
28from CoreFoundation import (
29    CFRunLoopAddSource, CFRunLoopRemoveSource, CFRunLoopGetMain, CFRunLoopRun,
30    CFRunLoopStop, CFRunLoopTimerCreate, CFRunLoopAddTimer,
31    CFRunLoopTimerInvalidate, kCFAllocatorDefault, kCFRunLoopCommonModes,
32    CFAbsoluteTimeGetCurrent)
33
34from CFNetwork import (
35    CFSocketCreateWithNative, CFSocketSetSocketFlags, CFSocketEnableCallBacks,
36    CFSocketCreateRunLoopSource, CFSocketDisableCallBacks, CFSocketInvalidate,
37    kCFSocketWriteCallBack, kCFSocketReadCallBack, kCFSocketConnectCallBack,
38    kCFSocketAutomaticallyReenableReadCallBack,
39    kCFSocketAutomaticallyReenableWriteCallBack)
40
41
# Indexes into the 2-element read/write state list kept for every watched file
# descriptor (the 4th element of each value in CFReactor._fdmap).
_READ = 0
_WRITE = 1
# Extra socket flag passed to CFSocketSetSocketFlags by CFReactor._watchFD so
# that CoreFoundation does not (destructively) fetch SO_ERROR itself.
# NOTE(review): presumably the numeric value of kCFSocketLeaveErrors, which is
# not among the names imported from CFNetwork above -- confirm.
_preserveSOError = 1 << 6
45
46
class _WakerPlus(_Waker):
    """
    A waker which, in addition to unblocking the run loop, makes sure the
    reactor's pending work is actually processed.

    The stock Twisted waker only wakes up the main loop; the resulting
    iteration is what ends up invoking L{PosixReactorBase.runUntilCurrent}.

    L{CFReactor} does not iterate that way.  Instead of each iteration
    handling the thread queue, then timed calls, then file descriptors, every
    callback runs when the CFRunLoop observer that triggered it dispatches it.

    Consequently, waking the loop is not enough: this waker also reschedules
    C{runUntilCurrent} to fire immediately (0 seconds from now), whether or
    not any timed call is actually due.
    """

    def doRead(self):
        """
        Perform the normal waker read, then force the reactor to schedule an
        immediate C{runUntilCurrent} in the next timed iteration.

        @return: whatever L{_Waker.doRead} returned.
        """
        outcome = _Waker.doRead(self)
        self.reactor._scheduleSimulate(force=True)
        return outcome
72
73
74
75class CFReactor(PosixReactorBase):
76    """
77    The CoreFoundation reactor.
78
79    You probably want to use this via the L{install} API.
80
81    @ivar _fdmap: a dictionary, mapping an integer (a file descriptor) to a
82        4-tuple of:
83
84            - source: a C{CFRunLoopSource}; the source associated with this
85              socket.
86            - socket: a C{CFSocket} wrapping the file descriptor.
87            - descriptor: an L{IReadDescriptor} and/or L{IWriteDescriptor}
88              provider.
89            - read-write: a 2-C{list} of booleans: respectively, whether this
90              descriptor is currently registered for reading or registered for
91              writing.
92
93    @ivar _idmap: a dictionary, mapping the id() of an L{IReadDescriptor} or
94        L{IWriteDescriptor} to a C{fd} in L{_fdmap}.  Implemented in this
95        manner so that we don't have to rely (even more) on the hashability of
96        L{IReadDescriptor} providers, and we know that they won't be collected
97        since these are kept in sync with C{_fdmap}.  Necessary because the
98        .fileno() of a file descriptor may change at will, so we need to be
99        able to look up what its file descriptor I{used} to be, so that we can
        look it up in C{_fdmap}.
101
102    @ivar _cfrunloop: the L{CFRunLoop} pyobjc object wrapped by this reactor.
103
104    @ivar _inCFLoop: Is L{CFRunLoopRun} currently running?
105
106    @type _inCFLoop: C{bool}
107
    @ivar _currentSimulator: if a CFTimer is currently scheduled with the CF
        run loop to run Twisted callLater calls, this is a reference to it.
        Otherwise, it is C{None}.
111    """
112
113    implements(IReactorFDSet)
114
115    def __init__(self, runLoop=None, runner=None):
116        self._fdmap = {}
117        self._idmap = {}
118        if runner is None:
119            runner = CFRunLoopRun
120        self._runner = runner
121
122        if runLoop is None:
123            runLoop = CFRunLoopGetMain()
124        self._cfrunloop = runLoop
125        PosixReactorBase.__init__(self)
126
127
128    def installWaker(self):
129        """
130        Override C{installWaker} in order to use L{_WakerPlus}; otherwise this
131        should be exactly the same as the parent implementation.
132        """
133        if not self.waker:
134            self.waker = _WakerPlus(self)
135            self._internalReaders.add(self.waker)
136            self.addReader(self.waker)
137
138
139    def _socketCallback(self, cfSocket, callbackType,
140                        ignoredAddress, ignoredData, context):
141        """
142        The socket callback issued by CFRunLoop.  This will issue C{doRead} or
143        C{doWrite} calls to the L{IReadDescriptor} and L{IWriteDescriptor}
144        registered with the file descriptor that we are being notified of.
145
146        @param cfSocket: The L{CFSocket} which has got some activity.
147
148        @param callbackType: The type of activity that we are being notified
149            of.  Either L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}.
150
151        @param ignoredAddress: Unused, because this is not used for either of
152            the callback types we register for.
153
154        @param ignoredData: Unused, because this is not used for either of the
155            callback types we register for.
156
157        @param context: The data associated with this callback by
158            L{CFSocketCreateWithNative} (in L{CFReactor._watchFD}).  A 2-tuple
159            of C{(int, CFRunLoopSource)}.
160        """
161        (fd, smugglesrc) = context
162        if fd not in self._fdmap:
163            # Spurious notifications seem to be generated sometimes if you
164            # CFSocketDisableCallBacks in the middle of an event.  I don't know
165            # about this FD, any more, so let's get rid of it.
166            CFRunLoopRemoveSource(
167                self._cfrunloop, smugglesrc, kCFRunLoopCommonModes
168            )
169            return
170
171        why = None
172        isRead = False
173        src, skt, readWriteDescriptor, rw = self._fdmap[fd]
174        try:
175            if readWriteDescriptor.fileno() == -1:
176                why = _NO_FILEDESC
177            else:
178                isRead = callbackType == kCFSocketReadCallBack
179                # CFSocket seems to deliver duplicate read/write notifications
180                # sometimes, especially a duplicate writability notification
181                # when first registering the socket.  This bears further
182                # investigation, since I may have been mis-interpreting the
183                # behavior I was seeing. (Running the full Twisted test suite,
184                # while thorough, is not always entirely clear.) Until this has
185                # been more thoroughly investigated , we consult our own
186                # reading/writing state flags to determine whether we should
187                # actually attempt a doRead/doWrite first.  -glyph
188                if isRead:
189                    if rw[_READ]:
190                        why = log.callWithLogger(
191                            readWriteDescriptor, readWriteDescriptor.doRead)
192                else:
193                    if rw[_WRITE]:
194                        why = log.callWithLogger(
195                            readWriteDescriptor, readWriteDescriptor.doWrite)
196        except:
197            why = sys.exc_info()[1]
198            log.err()
199        if why:
200            self._disconnectSelectable(readWriteDescriptor, why, isRead)
201
202
203    def _watchFD(self, fd, descr, flag):
204        """
205        Register a file descriptor with the L{CFRunLoop}, or modify its state
206        so that it's listening for both notifications (read and write) rather
207        than just one; used to implement C{addReader} and C{addWriter}.
208
209        @param fd: The file descriptor.
210
211        @type fd: C{int}
212
213        @param descr: the L{IReadDescriptor} or L{IWriteDescriptor}
214
215        @param flag: the flag to register for callbacks on, either
216            L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}
217        """
218        if fd == -1:
219            raise RuntimeError("Invalid file descriptor.")
220        if fd in self._fdmap:
221            src, cfs, gotdescr, rw = self._fdmap[fd]
222            # do I need to verify that it's the same descr?
223        else:
224            ctx = []
225            ctx.append(fd)
226            cfs = CFSocketCreateWithNative(
227                kCFAllocatorDefault, fd,
228                kCFSocketReadCallBack | kCFSocketWriteCallBack |
229                kCFSocketConnectCallBack,
230                self._socketCallback, ctx
231            )
232            CFSocketSetSocketFlags(
233                cfs,
234                kCFSocketAutomaticallyReenableReadCallBack |
235                kCFSocketAutomaticallyReenableWriteCallBack |
236
237                # This extra flag is to ensure that CF doesn't (destructively,
238                # because destructively is the only way to do it) retrieve
239                # SO_ERROR and thereby break twisted.internet.tcp.BaseClient,
240                # which needs SO_ERROR to tell it whether or not it needs to
241                # call connect_ex a second time.
242                _preserveSOError
243            )
244            src = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cfs, 0)
245            ctx.append(src)
246            CFRunLoopAddSource(self._cfrunloop, src, kCFRunLoopCommonModes)
247            CFSocketDisableCallBacks(
248                cfs,
249                kCFSocketReadCallBack | kCFSocketWriteCallBack |
250                kCFSocketConnectCallBack
251            )
252            rw = [False, False]
253            self._idmap[id(descr)] = fd
254            self._fdmap[fd] = src, cfs, descr, rw
255        rw[self._flag2idx(flag)] = True
256        CFSocketEnableCallBacks(cfs, flag)
257
258
259    def _flag2idx(self, flag):
260        """
261        Convert a C{kCFSocket...} constant to an index into the read/write
262        state list (C{_READ} or C{_WRITE}) (the 4th element of the value of
263        C{self._fdmap}).
264
265        @param flag: C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}
266
267        @return: C{_READ} or C{_WRITE}
268        """
269        return {kCFSocketReadCallBack: _READ,
270                kCFSocketWriteCallBack: _WRITE}[flag]
271
272
273    def _unwatchFD(self, fd, descr, flag):
274        """
275        Unregister a file descriptor with the L{CFRunLoop}, or modify its state
276        so that it's listening for only one notification (read or write) as
277        opposed to both; used to implement C{removeReader} and C{removeWriter}.
278
279        @param fd: a file descriptor
280
281        @type fd: C{int}
282
283        @param descr: an L{IReadDescriptor} or L{IWriteDescriptor}
284
285        @param flag: L{kCFSocketWriteCallBack} L{kCFSocketReadCallBack}
286        """
287        if id(descr) not in self._idmap:
288            return
289        if fd == -1:
290            # need to deal with it in this case, I think.
291            realfd = self._idmap[id(descr)]
292        else:
293            realfd = fd
294        src, cfs, descr, rw = self._fdmap[realfd]
295        CFSocketDisableCallBacks(cfs, flag)
296        rw[self._flag2idx(flag)] = False
297        if not rw[_READ] and not rw[_WRITE]:
298            del self._idmap[id(descr)]
299            del self._fdmap[realfd]
300            CFRunLoopRemoveSource(self._cfrunloop, src, kCFRunLoopCommonModes)
301            CFSocketInvalidate(cfs)
302
303
304    def addReader(self, reader):
305        """
306        Implement L{IReactorFDSet.addReader}.
307        """
308        self._watchFD(reader.fileno(), reader, kCFSocketReadCallBack)
309
310
311    def addWriter(self, writer):
312        """
313        Implement L{IReactorFDSet.addWriter}.
314        """
315        self._watchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
316
317
318    def removeReader(self, reader):
319        """
320        Implement L{IReactorFDSet.removeReader}.
321        """
322        self._unwatchFD(reader.fileno(), reader, kCFSocketReadCallBack)
323
324
325    def removeWriter(self, writer):
326        """
327        Implement L{IReactorFDSet.removeWriter}.
328        """
329        self._unwatchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
330
331
332    def removeAll(self):
333        """
334        Implement L{IReactorFDSet.removeAll}.
335        """
336        allDesc = set([descr for src, cfs, descr, rw in self._fdmap.values()])
337        allDesc -= set(self._internalReaders)
338        for desc in allDesc:
339            self.removeReader(desc)
340            self.removeWriter(desc)
341        return list(allDesc)
342
343
344    def getReaders(self):
345        """
346        Implement L{IReactorFDSet.getReaders}.
347        """
348        return [descr for src, cfs, descr, rw in self._fdmap.values()
349                if rw[_READ]]
350
351
352    def getWriters(self):
353        """
354        Implement L{IReactorFDSet.getWriters}.
355        """
356        return [descr for src, cfs, descr, rw in self._fdmap.values()
357                if rw[_WRITE]]
358
359
360    def _moveCallLaterSooner(self, tple):
361        """
362        Override L{PosixReactorBase}'s implementation of L{IDelayedCall.reset}
363        so that it will immediately reschedule.  Normally
364        C{_moveCallLaterSooner} depends on the fact that C{runUntilCurrent} is
365        always run before the mainloop goes back to sleep, so this forces it to
366        immediately recompute how long the loop needs to stay asleep.
367        """
368        result = PosixReactorBase._moveCallLaterSooner(self, tple)
369        self._scheduleSimulate()
370        return result
371
372
    # Class-level default.  mainLoop() sets this to True on the instance while
    # the runner is executing and back to False once it returns or raises.
    _inCFLoop = False
374
375    def mainLoop(self):
376        """
377        Run the runner (L{CFRunLoopRun} or something that calls it), which runs
378        the run loop until C{crash()} is called.
379        """
380        self._inCFLoop = True
381        try:
382            self._runner()
383        finally:
384            self._inCFLoop = False
385
386
    # Class-level default: no timer scheduled.  _scheduleSimulate() stores the
    # pending CFRunLoopTimer here on the instance and clears it again when the
    # timer fires or is invalidated.
    _currentSimulator = None
388
389    def _scheduleSimulate(self, force=False):
390        """
391        Schedule a call to C{self.runUntilCurrent}.  This will cancel the
392        currently scheduled call if it is already scheduled.
393
394        @param force: Even if there are no timed calls, make sure that
395            C{runUntilCurrent} runs immediately (in a 0-seconds-from-now
396            {CFRunLoopTimer}).  This is necessary for calls which need to
397            trigger behavior of C{runUntilCurrent} other than running timed
398            calls, such as draining the thread call queue or calling C{crash()}
399            when the appropriate flags are set.
400
401        @type force: C{bool}
402        """
403        if self._currentSimulator is not None:
404            CFRunLoopTimerInvalidate(self._currentSimulator)
405            self._currentSimulator = None
406        timeout = self.timeout()
407        if force:
408            timeout = 0.0
409        if timeout is not None:
410            fireDate = (CFAbsoluteTimeGetCurrent() + timeout)
411            def simulate(cftimer, extra):
412                self._currentSimulator = None
413                self.runUntilCurrent()
414                self._scheduleSimulate()
415            c = self._currentSimulator = CFRunLoopTimerCreate(
416                kCFAllocatorDefault, fireDate,
417                0, 0, 0, simulate, None
418            )
419            CFRunLoopAddTimer(self._cfrunloop, c, kCFRunLoopCommonModes)
420
421
422    def callLater(self, _seconds, _f, *args, **kw):
423        """
424        Implement L{IReactorTime.callLater}.
425        """
426        delayedCall = PosixReactorBase.callLater(
427            self, _seconds, _f, *args, **kw
428        )
429        self._scheduleSimulate()
430        return delayedCall
431
432
433    def stop(self):
434        """
435        Implement L{IReactorCore.stop}.
436        """
437        PosixReactorBase.stop(self)
438        self._scheduleSimulate(True)
439
440
441    def crash(self):
442        """
443        Implement L{IReactorCore.crash}
444        """
445        wasStarted = self._started
446        PosixReactorBase.crash(self)
447        if self._inCFLoop:
448            self._stopNow()
449        else:
450            if wasStarted:
451                self.callLater(0, self._stopNow)
452
453
454    def _stopNow(self):
455        """
456        Immediately stop the CFRunLoop (which must be running!).
457        """
458        CFRunLoopStop(self._cfrunloop)
459
460
461    def iterate(self, delay=0):
462        """
463        Emulate the behavior of C{iterate()} for things that want to call it,
464        by letting the loop run for a little while and then scheduling a timed
465        call to exit it.
466        """
467        self.callLater(delay, self._stopNow)
468        self.mainLoop()
469
470
471
def install(runLoop=None, runner=None):
    """
    Create a L{CFReactor} and install it as the Twisted reactor, so that the
    Twisted mainloop runs inside CFRunLoop.

    @param runLoop: the run loop to use.

    @param runner: the function to call in order to actually invoke the main
        loop.  This will default to L{CFRunLoopRun} if not specified.  However,
        this is not an appropriate choice for GUI applications, as you need to
        run NSApplicationMain (or something like it).  For example, to run the
        Twisted mainloop in a PyObjC application, your C{main.py} should look
        something like this::

            from PyObjCTools import AppHelper
            from twisted.internet.cfreactor import install
            reactor = install(runner=AppHelper.runEventLoop)
            # initialize your application
            reactor.run()

    @return: The installed reactor.

    @rtype: L{CFReactor}
    """
    cfReactor = CFReactor(runLoop=runLoop, runner=runner)
    # Imported here rather than at module level, as in the original.
    from twisted.internet.main import installReactor
    installReactor(cfReactor)
    return cfReactor
500
501
502