1# Author: Ovidiu Predescu
2# Date: July 2011
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""Bridges between the Twisted reactor and Tornado IOLoop.
16
17This module lets you run applications and libraries written for
18Twisted in a Tornado application.  It can be used in two modes,
19depending on which library's underlying event loop you want to use.
20
21This module has been tested with Twisted versions 11.0.0 and newer.
22"""
23# pylint: skip-file
24
25from __future__ import absolute_import, division, print_function
26
27import datetime
28import functools
29import numbers
30import socket
31import sys
32
33import twisted.internet.abstract  # type: ignore
34from twisted.internet.defer import Deferred  # type: ignore
35from twisted.internet.posixbase import PosixReactorBase  # type: ignore
36from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor  # type: ignore
37from twisted.python import failure, log  # type: ignore
38from twisted.internet import error  # type: ignore
39import twisted.names.cache  # type: ignore
40import twisted.names.client  # type: ignore
41import twisted.names.hosts  # type: ignore
42import twisted.names.resolve  # type: ignore
43
44from zope.interface import implementer  # type: ignore
45
46from salt.ext.tornado.concurrent import Future
47from salt.ext.tornado.escape import utf8
48from salt.ext.tornado import gen
49import salt.ext.tornado.ioloop
50from salt.ext.tornado.log import app_log
51from salt.ext.tornado.netutil import Resolver
52from salt.ext.tornado.stack_context import NullContext, wrap
53from salt.ext.tornado.ioloop import IOLoop
54from salt.ext.tornado.util import timedelta_to_seconds
55
56
57@implementer(IDelayedCall)
58class TornadoDelayedCall(object):
59    """DelayedCall object for Tornado."""
60    def __init__(self, reactor, seconds, f, *args, **kw):
61        self._reactor = reactor
62        self._func = functools.partial(f, *args, **kw)
63        self._time = self._reactor.seconds() + seconds
64        self._timeout = self._reactor._io_loop.add_timeout(self._time,
65                                                           self._called)
66        self._active = True
67
68    def _called(self):
69        self._active = False
70        self._reactor._removeDelayedCall(self)
71        try:
72            self._func()
73        except:
74            app_log.error("_called caught exception", exc_info=True)
75
76    def getTime(self):
77        return self._time
78
79    def cancel(self):
80        self._active = False
81        self._reactor._io_loop.remove_timeout(self._timeout)
82        self._reactor._removeDelayedCall(self)
83
84    def delay(self, seconds):
85        self._reactor._io_loop.remove_timeout(self._timeout)
86        self._time += seconds
87        self._timeout = self._reactor._io_loop.add_timeout(self._time,
88                                                           self._called)
89
90    def reset(self, seconds):
91        self._reactor._io_loop.remove_timeout(self._timeout)
92        self._time = self._reactor.seconds() + seconds
93        self._timeout = self._reactor._io_loop.add_timeout(self._time,
94                                                           self._called)
95
96    def active(self):
97        return self._active
98
99
100@implementer(IReactorTime, IReactorFDSet)
101class TornadoReactor(PosixReactorBase):
102    """Twisted reactor built on the Tornado IOLoop.
103
104    `TornadoReactor` implements the Twisted reactor interface on top of
105    the Tornado IOLoop.  To use it, simply call `install` at the beginning
106    of the application::
107
108        import tornado.platform.twisted
109        tornado.platform.twisted.install()
110        from twisted.internet import reactor
111
112    When the app is ready to start, call ``IOLoop.current().start()``
113    instead of ``reactor.run()``.
114
115    It is also possible to create a non-global reactor by calling
116    ``tornado.platform.twisted.TornadoReactor(io_loop)``.  However, if
117    the `.IOLoop` and reactor are to be short-lived (such as those used in
118    unit tests), additional cleanup may be required.  Specifically, it is
119    recommended to call::
120
121        reactor.fireSystemEvent('shutdown')
122        reactor.disconnectAll()
123
124    before closing the `.IOLoop`.
125
126    .. versionchanged:: 4.1
127       The ``io_loop`` argument is deprecated.
128    """
129    def __init__(self, io_loop=None):
130        if not io_loop:
131            io_loop = salt.ext.tornado.ioloop.IOLoop.current()
132        self._io_loop = io_loop
133        self._readers = {}  # map of reader objects to fd
134        self._writers = {}  # map of writer objects to fd
135        self._fds = {}  # a map of fd to a (reader, writer) tuple
136        self._delayedCalls = {}
137        PosixReactorBase.__init__(self)
138        self.addSystemEventTrigger('during', 'shutdown', self.crash)
139
140        # IOLoop.start() bypasses some of the reactor initialization.
141        # Fire off the necessary events if they weren't already triggered
142        # by reactor.run().
143        def start_if_necessary():
144            if not self._started:
145                self.fireSystemEvent('startup')
146        self._io_loop.add_callback(start_if_necessary)
147
148    # IReactorTime
149    def seconds(self):
150        return self._io_loop.time()
151
152    def callLater(self, seconds, f, *args, **kw):
153        dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
154        self._delayedCalls[dc] = True
155        return dc
156
157    def getDelayedCalls(self):
158        return [x for x in self._delayedCalls if x._active]
159
160    def _removeDelayedCall(self, dc):
161        if dc in self._delayedCalls:
162            del self._delayedCalls[dc]
163
164    # IReactorThreads
165    def callFromThread(self, f, *args, **kw):
166        assert callable(f), "%s is not callable" % f
167        with NullContext():
168            # This NullContext is mainly for an edge case when running
169            # TwistedIOLoop on top of a TornadoReactor.
170            # TwistedIOLoop.add_callback uses reactor.callFromThread and
171            # should not pick up additional StackContexts along the way.
172            self._io_loop.add_callback(f, *args, **kw)
173
174    # We don't need the waker code from the super class, Tornado uses
175    # its own waker.
176    def installWaker(self):
177        pass
178
179    def wakeUp(self):
180        pass
181
182    # IReactorFDSet
183    def _invoke_callback(self, fd, events):
184        if fd not in self._fds:
185            return
186        (reader, writer) = self._fds[fd]
187        if reader:
188            err = None
189            if reader.fileno() == -1:
190                err = error.ConnectionLost()
191            elif events & IOLoop.READ:
192                err = log.callWithLogger(reader, reader.doRead)
193            if err is None and events & IOLoop.ERROR:
194                err = error.ConnectionLost()
195            if err is not None:
196                self.removeReader(reader)
197                reader.readConnectionLost(failure.Failure(err))
198        if writer:
199            err = None
200            if writer.fileno() == -1:
201                err = error.ConnectionLost()
202            elif events & IOLoop.WRITE:
203                err = log.callWithLogger(writer, writer.doWrite)
204            if err is None and events & IOLoop.ERROR:
205                err = error.ConnectionLost()
206            if err is not None:
207                self.removeWriter(writer)
208                writer.writeConnectionLost(failure.Failure(err))
209
210    def addReader(self, reader):
211        if reader in self._readers:
212            # Don't add the reader if it's already there
213            return
214        fd = reader.fileno()
215        self._readers[reader] = fd
216        if fd in self._fds:
217            (_, writer) = self._fds[fd]
218            self._fds[fd] = (reader, writer)
219            if writer:
220                # We already registered this fd for write events,
221                # update it for read events as well.
222                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
223        else:
224            with NullContext():
225                self._fds[fd] = (reader, None)
226                self._io_loop.add_handler(fd, self._invoke_callback,
227                                          IOLoop.READ)
228
229    def addWriter(self, writer):
230        if writer in self._writers:
231            return
232        fd = writer.fileno()
233        self._writers[writer] = fd
234        if fd in self._fds:
235            (reader, _) = self._fds[fd]
236            self._fds[fd] = (reader, writer)
237            if reader:
238                # We already registered this fd for read events,
239                # update it for write events as well.
240                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
241        else:
242            with NullContext():
243                self._fds[fd] = (None, writer)
244                self._io_loop.add_handler(fd, self._invoke_callback,
245                                          IOLoop.WRITE)
246
247    def removeReader(self, reader):
248        if reader in self._readers:
249            fd = self._readers.pop(reader)
250            (_, writer) = self._fds[fd]
251            if writer:
252                # We have a writer so we need to update the IOLoop for
253                # write events only.
254                self._fds[fd] = (None, writer)
255                self._io_loop.update_handler(fd, IOLoop.WRITE)
256            else:
257                # Since we have no writer registered, we remove the
258                # entry from _fds and unregister the handler from the
259                # IOLoop
260                del self._fds[fd]
261                self._io_loop.remove_handler(fd)
262
263    def removeWriter(self, writer):
264        if writer in self._writers:
265            fd = self._writers.pop(writer)
266            (reader, _) = self._fds[fd]
267            if reader:
268                # We have a reader so we need to update the IOLoop for
269                # read events only.
270                self._fds[fd] = (reader, None)
271                self._io_loop.update_handler(fd, IOLoop.READ)
272            else:
273                # Since we have no reader registered, we remove the
274                # entry from the _fds and unregister the handler from
275                # the IOLoop.
276                del self._fds[fd]
277                self._io_loop.remove_handler(fd)
278
279    def removeAll(self):
280        return self._removeAll(self._readers, self._writers)
281
282    def getReaders(self):
283        return self._readers.keys()
284
285    def getWriters(self):
286        return self._writers.keys()
287
288    # The following functions are mainly used in twisted-style test cases;
289    # it is expected that most users of the TornadoReactor will call
290    # IOLoop.start() instead of Reactor.run().
291    def stop(self):
292        PosixReactorBase.stop(self)
293        fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown")
294        self._io_loop.add_callback(fire_shutdown)
295
296    def crash(self):
297        PosixReactorBase.crash(self)
298        self._io_loop.stop()
299
300    def doIteration(self, delay):
301        raise NotImplementedError("doIteration")
302
303    def mainLoop(self):
304        # Since this class is intended to be used in applications
305        # where the top-level event loop is ``io_loop.start()`` rather
306        # than ``reactor.run()``, it is implemented a little
307        # differently than other Twisted reactors. We override
308        # ``mainLoop`` instead of ``doIteration`` and must implement
309        # timed call functionality on top of `.IOLoop.add_timeout`
310        # rather than using the implementation in
311        # ``PosixReactorBase``.
312        self._io_loop.start()
313
314
315class _TestReactor(TornadoReactor):
316    """Subclass of TornadoReactor for use in unittests.
317
318    This can't go in the test.py file because of import-order dependencies
319    with the Twisted reactor test builder.
320    """
321    def __init__(self):
322        # always use a new ioloop
323        super(_TestReactor, self).__init__(IOLoop())
324
325    def listenTCP(self, port, factory, backlog=50, interface=''):
326        # default to localhost to avoid firewall prompts on the mac
327        if not interface:
328            interface = '127.0.0.1'
329        return super(_TestReactor, self).listenTCP(
330            port, factory, backlog=backlog, interface=interface)
331
332    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
333        if not interface:
334            interface = '127.0.0.1'
335        return super(_TestReactor, self).listenUDP(
336            port, protocol, interface=interface, maxPacketSize=maxPacketSize)
337
338
339def install(io_loop=None):
340    """Install this package as the default Twisted reactor.
341
342    ``install()`` must be called very early in the startup process,
343    before most other twisted-related imports. Conversely, because it
344    initializes the `.IOLoop`, it cannot be called before
345    `.fork_processes` or multi-process `~.TCPServer.start`. These
346    conflicting requirements make it difficult to use `.TornadoReactor`
347    in multi-process mode, and an external process manager such as
348    ``supervisord`` is recommended instead.
349
350    .. versionchanged:: 4.1
351       The ``io_loop`` argument is deprecated.
352
353    """
354    if not io_loop:
355        io_loop = salt.ext.tornado.ioloop.IOLoop.current()
356    reactor = TornadoReactor(io_loop)
357    from twisted.internet.main import installReactor  # type: ignore
358    installReactor(reactor)
359    return reactor
360
361
362@implementer(IReadDescriptor, IWriteDescriptor)
363class _FD(object):
364    def __init__(self, fd, fileobj, handler):
365        self.fd = fd
366        self.fileobj = fileobj
367        self.handler = handler
368        self.reading = False
369        self.writing = False
370        self.lost = False
371
372    def fileno(self):
373        return self.fd
374
375    def doRead(self):
376        if not self.lost:
377            self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.READ)
378
379    def doWrite(self):
380        if not self.lost:
381            self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.WRITE)
382
383    def connectionLost(self, reason):
384        if not self.lost:
385            self.handler(self.fileobj, salt.ext.tornado.ioloop.IOLoop.ERROR)
386            self.lost = True
387
388    def logPrefix(self):
389        return ''
390
391
392class TwistedIOLoop(salt.ext.tornado.ioloop.IOLoop):
393    """IOLoop implementation that runs on Twisted.
394
395    `TwistedIOLoop` implements the Tornado IOLoop interface on top of
396    the Twisted reactor. Recommended usage::
397
398        from salt.ext.tornado.platform.twisted import TwistedIOLoop
399        from twisted.internet import reactor
400        TwistedIOLoop().install()
401        # Set up your tornado application as usual using `IOLoop.instance`
402        reactor.run()
403
404    Uses the global Twisted reactor by default.  To create multiple
405    ``TwistedIOLoops`` in the same process, you must pass a unique reactor
406    when constructing each one.
407
408    Not compatible with `tornado.process.Subprocess.set_exit_callback`
409    because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict
410    with each other.
411
412    See also :meth:`tornado.ioloop.IOLoop.install` for general notes on
413    installing alternative IOLoops.
414    """
415    def initialize(self, reactor=None, **kwargs):
416        super(TwistedIOLoop, self).initialize(**kwargs)
417        if reactor is None:
418            import twisted.internet.reactor  # type: ignore
419            reactor = twisted.internet.reactor
420        self.reactor = reactor
421        self.fds = {}
422
423    def close(self, all_fds=False):
424        fds = self.fds
425        self.reactor.removeAll()
426        for c in self.reactor.getDelayedCalls():
427            c.cancel()
428        if all_fds:
429            for fd in fds.values():
430                self.close_fd(fd.fileobj)
431
432    def add_handler(self, fd, handler, events):
433        if fd in self.fds:
434            raise ValueError('fd %s added twice' % fd)
435        fd, fileobj = self.split_fd(fd)
436        self.fds[fd] = _FD(fd, fileobj, wrap(handler))
437        if events & salt.ext.tornado.ioloop.IOLoop.READ:
438            self.fds[fd].reading = True
439            self.reactor.addReader(self.fds[fd])
440        if events & salt.ext.tornado.ioloop.IOLoop.WRITE:
441            self.fds[fd].writing = True
442            self.reactor.addWriter(self.fds[fd])
443
444    def update_handler(self, fd, events):
445        fd, fileobj = self.split_fd(fd)
446        if events & salt.ext.tornado.ioloop.IOLoop.READ:
447            if not self.fds[fd].reading:
448                self.fds[fd].reading = True
449                self.reactor.addReader(self.fds[fd])
450        else:
451            if self.fds[fd].reading:
452                self.fds[fd].reading = False
453                self.reactor.removeReader(self.fds[fd])
454        if events & salt.ext.tornado.ioloop.IOLoop.WRITE:
455            if not self.fds[fd].writing:
456                self.fds[fd].writing = True
457                self.reactor.addWriter(self.fds[fd])
458        else:
459            if self.fds[fd].writing:
460                self.fds[fd].writing = False
461                self.reactor.removeWriter(self.fds[fd])
462
463    def remove_handler(self, fd):
464        fd, fileobj = self.split_fd(fd)
465        if fd not in self.fds:
466            return
467        self.fds[fd].lost = True
468        if self.fds[fd].reading:
469            self.reactor.removeReader(self.fds[fd])
470        if self.fds[fd].writing:
471            self.reactor.removeWriter(self.fds[fd])
472        del self.fds[fd]
473
474    def start(self):
475        old_current = IOLoop.current(instance=False)
476        try:
477            self._setup_logging()
478            self.make_current()
479            self.reactor.run()
480        finally:
481            if old_current is None:
482                IOLoop.clear_current()
483            else:
484                old_current.make_current()
485
486    def stop(self):
487        self.reactor.crash()
488
489    def add_timeout(self, deadline, callback, *args, **kwargs):
490        # This method could be simplified (since tornado 4.0) by
491        # overriding call_at instead of add_timeout, but we leave it
492        # for now as a test of backwards-compatibility.
493        if isinstance(deadline, numbers.Real):
494            delay = max(deadline - self.time(), 0)
495        elif isinstance(deadline, datetime.timedelta):
496            delay = timedelta_to_seconds(deadline)
497        else:
498            raise TypeError("Unsupported deadline %r")
499        return self.reactor.callLater(
500            delay, self._run_callback,
501            functools.partial(wrap(callback), *args, **kwargs))
502
503    def remove_timeout(self, timeout):
504        if timeout.active():
505            timeout.cancel()
506
507    def add_callback(self, callback, *args, **kwargs):
508        self.reactor.callFromThread(
509            self._run_callback,
510            functools.partial(wrap(callback), *args, **kwargs))
511
512    def add_callback_from_signal(self, callback, *args, **kwargs):
513        self.add_callback(callback, *args, **kwargs)
514
515
516class TwistedResolver(Resolver):
517    """Twisted-based asynchronous resolver.
518
519    This is a non-blocking and non-threaded resolver.  It is
520    recommended only when threads cannot be used, since it has
521    limitations compared to the standard ``getaddrinfo``-based
522    `~tornado.netutil.Resolver` and
523    `~tornado.netutil.ThreadedResolver`.  Specifically, it returns at
524    most one result, and arguments other than ``host`` and ``family``
525    are ignored.  It may fail to resolve when ``family`` is not
526    ``socket.AF_UNSPEC``.
527
528    Requires Twisted 12.1 or newer.
529
530    .. versionchanged:: 4.1
531       The ``io_loop`` argument is deprecated.
532    """
533    def initialize(self, io_loop=None):
534        self.io_loop = io_loop or IOLoop.current()
535        # partial copy of twisted.names.client.createResolver, which doesn't
536        # allow for a reactor to be passed in.
537        self.reactor = salt.ext.tornado.platform.twisted.TornadoReactor(io_loop)
538
539        host_resolver = twisted.names.hosts.Resolver('/etc/hosts')
540        cache_resolver = twisted.names.cache.CacheResolver(reactor=self.reactor)
541        real_resolver = twisted.names.client.Resolver('/etc/resolv.conf',
542                                                      reactor=self.reactor)
543        self.resolver = twisted.names.resolve.ResolverChain(
544            [host_resolver, cache_resolver, real_resolver])
545
546    @gen.coroutine
547    def resolve(self, host, port, family=0):
548        # getHostByName doesn't accept IP addresses, so if the input
549        # looks like an IP address just return it immediately.
550        if twisted.internet.abstract.isIPAddress(host):
551            resolved = host
552            resolved_family = socket.AF_INET
553        elif twisted.internet.abstract.isIPv6Address(host):
554            resolved = host
555            resolved_family = socket.AF_INET6
556        else:
557            deferred = self.resolver.getHostByName(utf8(host))
558            resolved = yield gen.Task(deferred.addBoth)
559            if isinstance(resolved, failure.Failure):
560                try:
561                    resolved.raiseException()
562                except twisted.names.error.DomainError as e:
563                    raise IOError(e)
564            elif twisted.internet.abstract.isIPAddress(resolved):
565                resolved_family = socket.AF_INET
566            elif twisted.internet.abstract.isIPv6Address(resolved):
567                resolved_family = socket.AF_INET6
568            else:
569                resolved_family = socket.AF_UNSPEC
570        if family != socket.AF_UNSPEC and family != resolved_family:
571            raise Exception('Requested socket family %d but got %d' %
572                            (family, resolved_family))
573        result = [
574            (resolved_family, (resolved, port)),
575        ]
576        raise gen.Return(result)
577
578
579if hasattr(gen.convert_yielded, 'register'):
580    @gen.convert_yielded.register(Deferred)  # type: ignore
581    def _(d):
582        f = Future()
583
584        def errback(failure):
585            try:
586                failure.raiseException()
587                # Should never happen, but just in case
588                raise Exception("errback called without error")
589            except:
590                f.set_exc_info(sys.exc_info())
591        d.addCallbacks(f.set_result, errback)
592        return f
593