1# Copyright 2014-2016 OpenMarket Ltd
2# Copyright 2019 New Vector Ltd
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17
18from service_identity import VerificationError
19from service_identity.pyopenssl import verify_hostname, verify_ip_address
20from zope.interface import implementer
21
22from OpenSSL import SSL, crypto
23from twisted.internet._sslverify import _defaultCurveName
24from twisted.internet.abstract import isIPAddress, isIPv6Address
25from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
26from twisted.internet.ssl import (
27    CertificateOptions,
28    ContextFactory,
29    TLSVersion,
30    platformTrust,
31)
32from twisted.protocols.tls import TLSMemoryBIOProtocol
33from twisted.python.failure import Failure
34from twisted.web.iweb import IPolicyForHTTPS
35
36from synapse.config.homeserver import HomeServerConfig
37
38logger = logging.getLogger(__name__)
39
40
# Maps the string values of the `federation_client_minimum_tls_version` config
# setting onto the corresponding Twisted TLSVersion constants. It is indexed
# directly with the configured value when the federation client contexts are
# built, so a value that is not a key here raises KeyError.
_TLS_VERSION_MAP = {
    "1": TLSVersion.TLSv1_0,
    "1.1": TLSVersion.TLSv1_1,
    "1.2": TLSVersion.TLSv1_2,
    "1.3": TLSVersion.TLSv1_3,
}
47
48
class ServerContextFactory(ContextFactory):
    """Factory for PyOpenSSL SSL contexts that are used to handle incoming
    connections.

    One context is created and configured when the factory is constructed;
    getContext hands back that same object on every call.

    TODO: replace this with an implementation of IOpenSSLServerConnectionCreator,
    per https://github.com/matrix-org/synapse/issues/1691
    """

    def __init__(self, config: HomeServerConfig):
        """Create the shared server context and configure it from `config`."""
        # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
        # switch to those (see https://github.com/pyca/cryptography/issues/5379).
        #
        # note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
        # or v3, but is a synonym for TLS_METHOD, which allows the client and server
        # to negotiate an appropriate version of TLS constrained by the version options
        # set with context.set_options.
        #
        server_context = SSL.Context(SSL.SSLv23_METHOD)
        self._context = server_context
        self.configure_context(server_context, config)

    @staticmethod
    def configure_context(context: SSL.Context, config: HomeServerConfig) -> None:
        """Apply the server-side TLS settings to `context` (modified in place).

        Args:
            context: the OpenSSL context to configure.
            config: the homeserver config; `config.tls.tls_certificate_file`
                and `config.tls.tls_private_key` are read from it. The private
                key is asserted to be non-None.
        """
        # Enabling the ECDH curve is best-effort: any failure is logged and the
        # remaining configuration steps still run.
        try:
            curve = crypto.get_elliptic_curve(_defaultCurveName)
            context.set_tmp_ecdh(curve)
        except Exception:
            logger.exception("Failed to enable elliptic curve for TLS")

        # Switch off SSLv2, SSLv3, TLS 1.0 and TLS 1.1.
        disabled_protocols = (
            SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
        )
        context.set_options(disabled_protocols)

        # Install our certificate chain and the matching private key.
        tls_config = config.tls
        context.use_certificate_chain_file(tls_config.tls_certificate_file)
        private_key = tls_config.tls_private_key
        assert private_key is not None
        context.use_privatekey(private_key)

        # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
        cipher_list = (
            b"ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
        )
        context.set_cipher_list(cipher_list)

    def getContext(self) -> SSL.Context:
        """Return the context that was built in the constructor."""
        return self._context
91
92
@implementer(IPolicyForHTTPS)
class FederationPolicyForHTTPS:
    """Factory for Twisted SSLClientConnectionCreators that are used to make connections
    to remote servers for federation.

    Two OpenSSL context objects are built up front: one created with a trust
    root and one created without. Every connection uses one of the two,
    depending on whether we should do SSL certificate verification.

    get_options makes that decision per host and constructs an
    SSLClientConnectionCreator accordingly.
    """

    def __init__(self, config: HomeServerConfig):
        """Build both client contexts and cache the verification settings.

        Args:
            config: the homeserver config; the `tls` section supplies the
                optional custom trust root, the minimum TLS version, the
                verify-certificates flag and the verification whitelist.
        """
        self._config = config

        # Prefer a custom list of CA certificates if one is configured;
        # otherwise fall back to the CA root certs provided by OpenSSL.
        configured_trust_root = config.tls.federation_ca_trust_root
        trust_root = (
            platformTrust() if configured_trust_root is None else configured_trust_root
        )

        # "insecurelyLowerMinimumTo" is the argument that will go lower than
        # Twisted's default, which is why it is marked as "insecure" (since
        # Twisted's defaults are reasonably secure). But, since Twisted is
        # moving to TLS 1.2 by default, we want to respect the config option if
        # it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
        # let us do).
        min_tls_version = _TLS_VERSION_MAP[
            config.tls.federation_client_minimum_tls_version
        ]

        # Context for hosts whose certificates we check.
        self._verify_ssl_context = CertificateOptions(
            trustRoot=trust_root, insecurelyLowerMinimumTo=min_tls_version
        ).getContext()
        self._verify_ssl_context.set_info_callback(_context_info_cb)

        # Context for hosts whose certificates we do not check: no trust root.
        self._no_verify_ssl_context = CertificateOptions(
            insecurelyLowerMinimumTo=min_tls_version
        ).getContext()
        self._no_verify_ssl_context.set_info_callback(_context_info_cb)

        self._should_verify = config.tls.federation_verify_certificates

        self._federation_certificate_verification_whitelist = (
            config.tls.federation_certificate_verification_whitelist
        )

    def get_options(self, host: bytes) -> IOpenSSLClientConnectionCreator:
        """Pick the context for `host` and wrap it in a connection creator.

        Args:
            host: the remote server name, as ASCII bytes.

        Returns:
            An SSLClientConnectionCreator for `host`, using the verifying
            context unless verification is disabled globally or `host` matches
            an entry in the verification whitelist.
        """
        # IPolicyForHTTPS.get_options takes bytes, but we want to compare
        # against the str whitelist. The hostnames in the whitelist are already
        # IDNA-encoded like the hosts will be here.
        hostname = host.decode("ascii")

        # Start from the global setting; the whitelist is only consulted (and
        # can only turn verification off) when that setting is enabled.
        verify = self._should_verify
        if verify and any(
            pattern.match(hostname)
            for pattern in self._federation_certificate_verification_whitelist
        ):
            verify = False

        if verify:
            chosen_context = self._verify_ssl_context
        else:
            chosen_context = self._no_verify_ssl_context

        return SSLClientConnectionCreator(host, chosen_context, verify)

    def creatorForNetloc(
        self, hostname: bytes, port: int
    ) -> IOpenSSLClientConnectionCreator:
        """Implements the IPolicyForHTTPS interface so that this can be passed
        directly to agents.

        `port` is not used; the result depends on `hostname` alone.
        """
        return self.get_options(hostname)
167
168
@implementer(IPolicyForHTTPS)
class RegularPolicyForHTTPS:
    """Factory for Twisted SSLClientConnectionCreators that are used to make connections
    to remote servers, for other than federation.

    A single OpenSSL context object, built with the default OpenSSL CA trust
    root, is shared by every connection, and certificates are always verified.
    """

    def __init__(self) -> None:
        """Build the shared context and hook up the info callback."""
        certificate_options = CertificateOptions(trustRoot=platformTrust())
        shared_context = certificate_options.getContext()
        shared_context.set_info_callback(_context_info_cb)
        self._ssl_context = shared_context

    def creatorForNetloc(
        self, hostname: bytes, port: int
    ) -> IOpenSSLClientConnectionCreator:
        """Return a verifying connection creator for `hostname`.

        `port` is accepted for the IPolicyForHTTPS interface but not used.
        """
        return SSLClientConnectionCreator(
            hostname, self._ssl_context, verify_certs=True
        )
187
188
def _context_info_cb(ssl_connection: SSL.Connection, where: int, ret: int) -> None:
    """The 'information callback' for our openssl context objects.

    Forwards the event to the ConnectionVerifier attached to the connection's
    protocol. Anything raised while doing so is logged and turned into a
    verification failure on the protocol; `ret` is not used.

    Note: Once this is set as the info callback on a Context object, the Context should
    only be used with the SSLClientConnectionCreator.
    """
    # The app_data on the connection object is assumed to be a
    # TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
    protocol = ssl_connection.get_app_data()
    try:
        # ... and SSLClientConnectionCreator is further assumed to have stored
        # a ConnectionVerifier in its '_synapse_tls_verifier' attribute.
        verifier = protocol._synapse_tls_verifier
        verifier.verify_context_info_cb(ssl_connection, where)
    except BaseException:  # taken from the twisted implementation
        logger.exception("Error during info_callback")
        # Failure() with no arguments wraps the exception being handled.
        failure = Failure()
        protocol.failVerification(failure)
206
207
@implementer(IOpenSSLClientConnectionCreator)
class SSLClientConnectionCreator:
    """Creates openssl connection objects for client connections.

    Replaces twisted.internet.ssl.ClientTLSOptions
    """

    def __init__(self, hostname: bytes, ctx: SSL.Context, verify_certs: bool):
        """
        Args:
            hostname: the remote server name, as ASCII bytes.
            ctx: the OpenSSL context that new connections are created from.
            verify_certs: whether the remote certificate should be verified.
        """
        self._ctx = ctx
        self._verifier = ConnectionVerifier(hostname, verify_certs)

    def clientConnectionForTLS(
        self, tls_protocol: TLSMemoryBIOProtocol
    ) -> SSL.Connection:
        """Create the OpenSSL connection for `tls_protocol` and link the two.

        Returns:
            A new SSL.Connection on our context whose app data is
            `tls_protocol`.
        """
        connection = SSL.Connection(self._ctx, None)

        # Following twisted.internet.ssl.ClientTLSOptions, the application data
        # is set to our TLSMemoryBIOProtocol...
        connection.set_app_data(tls_protocol)

        # ... and a '_synapse_tls_verifier' attribute is gut-wrenched into the
        # tls_protocol, so that the SSL context's info callback has something
        # to call to do the cert verification.
        tls_protocol._synapse_tls_verifier = self._verifier  # type: ignore[attr-defined]
        return connection
234
235
class ConnectionVerifier:
    """Set the SNI, and do cert verification

    An instance is attached to the TLSMemoryBIOProtocol, and is invoked from
    the ssl context's info callback.
    """

    # This code is based on twisted.internet.ssl.ClientTLSOptions.

    def __init__(self, hostname: bytes, verify_certs: bool):
        """
        Args:
            hostname: the remote server name, as ASCII bytes; may be an IPv4
                or IPv6 address literal.
            verify_certs: whether to check the peer certificate once the
                handshake has completed.
        """
        self._verify_certs = verify_certs

        self._hostnameBytes = hostname
        self._hostnameASCII = hostname.decode("ascii")

        # IP literals get no SNI and are checked with verify_ip_address.
        self._is_ip_address = bool(
            isIPAddress(self._hostnameASCII) or isIPv6Address(self._hostnameASCII)
        )

    def verify_context_info_cb(
        self, ssl_connection: SSL.Connection, where: int
    ) -> None:
        """Handle one info-callback event for `ssl_connection`.

        Args:
            ssl_connection: the connection the event relates to.
            where: bitmask of SSL_CB_* flags describing the event.
        """
        # At the start of the handshake, send the server name (SNI) unless we
        # are connecting to an IP address.
        handshake_starting = where & SSL.SSL_CB_HANDSHAKE_START
        if handshake_starting and not self._is_ip_address:
            ssl_connection.set_tlsext_host_name(self._hostnameBytes)

        # The certificate is only checked once the handshake is done, and only
        # if verification was requested.
        if not (where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs):
            return

        verify = verify_ip_address if self._is_ip_address else verify_hostname
        try:
            verify(ssl_connection, self._hostnameASCII)
        except VerificationError:
            # Report the mismatch to the protocol stored as the connection's
            # app data.
            failure = Failure()
            ssl_connection.get_app_data().failVerification(failure)
273