# -*- test-case-name: twisted.internet.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
This module provides base support for Twisted to interact with the glib/gtk
mainloops.

The classes in this module should not be used directly, but rather you should
import gireactor or gtk3reactor for GObject Introspection based applications,
or glib2reactor or gtk2reactor for applications using legacy static bindings.
"""


import sys

from zope.interface import implementer

from twisted.internet import base, posixbase, selectreactor
from twisted.internet.interfaces import IReactorFDSet
from twisted.python import log


def ensureNotImported(moduleNames, errorMessage, preventImports=()):
    """
    Check whether the given modules were imported, and if requested, ensure
    they will not be importable in the future.

    @param moduleNames: A list of module names we make sure aren't imported.
    @type moduleNames: C{list} of C{str}

    @param preventImports: Module names whose future imports should be
        prevented.  Defaults to an empty (immutable) sequence.
    @type preventImports: iterable of C{str}

    @param errorMessage: Message to use when raising an C{ImportError}.
    @type errorMessage: C{str}

    @raise ImportError: with given error message if a given module name
        has already been imported.
    """
    for name in moduleNames:
        # A C{None} entry in sys.modules is treated as "not imported".
        if sys.modules.get(name) is not None:
            raise ImportError(errorMessage)

    # Disable module imports to avoid potential problems: a C{None} entry in
    # sys.modules makes a later import of that name fail.
    for name in preventImports:
        sys.modules[name] = None


class GlibWaker(posixbase._UnixWaker):
    """
    Run scheduled events after waking up.
    """

    def doRead(self):
        """
        Perform the base waker read, then ask the reactor to run its
        scheduled calls via C{_simulate}.
        """
        posixbase._UnixWaker.doRead(self)
        self.reactor._simulate()


@implementer(IReactorFDSet)
class GlibReactorBase(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
    """
    Base class for GObject event loop reactors.

    Notification for I/O events (reads and writes on file descriptors) is done
    by the gobject-based event loop. File descriptors are registered with
    gobject with the appropriate flags for read/write/disconnect notification.

    Time-based events, the results of C{callLater} and C{callFromThread}, are
    handled differently. Rather than registering each event with gobject, a
    single gobject timeout is registered for the earliest scheduled event, the
    output of C{reactor.timeout()}. For example, if there are timeouts in 1, 2
    and 3.4 seconds, a single timeout is registered for 1 second in the
    future. When this timeout is hit, C{_simulate} is called, which calls the
    appropriate Twisted-level handlers, and a new timeout is added to gobject
    by the C{_reschedule} method.

    To handle C{callFromThread} events, we use a custom waker that calls
    C{_simulate} whenever it wakes up.

    @ivar _sources: A dictionary mapping L{FileDescriptor} instances to
        GSource handles.

    @ivar _reads: A set of L{FileDescriptor} instances currently monitored for
        reading.

    @ivar _writes: A set of L{FileDescriptor} instances currently monitored for
        writing.

    @ivar _simtag: A GSource handle for the next L{_simulate} call.
    """

    # Install a waker that knows it needs to call C{_simulate} in order to run
    # callbacks queued from a thread:
    _wakerFactory = GlibWaker

    def __init__(self, glib_module, gtk_module, useGtk=False):
        """
        Set up bookkeeping and bind the event loop primitives.

        @param glib_module: The glib binding to use; this class reads its
            C{source_remove}, C{timeout_add}, C{io_add_watch}, C{idle_add},
            C{main_context_default}, C{MainLoop} and
            C{PRIORITY_DEFAULT_IDLE} attributes.

        @param gtk_module: The gtk binding to use; only consulted when
            C{useGtk} is true (and by the gtk quit helper).

        @param useGtk: If true, drive the gtk main loop; otherwise create and
            drive a glib C{MainLoop}.
        """
        self._simtag = None
        self._reads = set()
        self._writes = set()
        self._sources = {}
        self._glib = glib_module
        self._gtk = gtk_module
        posixbase.PosixReactorBase.__init__(self)

        self._source_remove = self._glib.source_remove
        self._timeout_add = self._glib.timeout_add

        def _mainquit():
            # Only quit if a gtk main loop is actually running.
            if self._gtk.main_level():
                self._gtk.main_quit()

        if useGtk:
            self._pending = self._gtk.events_pending
            self._iteration = self._gtk.main_iteration_do
            self._crash = _mainquit
            self._run = self._gtk.main
        else:
            self.context = self._glib.main_context_default()
            self._pending = self.context.pending
            self._iteration = self.context.iteration
            self.loop = self._glib.MainLoop()
            # Quit from an idle callback rather than directly.
            self._crash = lambda: self._glib.idle_add(self.loop.quit)
            self._run = self.loop.run

    def _handleSignals(self):
        """
        Install signal handlers, and schedule a second installation once the
        event loop is running.
        """
        # First, install SIGINT and friends:
        base._SignalReactorMixin._handleSignals(self)
        # Next, since certain versions of gtk will clobber our signal handler,
        # set all signal handlers again after the event loop has started to
        # ensure they're *really* set. We don't call this twice so we don't
        # leak file descriptors created in the SIGCHLD initialization:
        self.callLater(0, posixbase.PosixReactorBase._handleSignals, self)

    # The input_add function in pygtk1 checks for objects with a
    # 'fileno' method and, if present, uses the result of that method
    # as the input source. The pygtk2 input_add does not do this. The
    # function below replicates the pygtk1 functionality.

    # In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
    # g_io_add_watch() takes different condition bitfields than
    # gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
    # bug.
    def input_add(self, source, condition, callback):
        """
        Register C{callback} with glib for I/O events on C{source}.

        @param source: Either an object with a C{fileno} method or a raw
            file descriptor.

        @param condition: The glib condition flags to watch for.

        @param callback: Called as C{callback(source, condition)}; when
            C{source} has a C{fileno} method the original object, not the
            descriptor, is passed as the first argument.

        @return: Whatever C{io_add_watch} returns (stored by callers as the
            source handle).
        """
        if hasattr(source, "fileno"):
            # handle python objects
            def wrapper(ignored, condition):
                return callback(source, condition)

            fileno = source.fileno()
        else:
            fileno = source
            wrapper = callback
        return self._glib.io_add_watch(
            fileno, condition, wrapper, priority=self._glib.PRIORITY_DEFAULT_IDLE
        )

    def _ioEventCallback(self, source, condition):
        """
        Called by event loop when an I/O event occurs.
        """
        log.callWithLogger(source, self._doReadOrWrite, source, source, condition)
        return True  # True = don't auto-remove the source

    def _add(self, source, primary, other, primaryFlag, otherFlag):
        """
        Add the given L{FileDescriptor} for monitoring either for reading or
        writing. If the file is already monitored for the other operation, we
        delete the previous registration and re-register it for both reading
        and writing.
        """
        if source in primary:
            return
        flags = primaryFlag
        if source in other:
            self._source_remove(self._sources[source])
            flags |= otherFlag
        self._sources[source] = self.input_add(source, flags, self._ioEventCallback)
        primary.add(source)

    def addReader(self, reader):
        """
        Add a L{FileDescriptor} for monitoring of data available to read.
        """
        self._add(reader, self._reads, self._writes, self.INFLAGS, self.OUTFLAGS)

    def addWriter(self, writer):
        """
        Add a L{FileDescriptor} for monitoring ability to write data.
        """
        self._add(writer, self._writes, self._reads, self.OUTFLAGS, self.INFLAGS)

    def getReaders(self):
        """
        Retrieve the list of current L{FileDescriptor} monitored for reading.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Retrieve the list of current L{FileDescriptor} monitored for writing.
        """
        return list(self._writes)

    def removeAll(self):
        """
        Remove monitoring for all registered L{FileDescriptor}s.
        """
        return self._removeAll(self._reads, self._writes)

    def _remove(self, source, primary, other, flags):
        """
        Remove monitoring the given L{FileDescriptor} for either reading or
        writing. If it's still monitored for the other operation, we
        re-register the L{FileDescriptor} for only that operation.
        """
        if source not in primary:
            return
        self._source_remove(self._sources[source])
        primary.remove(source)
        if source in other:
            self._sources[source] = self.input_add(source, flags, self._ioEventCallback)
        else:
            self._sources.pop(source)

    def removeReader(self, reader):
        """
        Stop monitoring the given L{FileDescriptor} for reading.
        """
        self._remove(reader, self._reads, self._writes, self.OUTFLAGS)

    def removeWriter(self, writer):
        """
        Stop monitoring the given L{FileDescriptor} for writing.
        """
        self._remove(writer, self._writes, self._reads, self.INFLAGS)

    def iterate(self, delay=0):
        """
        One iteration of the event loop, for trial's use.

        This is not used for actual reactor runs.
        """
        self.runUntilCurrent()
        while self._pending():
            self._iteration(0)

    def crash(self):
        """
        Crash the reactor.
        """
        posixbase.PosixReactorBase.crash(self)
        self._crash()

    def stop(self):
        """
        Stop the reactor.
        """
        posixbase.PosixReactorBase.stop(self)
        # The base implementation only sets a flag, to ensure shutting down is
        # not reentrant. Unfortunately, this flag is not meaningful to the
        # gobject event loop. We therefore call wakeUp() to ensure the event
        # loop will call back into Twisted once this iteration is done. This
        # will result in self.runUntilCurrent() being called, where the stop
        # flag will trigger the actual shutdown process, eventually calling
        # crash() which will do the actual gobject event loop shutdown.
        self.wakeUp()

    def run(self, installSignalHandlers=True):
        """
        Run the reactor.
        """
        self.callWhenRunning(self._reschedule)
        self.startRunning(installSignalHandlers=installSignalHandlers)
        if self._started:
            self._run()

    def callLater(self, *args, **kwargs):
        """
        Schedule a C{DelayedCall}.
        """
        result = posixbase.PosixReactorBase.callLater(self, *args, **kwargs)
        # Make sure we'll get woken up at correct time to handle this new
        # scheduled call:
        self._reschedule()
        return result

    def _reschedule(self):
        """
        Schedule a glib timeout for C{_simulate}.
        """
        # Drop any previously registered timeout before adding a new one.
        if self._simtag is not None:
            self._source_remove(self._simtag)
            self._simtag = None
        timeout = self.timeout()
        if timeout is not None:
            # glib timeouts are expressed in integer milliseconds.
            self._simtag = self._timeout_add(
                int(timeout * 1000),
                self._simulate,
                priority=self._glib.PRIORITY_DEFAULT_IDLE,
            )

    def _simulate(self):
        """
        Run timers, and then reschedule glib timeout for next scheduled event.
        """
        self.runUntilCurrent()
        self._reschedule()


class PortableGlibReactorBase(selectreactor.SelectReactor):
    """
    Base class for GObject event loop reactors that works on Windows.

    Sockets aren't supported by GObject's input_add on Win32.
    """

    def __init__(self, glib_module, gtk_module, useGtk=False):
        """
        Bind the event loop primitives.

        @param glib_module: The glib binding to use; this class reads its
            C{source_remove}, C{timeout_add}, C{idle_add}, C{MainLoop} and
            C{PRIORITY_DEFAULT_IDLE} attributes.

        @param gtk_module: The gtk binding to use; only consulted when
            C{useGtk} is true (and by the gtk quit helper).

        @param useGtk: If true, drive the gtk main loop; otherwise create and
            drive a glib C{MainLoop}.
        """
        self._simtag = None
        self._glib = glib_module
        self._gtk = gtk_module
        selectreactor.SelectReactor.__init__(self)

        self._source_remove = self._glib.source_remove
        self._timeout_add = self._glib.timeout_add

        def _mainquit():
            # Only quit if a gtk main loop is actually running.
            if self._gtk.main_level():
                self._gtk.main_quit()

        if useGtk:
            self._crash = _mainquit
            self._run = self._gtk.main
        else:
            self.loop = self._glib.MainLoop()
            # Quit from an idle callback rather than directly.
            self._crash = lambda: self._glib.idle_add(self.loop.quit)
            self._run = self.loop.run

    def crash(self):
        """
        Crash the reactor, then quit the underlying main loop.
        """
        selectreactor.SelectReactor.crash(self)
        self._crash()

    def run(self, installSignalHandlers=True):
        """
        Start the reactor, register a zero-delay glib timeout for the first
        L{simulate} call, and run the main loop if the reactor started.
        """
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self._timeout_add(0, self.simulate)
        if self._started:
            self._run()

    def simulate(self):
        """
        Run simulation loops and reschedule callbacks.

        The next call is scheduled at most 0.01 seconds in the future.
        """
        if self._simtag is not None:
            self._source_remove(self._simtag)
        self.iterate()
        timeout = self.timeout()
        # Check for None *before* comparing: on Python 3, min(None, 0.01)
        # raises TypeError instead of returning None as it did on Python 2.
        if timeout is None:
            timeout = 0.01
        else:
            timeout = min(timeout, 0.01)
        # glib timeouts are expressed in integer milliseconds.
        self._simtag = self._timeout_add(
            int(timeout * 1000),
            self.simulate,
            priority=self._glib.PRIORITY_DEFAULT_IDLE,
        )