1# Author: Ovidiu Predescu 2# Date: July 2011 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15"""Bridges between the Twisted reactor and Tornado IOLoop. 16 17This module lets you run applications and libraries written for 18Twisted in a Tornado application. It can be used in two modes, 19depending on which library's underlying event loop you want to use. 20 21This module has been tested with Twisted versions 11.0.0 and newer. 22""" 23# pylint: skip-file 24 25from __future__ import absolute_import, division, print_function 26 27import datetime 28import functools 29import numbers 30import socket 31import sys 32 33import twisted.internet.abstract # type: ignore 34from twisted.internet.defer import Deferred # type: ignore 35from twisted.internet.posixbase import PosixReactorBase # type: ignore 36from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor # type: ignore 37from twisted.python import failure, log # type: ignore 38from twisted.internet import error # type: ignore 39import twisted.names.cache # type: ignore 40import twisted.names.client # type: ignore 41import twisted.names.hosts # type: ignore 42import twisted.names.resolve # type: ignore 43 44from zope.interface import implementer # type: ignore 45 46from salt.ext.tornado.concurrent import Future 47from salt.ext.tornado.escape import utf8 48from salt.ext.tornado import gen 49import salt.ext.tornado.ioloop 50from salt.ext.tornado.log import app_log 51from salt.ext.tornado.netutil import Resolver 52from salt.ext.tornado.stack_context import NullContext, wrap 53from salt.ext.tornado.ioloop import IOLoop 54from salt.ext.tornado.util import timedelta_to_seconds 55 56 57@implementer(IDelayedCall) 58class TornadoDelayedCall(object): 59 """DelayedCall object for Tornado.""" 60 def __init__(self, reactor, seconds, f, *args, **kw): 61 self._reactor = reactor 62 self._func = functools.partial(f, *args, **kw) 63 self._time = self._reactor.seconds() + seconds 64 self._timeout = self._reactor._io_loop.add_timeout(self._time, 65 self._called) 66 self._active = True 67 68 def _called(self): 69 self._active = False 70 self._reactor._removeDelayedCall(self) 71 try: 72 self._func() 73 except: 74 app_log.error("_called caught exception", exc_info=True) 75 76 def getTime(self): 77 return self._time 78 79 def cancel(self): 80 self._active = False 81 self._reactor._io_loop.remove_timeout(self._timeout) 82 self._reactor._removeDelayedCall(self) 83 84 def delay(self, seconds): 85 self._reactor._io_loop.remove_timeout(self._timeout) 86 self._time += seconds 87 self._timeout = self._reactor._io_loop.add_timeout(self._time, 88 self._called) 89 90 def reset(self, seconds): 91 self._reactor._io_loop.remove_timeout(self._timeout) 92 self._time = self._reactor.seconds() + seconds 93 self._timeout = self._reactor._io_loop.add_timeout(self._time, 94 self._called) 95 96 def active(self): 97 return self._active 98 99 100@implementer(IReactorTime, IReactorFDSet) 101class TornadoReactor(PosixReactorBase): 102 """Twisted reactor built on the Tornado IOLoop. 103 104 `TornadoReactor` implements the Twisted reactor interface on top of 105 the Tornado IOLoop. To use it, simply call `install` at the beginning 106 of the application:: 107 108 import tornado.platform.twisted 109 tornado.platform.twisted.install() 110 from twisted.internet import reactor 111 112 When the app is ready to start, call ``IOLoop.current().start()`` 113 instead of ``reactor.run()``. 114 115 It is also possible to create a non-global reactor by calling 116 ``tornado.platform.twisted.TornadoReactor(io_loop)``. However, if 117 the `.IOLoop` and reactor are to be short-lived (such as those used in 118 unit tests), additional cleanup may be required. Specifically, it is 119 recommended to call:: 120 121 reactor.fireSystemEvent('shutdown') 122 reactor.disconnectAll() 123 124 before closing the `.IOLoop`. 125 126 .. versionchanged:: 4.1 127 The ``io_loop`` argument is deprecated. 128 """ 129 def __init__(self, io_loop=None): 130 if not io_loop: 131 io_loop = salt.ext.tornado.ioloop.IOLoop.current() 132 self._io_loop = io_loop 133 self._readers = {} # map of reader objects to fd 134 self._writers = {} # map of writer objects to fd 135 self._fds = {} # a map of fd to a (reader, writer) tuple 136 self._delayedCalls = {} 137 PosixReactorBase.__init__(self) 138 self.addSystemEventTrigger('during', 'shutdown', self.crash) 139 140 # IOLoop.start() bypasses some of the reactor initialization. 141 # Fire off the necessary events if they weren't already triggered 142 # by reactor.run(). 143 def start_if_necessary(): 144 if not self._started: 145 self.fireSystemEvent('startup') 146 self._io_loop.add_callback(start_if_necessary) 147 148 # IReactorTime 149 def seconds(self): 150 return self._io_loop.time() 151 152 def callLater(self, seconds, f, *args, **kw): 153 dc = TornadoDelayedCall(self, seconds, f, *args, **kw) 154 self._delayedCalls[dc] = True 155 return dc 156 157 def getDelayedCalls(self): 158 return [x for x in self._delayedCalls if x._active] 159 160 def _removeDelayedCall(self, dc): 161 if dc in self._delayedCalls: 162 del self._delayedCalls[dc] 163 164 # IReactorThreads 165 def callFromThread(self, f, *args, **kw): 166 assert callable(f), "%s is not callable" % f 167 with NullContext(): 168 # This NullContext is mainly for an edge case when running 169 # TwistedIOLoop on top of a TornadoReactor. 170 # TwistedIOLoop.add_callback uses reactor.callFromThread and 171 # should not pick up additional StackContexts along the way. 172 self._io_loop.add_callback(f, *args, **kw) 173 174 # We don't need the waker code from the super class, Tornado uses 175 # its own waker. 176 def installWaker(self): 177 pass 178 179 def wakeUp(self): 180 pass 181 182 # IReactorFDSet 183 def _invoke_callback(self, fd, events): 184 if fd not in self._fds: 185 return 186 (reader, writer) = self._fds[fd] 187 if reader: 188 err = None 189 if reader.fileno() == -1: 190 err = error.ConnectionLost() 191 elif events & IOLoop.READ: 192 err = log.callWithLogger(reader, reader.doRead) 193 if err is None and events & IOLoop.ERROR: 194 err = error.ConnectionLost() 195 if err is not None: 196 self.removeReader(reader) 197 reader.readConnectionLost(failure.Failure(err)) 198 if writer: 199 err = None 200 if writer.fileno() == -1: 201 err = error.ConnectionLost() 202 elif events & IOLoop.WRITE: 203 err = log.callWithLogger(writer, writer.doWrite) 204 if err is None and events & IOLoop.ERROR: 205 err = error.ConnectionLost() 206 if err is not None: 207 self.removeWriter(writer) 208 writer.writeConnectionLost(failure.Failure(err)) 209 210 def addReader(self, reader): 211 if reader in self._readers: 212 # Don't add the reader if it's already there 213 return 214 fd = reader.fileno() 215 self._readers[reader] = fd 216 if fd in self._fds: 217 (_, writer) = self._fds[fd] 218 self._fds[fd] = (reader, writer) 219 if writer: 220 # We already registered this fd for write events, 221 # update it for read events as well. 222 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) 223 else: 224 with NullContext(): 225 self._fds[fd] = (reader, None) 226 self._io_loop.add_handler(fd, self._invoke_callback, 227 IOLoop.READ) 228 229 def addWriter(self, writer): 230 if writer in self._writers: 231 return 232 fd = writer.fileno() 233 self._writers[writer] = fd 234 if fd in self._fds: 235 (reader, _) = self._fds[fd] 236 self._fds[fd] = (reader, writer) 237 if reader: 238 # We already registered this fd for read events, 239 # update it for write events as well. 240 self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) 241 else: 242 with NullContext(): 243 self._fds[fd] = (None, writer) 244 self._io_loop.add_handler(fd, self._invoke_callback, 245 IOLoop.WRITE) 246 247 def removeReader(self, reader): 248 if reader in self._readers: 249 fd = self._readers.pop(reader) 250 (_, writer) = self._fds[fd] 251 if writer: 252 # We have a writer so we need to update the IOLoop for 253 # write events only. 254 self._fds[fd] = (None, writer) 255 self._io_loop.update_handler(fd, IOLoop.WRITE) 256 else: 257 # Since we have no writer registered, we remove the 258 # entry from _fds and unregister the handler from the 259 # IOLoop 260 del self._fds[fd] 261 self._io_loop.remove_handler(fd) 262 263 def removeWriter(self, writer): 264 if writer in self._writers: 265 fd = self._writers.pop(writer) 266 (reader, _) = self._fds[fd] 267 if reader: 268 # We have a reader so we need to update the IOLoop for 269 # read events only. 270 self._fds[fd] = (reader, None) 271 self._io_loop.update_handler(fd, IOLoop.READ) 272 else: 273 # Since we have no reader registered, we remove the 274 # entry from the _fds and unregister the handler from 275 # the IOLoop. 276 del self._fds[fd] 277 self._io_loop.remove_handler(fd) 278 279 def removeAll(self): 280 return self._removeAll(self._readers, self._writers) 281 282 def getReaders(self): 283 return self._readers.keys() 284 285 def getWriters(self): 286 return self._writers.keys() 287 288 # The following functions are mainly used in twisted-style test cases; 289 # it is expected that most users of the TornadoReactor will call 290 # IOLoop.start() instead of Reactor.run(). 291 def stop(self): 292 PosixReactorBase.stop(self) 293 fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown") 294 self._io_loop.add_callback(fire_shutdown) 295 296 def crash(self): 297 PosixReactorBase.crash(self) 298 self._io_loop.stop() 299 300 def doIteration(self, delay): 301 raise NotImplementedError("doIteration") 302 303 def mainLoop(self): 304 # Since this class is intended to be used in applications 305 # where the top-level event loop is ``io_loop.start()`` rather 306 # than ``reactor.run()``, it is implemented a little 307 # differently than other Twisted reactors. We override 308 # ``mainLoop`` instead of ``doIteration`` and must implement 309 # timed call functionality on top of `.IOLoop.add_timeout` 310 # rather than using the implementation in 311 # ``PosixReactorBase``. 312 self._io_loop.start() 313 314 315class _TestReactor(TornadoReactor): 316 """Subclass of TornadoReactor for use in unittests. 317 318 This can't go in the test.py file because of import-order dependencies 319 with the Twisted reactor test builder. 320 """ 321 def __init__(self): 322 # always use a new ioloop 323 super(_TestReactor, self).__init__(IOLoop()) 324 325 def listenTCP(self, port, factory, backlog=50, interface=''): 326 # default to localhost to avoid firewall prompts on the mac 327 if not interface: 328 interface = '127.0.0.1' 329 return super(_TestReactor, self).listenTCP( 330 port, factory, backlog=backlog, interface=interface) 331 332 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): 333 if not interface: 334 interface = '127.0.0.1' 335 return super(_TestReactor, self).listenUDP( 336 port, protocol, interface=interface, maxPacketSize=maxPacketSize) 337 338 339def install(io_loop=None): 340 """Install this package as the default Twisted reactor. 341 342 ``install()`` must be called very early in the startup process, 343 before most other twisted-related imports. Conversely, because it 344 initializes the `.IOLoop`, it cannot be called before 345 `.fork_processes` or multi-process `~.TCPServer.start`. These 346 conflicting requirements make it difficult to use `.TornadoReactor` 347 in multi-process mode, and an external process manager such as 348 ``supervisord`` is recommended instead. 349 350 .. versionchanged:: 4.1 351 The ``io_loop`` argument is deprecated. 352 353 """ 354 if not io_loop: 355 io_loop = salt.ext.tornado.ioloop.IOLoop.current() 356 reactor = TornadoReactor(io_loop) 357 from twisted.internet.main import installReactor # type: ignore 358 installReactor(reactor) 359 return reactor 360 361 362@implementer(IReadDescriptor, IWriteDescriptor) 363class _FD(object): 364 def __init__(self, fd, fileobj, handler): 365 self.fd = fd 366 self.fileobj = fileobj 367 self.handler = handler 368 self.reading = False 369 self.writing = False 370 self.lost = False 371 372 def fileno(self): 373 return self.fd 374 375 def doRead(self): 376 if not self.lost: 377 self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.READ) 378 379 def doWrite(self): 380 if not self.lost: 381 self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.WRITE) 382 383 def connectionLost(self, reason): 384 if not self.lost: 385 self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.ERROR) 386 self.lost = True 387 388 def logPrefix(self): 389 return '' 390 391 392class TwistedIOLoop(salt.ext.tornado.ioloop.IOLoop): 393 """IOLoop implementation that runs on Twisted. 394 395 `TwistedIOLoop` implements the Tornado IOLoop interface on top of 396 the Twisted reactor. Recommended usage:: 397 398 from salt.ext.tornado.platform.twisted import TwistedIOLoop 399 from twisted.internet import reactor 400 TwistedIOLoop().install() 401 # Set up your tornado application as usual using `IOLoop.instance` 402 reactor.run() 403 404 Uses the global Twisted reactor by default. To create multiple 405 ``TwistedIOLoops`` in the same process, you must pass a unique reactor 406 when constructing each one. 407 408 Not compatible with `tornado.process.Subprocess.set_exit_callback` 409 because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict 410 with each other. 411 412 See also :meth:`tornado.ioloop.IOLoop.install` for general notes on 413 installing alternative IOLoops. 414 """ 415 def initialize(self, reactor=None, **kwargs): 416 super(TwistedIOLoop, self).initialize(**kwargs) 417 if reactor is None: 418 import twisted.internet.reactor # type: ignore 419 reactor = twisted.internet.reactor 420 self.reactor = reactor 421 self.fds = {} 422 423 def close(self, all_fds=False): 424 fds = self.fds 425 self.reactor.removeAll() 426 for c in self.reactor.getDelayedCalls(): 427 c.cancel() 428 if all_fds: 429 for fd in fds.values(): 430 self.close_fd(fd.fileobj) 431 432 def add_handler(self, fd, handler, events): 433 if fd in self.fds: 434 raise ValueError('fd %s added twice' % fd) 435 fd, fileobj = self.split_fd(fd) 436 self.fds[fd] = _FD(fd, fileobj, wrap(handler)) 437 if events & salt.ext.tornado.ioloop.IOLoop.READ: 438 self.fds[fd].reading = True 439 self.reactor.addReader(self.fds[fd]) 440 if events & salt.ext.tornado.ioloop.IOLoop.WRITE: 441 self.fds[fd].writing = True 442 self.reactor.addWriter(self.fds[fd]) 443 444 def update_handler(self, fd, events): 445 fd, fileobj = self.split_fd(fd) 446 if events & salt.ext.tornado.ioloop.IOLoop.READ: 447 if not self.fds[fd].reading: 448 self.fds[fd].reading = True 449 self.reactor.addReader(self.fds[fd]) 450 else: 451 if self.fds[fd].reading: 452 self.fds[fd].reading = False 453 self.reactor.removeReader(self.fds[fd]) 454 if events & salt.ext.tornado.ioloop.IOLoop.WRITE: 455 if not self.fds[fd].writing: 456 self.fds[fd].writing = True 457 self.reactor.addWriter(self.fds[fd]) 458 else: 459 if self.fds[fd].writing: 460 self.fds[fd].writing = False 461 self.reactor.removeWriter(self.fds[fd]) 462 463 def remove_handler(self, fd): 464 fd, fileobj = self.split_fd(fd) 465 if fd not in self.fds: 466 return 467 self.fds[fd].lost = True 468 if self.fds[fd].reading: 469 self.reactor.removeReader(self.fds[fd]) 470 if self.fds[fd].writing: 471 self.reactor.removeWriter(self.fds[fd]) 472 del self.fds[fd] 473 474 def start(self): 475 old_current = IOLoop.current(instance=False) 476 try: 477 self._setup_logging() 478 self.make_current() 479 self.reactor.run() 480 finally: 481 if old_current is None: 482 IOLoop.clear_current() 483 else: 484 old_current.make_current() 485 486 def stop(self): 487 self.reactor.crash() 488 489 def add_timeout(self, deadline, callback, *args, **kwargs): 490 # This method could be simplified (since tornado 4.0) by 491 # overriding call_at instead of add_timeout, but we leave it 492 # for now as a test of backwards-compatibility. 493 if isinstance(deadline, numbers.Real): 494 delay = max(deadline - self.time(), 0) 495 elif isinstance(deadline, datetime.timedelta): 496 delay = timedelta_to_seconds(deadline) 497 else: 498 raise TypeError("Unsupported deadline %r") 499 return self.reactor.callLater( 500 delay, self._run_callback, 501 functools.partial(wrap(callback), *args, **kwargs)) 502 503 def remove_timeout(self, timeout): 504 if timeout.active(): 505 timeout.cancel() 506 507 def add_callback(self, callback, *args, **kwargs): 508 self.reactor.callFromThread( 509 self._run_callback, 510 functools.partial(wrap(callback), *args, **kwargs)) 511 512 def add_callback_from_signal(self, callback, *args, **kwargs): 513 self.add_callback(callback, *args, **kwargs) 514 515 516class TwistedResolver(Resolver): 517 """Twisted-based asynchronous resolver. 518 519 This is a non-blocking and non-threaded resolver. It is 520 recommended only when threads cannot be used, since it has 521 limitations compared to the standard ``getaddrinfo``-based 522 `~tornado.netutil.Resolver` and 523 `~tornado.netutil.ThreadedResolver`. Specifically, it returns at 524 most one result, and arguments other than ``host`` and ``family`` 525 are ignored. It may fail to resolve when ``family`` is not 526 ``socket.AF_UNSPEC``. 527 528 Requires Twisted 12.1 or newer. 529 530 .. versionchanged:: 4.1 531 The ``io_loop`` argument is deprecated. 532 """ 533 def initialize(self, io_loop=None): 534 self.io_loop = io_loop or IOLoop.current() 535 # partial copy of twisted.names.client.createResolver, which doesn't 536 # allow for a reactor to be passed in. 537 self.reactor = salt.ext.tornado.platform.twisted.TornadoReactor(io_loop) 538 539 host_resolver = twisted.names.hosts.Resolver('/etc/hosts') 540 cache_resolver = twisted.names.cache.CacheResolver(reactor=self.reactor) 541 real_resolver = twisted.names.client.Resolver('/etc/resolv.conf', 542 reactor=self.reactor) 543 self.resolver = twisted.names.resolve.ResolverChain( 544 [host_resolver, cache_resolver, real_resolver]) 545 546 @gen.coroutine 547 def resolve(self, host, port, family=0): 548 # getHostByName doesn't accept IP addresses, so if the input 549 # looks like an IP address just return it immediately. 550 if twisted.internet.abstract.isIPAddress(host): 551 resolved = host 552 resolved_family = socket.AF_INET 553 elif twisted.internet.abstract.isIPv6Address(host): 554 resolved = host 555 resolved_family = socket.AF_INET6 556 else: 557 deferred = self.resolver.getHostByName(utf8(host)) 558 resolved = yield gen.Task(deferred.addBoth) 559 if isinstance(resolved, failure.Failure): 560 try: 561 resolved.raiseException() 562 except twisted.names.error.DomainError as e: 563 raise IOError(e) 564 elif twisted.internet.abstract.isIPAddress(resolved): 565 resolved_family = socket.AF_INET 566 elif twisted.internet.abstract.isIPv6Address(resolved): 567 resolved_family = socket.AF_INET6 568 else: 569 resolved_family = socket.AF_UNSPEC 570 if family != socket.AF_UNSPEC and family != resolved_family: 571 raise Exception('Requested socket family %d but got %d' % 572 (family, resolved_family)) 573 result = [ 574 (resolved_family, (resolved, port)), 575 ] 576 raise gen.Return(result) 577 578 579if hasattr(gen.convert_yielded, 'register'): 580 @gen.convert_yielded.register(Deferred) # type: ignore 581 def _(d): 582 f = Future() 583 584 def errback(failure): 585 try: 586 failure.raiseException() 587 # Should never happen, but just in case 588 raise Exception("errback called without error") 589 except: 590 f.set_exc_info(sys.exc_info()) 591 d.addCallbacks(f.set_result, errback) 592 return f 593