1#!/usr/bin/env python
2
3"""
4Dummy server used for unit testing.
5"""
6from __future__ import print_function
7
8import logging
9import os
10import sys
11import threading
12import socket
13import warnings
14import ssl
15from datetime import datetime
16
17from urllib3.exceptions import HTTPWarning
18
19from cryptography.hazmat.backends import default_backend
20from cryptography.hazmat.primitives import serialization
21import tornado.httpserver
22import tornado.ioloop
23import tornado.netutil
24import tornado.web
25import trustme
26
27
28log = logging.getLogger(__name__)
29
30CERTS_PATH = os.path.join(os.path.dirname(__file__), "certs")
31DEFAULT_CERTS = {
32    "certfile": os.path.join(CERTS_PATH, "server.crt"),
33    "keyfile": os.path.join(CERTS_PATH, "server.key"),
34    "cert_reqs": ssl.CERT_OPTIONAL,
35    "ca_certs": os.path.join(CERTS_PATH, "cacert.pem"),
36}
37DEFAULT_CA = os.path.join(CERTS_PATH, "cacert.pem")
38DEFAULT_CA_KEY = os.path.join(CERTS_PATH, "cacert.key")
39
40
41def _resolves_to_ipv6(host):
42    """ Returns True if the system resolves host to an IPv6 address by default. """
43    resolves_to_ipv6 = False
44    try:
45        for res in socket.getaddrinfo(host, None, socket.AF_UNSPEC):
46            af, _, _, _, _ = res
47            if af == socket.AF_INET6:
48                resolves_to_ipv6 = True
49    except socket.gaierror:
50        pass
51
52    return resolves_to_ipv6
53
54
55def _has_ipv6(host):
56    """ Returns True if the system can bind an IPv6 address. """
57    sock = None
58    has_ipv6 = False
59
60    if socket.has_ipv6:
61        # has_ipv6 returns true if cPython was compiled with IPv6 support.
62        # It does not tell us if the system has IPv6 support enabled. To
63        # determine that we must bind to an IPv6 address.
64        # https://github.com/urllib3/urllib3/pull/611
65        # https://bugs.python.org/issue658327
66        try:
67            sock = socket.socket(socket.AF_INET6)
68            sock.bind((host, 0))
69            has_ipv6 = _resolves_to_ipv6("localhost")
70        except Exception:
71            pass
72
73    if sock:
74        sock.close()
75    return has_ipv6
76
77
78# Some systems may have IPv6 support but DNS may not be configured
79# properly. We can not count that localhost will resolve to ::1 on all
80# systems. See https://github.com/urllib3/urllib3/pull/611 and
81# https://bugs.python.org/issue18792
82HAS_IPV6_AND_DNS = _has_ipv6("localhost")
83HAS_IPV6 = _has_ipv6("::1")
84
85
86# Different types of servers we have:
87
88
89class NoIPv6Warning(HTTPWarning):
90    "IPv6 is not available"
91    pass
92
93
94class SocketServerThread(threading.Thread):
95    """
96    :param socket_handler: Callable which receives a socket argument for one
97        request.
98    :param ready_event: Event which gets set when the socket handler is
99        ready to receive requests.
100    """
101
102    USE_IPV6 = HAS_IPV6_AND_DNS
103
104    def __init__(self, socket_handler, host="localhost", port=8081, ready_event=None):
105        threading.Thread.__init__(self)
106        self.daemon = True
107
108        self.socket_handler = socket_handler
109        self.host = host
110        self.ready_event = ready_event
111
112    def _start_server(self):
113        if self.USE_IPV6:
114            sock = socket.socket(socket.AF_INET6)
115        else:
116            warnings.warn("No IPv6 support. Falling back to IPv4.", NoIPv6Warning)
117            sock = socket.socket(socket.AF_INET)
118        if sys.platform != "win32":
119            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
120        sock.bind((self.host, 0))
121        self.port = sock.getsockname()[1]
122
123        # Once listen() returns, the server socket is ready
124        sock.listen(1)
125
126        if self.ready_event:
127            self.ready_event.set()
128
129        self.socket_handler(sock)
130        sock.close()
131
132    def run(self):
133        self.server = self._start_server()
134
135
136def run_tornado_app(app, io_loop, certs, scheme, host):
137    assert io_loop == tornado.ioloop.IOLoop.current()
138
139    # We can't use fromtimestamp(0) because of CPython issue 29097, so we'll
140    # just construct the datetime object directly.
141    app.last_req = datetime(1970, 1, 1)
142
143    if scheme == "https":
144        http_server = tornado.httpserver.HTTPServer(app, ssl_options=certs)
145    else:
146        http_server = tornado.httpserver.HTTPServer(app)
147
148    sockets = tornado.netutil.bind_sockets(None, address=host)
149    port = sockets[0].getsockname()[1]
150    http_server.add_sockets(sockets)
151    return http_server, port
152
153
154def run_loop_in_thread(io_loop):
155    t = threading.Thread(target=io_loop.start)
156    t.start()
157    return t
158
159
160def get_unreachable_address():
161    # reserved as per rfc2606
162    return ("something.invalid", 54321)
163
164
165if __name__ == "__main__":
166    # For debugging dummyserver itself - python -m dummyserver.server
167    from .testcase import TestingApp
168
169    host = "127.0.0.1"
170
171    io_loop = tornado.ioloop.IOLoop.current()
172    app = tornado.web.Application([(r".*", TestingApp)])
173    server, port = run_tornado_app(app, io_loop, None, "http", host)
174    server_thread = run_loop_in_thread(io_loop)
175
176    print("Listening on http://{host}:{port}".format(host=host, port=port))
177
178
179def encrypt_key_pem(private_key_pem, password):
180    private_key = serialization.load_pem_private_key(
181        private_key_pem.bytes(), password=None, backend=default_backend()
182    )
183    encrypted_key = private_key.private_bytes(
184        serialization.Encoding.PEM,
185        serialization.PrivateFormat.TraditionalOpenSSL,
186        serialization.BestAvailableEncryption(password),
187    )
188    return trustme.Blob(encrypted_key)
189