1#
2# Copyright 2011 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Miscellaneous network utility code."""
17
18from __future__ import absolute_import, division, print_function
19
20import errno
21import os
22import sys
23import socket
24import stat
25
26from tornado.concurrent import dummy_executor, run_on_executor
27from tornado import gen
28from tornado.ioloop import IOLoop
29from tornado.platform.auto import set_close_exec
30from tornado.util import PY3, Configurable, errno_from_exception
31
try:
    import ssl
except ImportError:
    # Some restricted runtimes (e.g. Google App Engine) do not provide the
    # ssl module; the default options below are chosen accordingly.
    ssl = None

if PY3:
    # Python 3 has no ``xrange`` builtin; alias it so loops in this module
    # can use the same name on both major versions.
    xrange = range

if ssl is None:
    # Google App Engine: without the ssl module there are no SSLContext
    # objects, so the defaults are plain option dictionaries.
    _client_ssl_defaults = dict(cert_reqs=None,
                                ca_certs=None)
    _server_ssl_defaults = {}
else:
    # Note that the naming of ssl.Purpose is confusing; the purpose
    # of a context is to authenticate the opposite side of the connection.
    # A client therefore uses SERVER_AUTH and a server uses CLIENT_AUTH.
    _client_ssl_defaults = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH)
    _server_ssl_defaults = ssl.create_default_context(
        ssl.Purpose.CLIENT_AUTH)
    if hasattr(ssl, 'OP_NO_COMPRESSION'):
        # Same TLS-compression opt-out that ssl_options_to_context applies.
        _client_ssl_defaults.options |= ssl.OP_NO_COMPRESSION
        _server_ssl_defaults.options |= ssl.OP_NO_COMPRESSION
57
# ThreadedResolver runs getaddrinfo on a thread. If the hostname is unicode,
# getaddrinfo attempts to import encodings.idna. If this is done at
# module-import time, the import lock is already held by the main thread,
# leading to deadlock. Avoid it by caching the idna encoder on the main
# thread now.  The encoded value itself is discarded; only the side effect
# of loading the codec matters.
u'foo'.encode('idna')

# For undiagnosed reasons, 'latin1' codec may also need to be preloaded.
# As above, the result of the call is intentionally unused.
u'foo'.encode('latin1')

# These errnos indicate that a non-blocking operation must be retried
# at a later time.  On most platforms they're the same value, but on
# some they differ.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)

# Some platforms expose an additional would-block code in the errno
# module; include it whenever it is available.
if hasattr(errno, "WSAEWOULDBLOCK"):
    _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)  # type: ignore

# Default backlog used when calling sock.listen().  add_accept_handler
# also uses this value as the maximum number of connections it accepts
# per invocation of its event handler.
_DEFAULT_BACKLOG = 128
78
79
def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
                 backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):
    """Creates listening sockets bound to the given port and address.

    Returns a list of socket objects (multiple sockets are returned if
    the given address maps to multiple IP addresses, which is most common
    for mixed IPv4 and IPv6 use).

    Address may be either an IP address or hostname.  If it's a hostname,
    the server will listen on all IP addresses associated with the
    name.  Address may be an empty string or None to listen on all
    available interfaces.  Family may be set to either `socket.AF_INET`
    or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
    both will be used if available.

    The ``backlog`` argument has the same meaning as for
    `socket.listen() <socket.socket.listen>`.

    ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
    ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.

    ``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket
    in the list. If your platform doesn't support this option ValueError will
    be raised.

    If a socket cannot be created, configured, bound or put into
    listening mode, every socket opened by this call is closed before
    the exception propagates, so a failed call does not leak file
    descriptors.
    """
    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
        raise ValueError("the platform doesn't support SO_REUSEPORT")

    sockets = []
    if address == "":
        address = None
    if not socket.has_ipv6 and family == socket.AF_UNSPEC:
        # Python can be compiled with --disable-ipv6, which causes
        # operations on AF_INET6 sockets to fail, but does not
        # automatically exclude those results from getaddrinfo
        # results.
        # http://bugs.python.org/issue16208
        family = socket.AF_INET
    if flags is None:
        flags = socket.AI_PASSIVE
    bound_port = None
    # ``set`` drops duplicate getaddrinfo entries.  Sorting by address
    # family afterwards makes the order in which families are bound (and
    # returned) independent of hash ordering.
    addrinfo = sorted(
        set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                               0, flags)),
        key=lambda res: res[0])
    # ``sock`` is the socket currently being set up; it is reset to None
    # once ownership has moved into ``sockets``.
    sock = None
    succeeded = False
    try:
        for res in addrinfo:
            af, socktype, proto, canonname, sockaddr = res
            if (sys.platform == 'darwin' and address == 'localhost' and
                    af == socket.AF_INET6 and sockaddr[3] != 0):
                # Mac OS X includes a link-local address fe80::1%lo0 in the
                # getaddrinfo results for 'localhost'.  However, the firewall
                # doesn't understand that this is a local address and will
                # prompt for access (often repeatedly, due to an apparent
                # bug in its ability to remember granting access to an
                # application). Skip these addresses.
                continue
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if errno_from_exception(e) == errno.EAFNOSUPPORT:
                    continue
                raise
            set_close_exec(sock.fileno())
            if os.name != 'nt':
                try:
                    sock.setsockopt(socket.SOL_SOCKET,
                                    socket.SO_REUSEADDR, 1)
                except socket.error as e:
                    if errno_from_exception(e) != errno.ENOPROTOOPT:
                        # Hurd doesn't support SO_REUSEADDR.
                        raise
            if reuse_port:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY, 1)

            # automatic port allocation with port=None
            # should bind on the same port on IPv4 and IPv6
            host, requested_port = sockaddr[:2]
            if requested_port == 0 and bound_port is not None:
                sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

            sock.setblocking(0)
            sock.bind(sockaddr)
            bound_port = sock.getsockname()[1]
            sock.listen(backlog)
            sockets.append(sock)
            sock = None
        succeeded = True
    finally:
        if not succeeded:
            # Something failed part-way through: the caller will never
            # receive ``sockets``, so release everything opened so far.
            if sock is not None:
                sock.close()
            for bound in sockets:
                bound.close()
    return sockets
173
174
if hasattr(socket, 'AF_UNIX'):
    def bind_unix_socket(file, mode=0o600, backlog=_DEFAULT_BACKLOG):
        """Creates a listening unix socket.

        If a socket with the given name already exists, it will be deleted.
        If any other file with that name exists, a `ValueError` naming
        the path is raised.

        ``mode`` is applied to the socket file with `os.chmod` after
        binding, and ``backlog`` is passed to ``listen()``.

        If any step after the socket has been created fails, the socket
        is closed before the exception propagates.

        Returns a socket object (not a list of socket objects like
        `bind_sockets`)
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        ready = False
        try:
            set_close_exec(sock.fileno())
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            except socket.error as e:
                if errno_from_exception(e) != errno.ENOPROTOOPT:
                    # Hurd doesn't support SO_REUSEADDR
                    raise
            sock.setblocking(0)
            try:
                st = os.stat(file)
            except OSError as err:
                # A missing file is the normal case; anything else is an
                # error the caller needs to see.
                if errno_from_exception(err) != errno.ENOENT:
                    raise
            else:
                if stat.S_ISSOCK(st.st_mode):
                    # Stale socket from a previous run: replace it.
                    os.remove(file)
                else:
                    raise ValueError(
                        "File %s exists and is not a socket" % (file,))
            sock.bind(file)
            os.chmod(file, mode)
            sock.listen(backlog)
            ready = True
        finally:
            if not ready:
                # Do not leak the descriptor when setup fails.
                sock.close()
        return sock
209
210
def add_accept_handler(sock, callback):
    """Registers ``sock`` with the current `.IOLoop` to accept connections.

    For every accepted connection ``callback(connection, address)`` is
    invoked, where ``connection`` is the new socket object and
    ``address`` is the address of the remote end.  Note that this is
    not the ``callback(fd, events)`` signature used for ordinary
    `.IOLoop` handlers.

    The return value is a callable; calling it removes the `.IOLoop`
    event handler and stops processing further incoming connections.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.0
       A callable is returned (``None`` was returned before).
    """
    loop = IOLoop.current()
    # Mutable holder so the nested functions can share the flag without
    # ``nonlocal`` (which is unavailable on Python 2).
    state = {'removed': False}

    def accept_handler(fd, events):
        # More connections may come in while we're handling callbacks;
        # to prevent starvation of other tasks we must limit the number
        # of connections we accept at a time.  Ideally we would accept
        # up to the number of connections that were waiting when we
        # entered this method, but this information is not available
        # (and rearranging this method to call accept() as many times
        # as possible before running any callbacks would have adverse
        # effects on load balancing in multiprocess configurations).
        # Instead, we use the (default) listen backlog as a rough
        # heuristic for the number of connections we can reasonably
        # accept at once.
        for _ in xrange(_DEFAULT_BACKLOG):
            if state['removed']:
                # The socket was probably closed
                return
            try:
                connection, address = sock.accept()
            except socket.error as e:
                code = errno_from_exception(e)
                if code in _ERRNO_WOULDBLOCK:
                    # Every pending connection has been accepted.
                    return
                if code == errno.ECONNABORTED:
                    # There was a connection but it was closed while
                    # still in the accept queue (observed on FreeBSD).
                    continue
                raise
            set_close_exec(connection.fileno())
            callback(connection, address)

    def remove_handler():
        loop.remove_handler(sock)
        state['removed'] = True

    loop.add_handler(sock, accept_handler, IOLoop.READ)
    return remove_handler
270
271
def is_valid_ip(ip):
    """Returns true if the given string is a well-formed IP address.

    Supports IPv4 and IPv6.

    Empty values, strings containing a zero byte, and strings that
    cannot be parsed as a numeric address all yield ``False``.  Any
    `socket.gaierror` other than ``EAI_NONAME`` is re-raised.
    """
    if not ip or '\x00' in ip:
        # getaddrinfo resolves empty strings to localhost, and truncates
        # on zero bytes.
        return False
    try:
        # AI_NUMERICHOST prevents any name lookup: only literal
        # addresses are accepted.
        res = socket.getaddrinfo(ip, 0, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM,
                                 0, socket.AI_NUMERICHOST)
    except socket.gaierror as e:
        if e.args[0] == socket.EAI_NONAME:
            return False
        raise
    except UnicodeError:
        # getaddrinfo encodes text host names with the ``idna`` codec
        # before looking at the flags, and that codec rejects labels
        # longer than 63 characters.  Such input is not an IP address.
        return False
    return bool(res)
291
292
class Resolver(Configurable):
    """Configurable asynchronous DNS resolver interface.

    Unless configured otherwise, `DefaultExecutorResolver` is the
    implementation that is used.  A different implementation can be
    selected with the `Resolver.configure <.Configurable.configure>`
    class method::

        Resolver.configure('tornado.netutil.ThreadedResolver')

    The implementations of this interface included with Tornado are

    * `tornado.netutil.DefaultExecutorResolver`
    * `tornado.netutil.BlockingResolver` (deprecated)
    * `tornado.netutil.ThreadedResolver` (deprecated)
    * `tornado.netutil.OverrideResolver`
    * `tornado.platform.twisted.TwistedResolver`
    * `tornado.platform.caresresolver.CaresResolver`

    .. versionchanged:: 5.0
       The default implementation has changed from `BlockingResolver` to
       `DefaultExecutorResolver`.
    """
    @classmethod
    def configurable_base(cls):
        # Root class of this configurable hierarchy.
        return Resolver

    @classmethod
    def configurable_default(cls):
        # Implementation used when none has been configured explicitly.
        return DefaultExecutorResolver

    def resolve(self, host, port, family=socket.AF_UNSPEC, callback=None):
        """Resolves an address.

        ``host`` is a string holding either a hostname or a literal IP
        address.

        The result is a `.Future` that resolves to a list of (family,
        address) pairs.  Each address is a tuple that can be handed to
        `socket.connect <socket.socket.connect>` (i.e. a ``(host,
        port)`` pair for IPv4; additional fields may be present for
        IPv6).  When ``callback`` is given it is run with the result as
        its argument once resolution completes.

        :raises IOError: if the address cannot be resolved.

        .. versionchanged:: 4.4
           Standardized all implementations to raise `IOError`.

        .. deprecated:: 5.1
           The ``callback`` argument is deprecated and will be removed in 6.0.
           Use the returned awaitable object instead.
        """
        # Subclasses provide the actual resolution strategy.
        raise NotImplementedError()

    def close(self):
        """Closes the `Resolver`, freeing any resources used.

        The base implementation does nothing.

        .. versionadded:: 3.1

        """
355
356
def _resolve_addr(host, port, family=socket.AF_UNSPEC):
    # Blocking lookup shared by the resolver classes in this module;
    # returns a list of (family, sockaddr) pairs.
    #
    # On Solaris, getaddrinfo fails if the given port is not found
    # in /etc/services and no socket type is given, so we must pass
    # one here.  The socket type used here doesn't seem to actually
    # matter (we discard the one we get back in the results),
    # so the addresses we return should still be usable with SOCK_DGRAM.
    entries = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
    # Each entry is (family, socktype, proto, canonname, sockaddr); only
    # the first and last fields are kept.
    return [(af, sockaddr) for af, _, _, _, sockaddr in entries]
368
369
class DefaultExecutorResolver(Resolver):
    """Resolver implementation using `.IOLoop.run_in_executor`.

    The blocking lookup is submitted to the current `.IOLoop` with
    ``None`` as the executor argument.

    .. versionadded:: 5.0
    """
    @gen.coroutine
    def resolve(self, host, port, family=socket.AF_UNSPEC):
        loop = IOLoop.current()
        addresses = yield loop.run_in_executor(
            None, _resolve_addr, host, port, family)
        raise gen.Return(addresses)
380
381
class ExecutorResolver(Resolver):
    """Resolver implementation using a `concurrent.futures.Executor`.

    Use this instead of `ThreadedResolver` when you require additional
    control over the executor being used.

    The executor will be shut down when the resolver is closed unless
    ``close_executor=False``; use this if you want to reuse the same
    executor elsewhere.  When no executor is supplied, the shared
    ``dummy_executor`` is used and is never shut down by this class.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. deprecated:: 5.0
       The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead
       of this class.
    """
    def initialize(self, executor=None, close_executor=True):
        self.io_loop = IOLoop.current()
        if executor is None:
            # Fall back to the shared dummy executor, which this
            # resolver does not own and therefore must not shut down.
            self.executor = dummy_executor
            self.close_executor = False
        else:
            self.executor = executor
            self.close_executor = close_executor

    def close(self):
        owns_executor = self.close_executor
        if owns_executor:
            self.executor.shutdown()
        # Drop the reference in either case.
        self.executor = None

    @run_on_executor
    def resolve(self, host, port, family=socket.AF_UNSPEC):
        # Executed on ``self.executor`` by the decorator.
        return _resolve_addr(host, port, family)
416
417
class BlockingResolver(ExecutorResolver):
    """`Resolver` implementation that calls `socket.getaddrinfo` directly.

    The `.IOLoop` will be blocked during the resolution, although the
    callback will not be run until the next `.IOLoop` iteration.

    .. deprecated:: 5.0
       The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead
       of this class.
    """
    def initialize(self):
        # Explicitly request no executor so that ExecutorResolver falls
        # back to its dummy executor.
        super(BlockingResolver, self).initialize(executor=None)
430
431
class ThreadedResolver(ExecutorResolver):
    """Multithreaded non-blocking `Resolver` implementation.

    Requires the `concurrent.futures` package to be installed
    (available in the standard library since Python 3.2,
    installable with ``pip install futures`` in older versions).

    The thread pool size can be configured with::

        Resolver.configure('tornado.netutil.ThreadedResolver',
                           num_threads=10)

    .. versionchanged:: 3.1
       All ``ThreadedResolvers`` share a single thread pool, whose
       size is set by the first one to be created.

    .. deprecated:: 5.0
       The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead
       of this class.
    """
    # Shared pool and the pid of the process that created it.
    _threadpool = None  # type: ignore
    _threadpool_pid = None  # type: int

    def initialize(self, num_threads=10):
        pool = ThreadedResolver._create_threadpool(num_threads)
        # The pool is shared between instances, so closing one resolver
        # must not shut it down.
        super(ThreadedResolver, self).initialize(
            executor=pool, close_executor=False)

    @classmethod
    def _create_threadpool(cls, num_threads):
        current_pid = os.getpid()
        if cls._threadpool_pid != current_pid:
            # Threads cannot survive after a fork, so if our pid isn't what it
            # was when we created the pool then delete it.
            cls._threadpool = None
        if cls._threadpool is not None:
            # A pool created in this process already exists; its size was
            # fixed when it was created and ``num_threads`` is ignored.
            return cls._threadpool
        # Imported lazily so the module loads without concurrent.futures.
        from concurrent.futures import ThreadPoolExecutor
        cls._threadpool = ThreadPoolExecutor(num_threads)
        cls._threadpool_pid = current_pid
        return cls._threadpool
472
473
class OverrideResolver(Resolver):
    """Wraps a resolver with a mapping of overrides.

    This can be used to make local DNS changes (e.g. for testing)
    without modifying system-wide settings.

    The mapping can be in three formats::

        {
            # Hostname to host or ip
            "example.com": "127.0.1.1",

            # Host+port to host+port
            ("login.example.com", 443): ("localhost", 1443),

            # Host+port+address family to host+port
            ("login.example.com", 443, socket.AF_INET6): ("::1", 1443),
        }

    The most specific matching key wins: the host/port/family triplet
    is tried first, then the host/port pair, then the bare hostname.

    .. versionchanged:: 5.0
       Added support for host-port-family triplets.
    """
    def initialize(self, resolver, mapping):
        self.resolver = resolver
        self.mapping = mapping

    def close(self):
        # Closing the wrapper closes the wrapped resolver.
        self.resolver.close()

    def resolve(self, host, port, family=socket.AF_UNSPEC, *args, **kwargs):
        overrides = self.mapping
        # Tuple keys replace both host and port; try the more specific
        # triplet before the pair.
        for key in ((host, port, family), (host, port)):
            if key in overrides:
                host, port = overrides[key]
                break
        else:
            # A bare-hostname key replaces only the host.
            if host in overrides:
                host = overrides[host]
        return self.resolver.resolve(host, port, family, *args, **kwargs)
511
512
# These are the keyword arguments to ssl.wrap_socket that must be translated
# to their SSLContext equivalents (the other arguments are still passed
# to SSLContext.wrap_socket).
_SSL_CONTEXT_KEYWORDS = frozenset((
    'ssl_version', 'certfile', 'keyfile',
    'cert_reqs', 'ca_certs', 'ciphers',
))


def ssl_options_to_context(ssl_options):
    """Try to convert an ``ssl_options`` dictionary to an
    `~ssl.SSLContext` object.

    ``ssl_options`` holds keywords that would otherwise be passed to
    `ssl.wrap_socket`.  In Python 2.7.9+, `ssl.SSLContext` objects can
    be used instead.  This function turns the dict form into the
    equivalent `~ssl.SSLContext`, for components that accept both forms
    but need the `~ssl.SSLContext` version to use features like SNI or
    NPN.

    An `~ssl.SSLContext` argument is returned unchanged.  A dictionary
    may only contain the keys listed in ``_SSL_CONTEXT_KEYWORDS``.
    """
    if isinstance(ssl_options, ssl.SSLContext):
        return ssl_options
    assert isinstance(ssl_options, dict)
    assert _SSL_CONTEXT_KEYWORDS.issuperset(ssl_options), ssl_options
    # Can't use create_default_context since this interface doesn't
    # tell us client vs server.
    protocol = ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23)
    context = ssl.SSLContext(protocol)
    if 'certfile' in ssl_options:
        keyfile = ssl_options.get('keyfile')
        context.load_cert_chain(ssl_options['certfile'], keyfile)
    if 'cert_reqs' in ssl_options:
        context.verify_mode = ssl_options['cert_reqs']
    if 'ca_certs' in ssl_options:
        context.load_verify_locations(ssl_options['ca_certs'])
    if 'ciphers' in ssl_options:
        context.set_ciphers(ssl_options['ciphers'])
    if hasattr(ssl, 'OP_NO_COMPRESSION'):
        # Disable TLS compression to avoid CRIME and related attacks.
        # This constant depends on openssl version 1.0.
        # TODO: Do we need to do this ourselves or can we trust
        # the defaults?
        context.options |= ssl.OP_NO_COMPRESSION
    return context
554
555
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed on to the ``wrap_socket`` method of
    the resulting `~ssl.SSLContext`.  ``server_hostname`` is forwarded
    only when ``ssl.HAS_SNI`` is true.
    """
    context = ssl_options_to_context(ssl_options)
    wrap_kwargs = dict(kwargs)
    if ssl.HAS_SNI:
        # In python 3.4, wrap_socket only accepts the server_hostname
        # argument if HAS_SNI is true.
        # TODO: add a unittest (python added server-side SNI support in 3.4)
        # In the meantime it can be manually tested with
        # python3 -m tornado.httpclient https://sni.velox.ch
        wrap_kwargs['server_hostname'] = server_hostname
    return context.wrap_socket(socket, **wrap_kwargs)
576