1"""ndg_httpsclient - module containing SSL peer verification class.
2"""
3__author__ = "P J Kershaw (STFC)"
4__date__ = "09/12/11"
5__copyright__ = "(C) 2012 Science and Technology Facilities Council"
6__license__ = "BSD - see LICENSE file in top-level directory"
7__contact__ = "Philip.Kershaw@stfc.ac.uk"
8__revision__ = '$Id$'
9import re
10import logging
11log = logging.getLogger(__name__)
12
13try:
14    from ndg.httpsclient.subj_alt_name import SubjectAltName
15    from pyasn1.codec.der import decoder as der_decoder
16    SUBJ_ALT_NAME_SUPPORT = True
17
18except ImportError as e:
19    SUBJ_ALT_NAME_SUPPORT = False
20    SUBJ_ALT_NAME_SUPPORT_MSG = (
21        'SubjectAltName support is disabled - check pyasn1 package '
22        'installation to enable'
23    )
24    import warnings
25    warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG)
26
27
class ServerSSLCertVerification(object):
    """Callable which checks the identity of an SSL server from its
    certificate.

    An instance is called once per certificate in the peer's chain with the
    PyOpenSSL verification callback arguments.  For the end entity
    certificate (depth 0) one of the following checks is made:

     * if C{certDN} is set, the certificate subject Distinguished Name must
       equal it;
     * otherwise C{hostname} must match one of the certificate's
       subjectAltName entries (when enabled) or the subject Common Name.
    """
    # Long DN field names and their short equivalents.  Both forms are
    # recognised as field names when parsing a DN string
    DN_LUT = {
        'commonName':               'CN',
        'organisationalUnitName':   'OU',
        'organisation':             'O',
        'countryName':              'C',
        'emailAddress':             'EMAILADDRESS',
        'localityName':             'L',
        'stateOrProvinceName':      'ST',
        'streetAddress':            'STREET',
        'domainComponent':          'DC',
        'userid':                   'UID'
    }

    # Short name of the certificate extension holding subjectAltNames, as
    # compared against the value of the extension's get_short_name()
    SUBJ_ALT_NAME_EXT_NAME = b'subjectAltName'

    # Splits a slash delimited DN string e.g. "/O=org/CN=host" on each
    # "/<field name>=" separator, capturing the field name
    PARSER_RE_STR = '/(%s)=' % '|'.join(list(DN_LUT.keys()) +
                                        list(DN_LUT.values()))
    PARSER_RE = re.compile(PARSER_RE_STR)

    __slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')

    def __init__(self, certDN=None, hostname=None, subj_alt_name_match=True):
        """Set the expected server identity and matching behaviour

        @type certDN: string or list
        @param certDN: Set the expected Distinguished Name of the
        server to avoid errors matching hostnames.  This is useful
        where the hostname is not fully qualified.  See the C{certDN}
        property for the accepted forms
        @type hostname: string
        @param hostname: hostname to match against peer certificate
        subjectAltNames or subject common name
        @type subj_alt_name_match: bool
        @param subj_alt_name_match: flag to enable/disable matching of hostname
        against peer certificate subjectAltNames.  Nb. A setting of True will
        be ignored if the pyasn1 package is not installed
        @raise TypeError: if certDN or hostname are of an unsupported type
        """
        self.__certDN = None
        self.__hostname = None

        # Assign through the properties so that the values are validated
        if certDN is not None:
            self.certDN = certDN

        if hostname is not None:
            self.hostname = hostname

        if subj_alt_name_match:
            if not SUBJ_ALT_NAME_SUPPORT:
                log.warning('Overriding "subj_alt_name_match" keyword setting: '
                            'peer verification with subjectAltNames is disabled')
                self.__subj_alt_name_match = False
            else:
                self.__subj_alt_name_match = True
        else:
            log.debug('Disabling peer verification with subject '
                      'subjectAltNames!')
            self.__subj_alt_name_match = False

    def __call__(self, connection, peerCert, errorStatus, errorDepth,
                 preverifyOK):
        """Verify server certificate

        @type connection: OpenSSL.SSL.Connection
        @param connection: SSL connection object
        @type peerCert: OpenSSL.crypto.X509
        @param peerCert: certificate from the peer's chain currently being
        verified
        @type errorStatus: int
        @param errorStatus: error status passed from caller.  This is the value
        returned by the OpenSSL C function X509_STORE_CTX_get_error().  Look-up
        x509_vfy.h in the OpenSSL source to get the meanings of the different
        codes.  PyOpenSSL doesn't help you!
        @type errorDepth: int
        @param errorDepth: a non-negative integer representing where in the
        certificate chain the error occurred. If it is zero it occurred in the
        end entity certificate, one if it is the certificate which signed the
        end entity certificate and so on.

        @type preverifyOK: int
        @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
        SSL context irrespective of any verification checks done here.  If this
        function yields an OK status, it should enforce the preverifyOK value
        so that any error set upstream overrides and is honoured.
        @rtype: int
        @return: status code - 0/False = Error, 1/True = OK
        """
        if peerCert.has_expired():
            # Any expired certificate in the chain should result in an error
            log.error('Certificate %r in peer certificate chain has expired',
                      peerCert.get_subject())

            return False

        if errorDepth != 0:
            # Identity checks only apply to the end entity certificate -
            # defer to the upstream status for the rest of the chain
            return preverifyOK

        # Only interested in DN of last certificate in the chain - this must
        # match the expected Server DN setting
        peerCertSubj = peerCert.get_subject()

        # Normalise so that byte string components can be compared with a
        # certDN parsed from a text string, independent of field order
        peerCertDN = self._normalise_dn(peerCertSubj.get_components())

        if self.certDN is not None:
            if peerCertDN == self.certDN:
                return preverifyOK

            log.error('Peer certificate DN %r doesn\'t match the '
                      'expected DN %r', peerCertDN, self.certDN)
            return False

        # No expected DN - check the hostname against the peer certificate
        # instead
        if self.hostname is None:
            log.error('No "hostname" or "certDN" set to check peer '
                      'certificate against')
            return False

        # Check for subject alternative names
        if self.__subj_alt_name_match:
            dns_names = self._get_subj_alt_name(peerCert)
            if self.hostname in dns_names:
                return preverifyOK

        # If no subjectAltNames match, default to check of subject Common
        # Name
        if peerCertSubj.commonName == self.hostname:
            return preverifyOK

        log.error('Peer certificate CN %r doesn\'t match the '
                  'expected CN %r', peerCertSubj.commonName,
                  self.hostname)
        return False

    def get_verify_server_cert_func(self):
        """Wrap this object in a plain function taking the same arguments as
        C{__call__}, for callers that need a function rather than a callable
        instance

        @rtype: function
        @return: function delegating to this object's C{__call__}
        """
        def verify_server_cert(connection, peerCert, errorStatus, errorDepth,
                               preverifyOK):
            return self.__call__(connection, peerCert, errorStatus,
                                 errorDepth, preverifyOK)

        return verify_server_cert

    @staticmethod
    def _normalise_dn(components):
        """Convert DN components into a canonical form for comparison

        @type components: iterable
        @param components: two element (field name, field value) pairs.
        Elements may be text or byte strings
        @rtype: list
        @return: sorted list of (field name, field value) tuples with any
        byte strings decoded to text
        """
        def _to_text(item):
            # The "not str" test leaves Python 2 native strings untouched.
            # surrogateescape keeps the decoding lossless so that distinct
            # byte values can never compare equal after decoding
            if isinstance(item, bytes) and not isinstance(item, str):
                return item.decode('utf-8', 'surrogateescape')
            return item

        return sorted((_to_text(name), _to_text(value))
                      for name, value in components)

    @classmethod
    def _get_subj_alt_name(cls, peer_cert):
        '''Extract subjectAltName settings from certificate extensions

        @param peer_cert: peer certificate in SSL connection.  subjectAltName
        settings if any will be extracted from this
        @type peer_cert: OpenSSL.crypto.X509
        @rtype: list
        @return: string form of each entry found in the certificate's
        subjectAltName extension(s); empty if there are none
        '''
        dns_name = []
        general_names = SubjectAltName()

        # Search through extensions
        for i in range(peer_cert.get_extension_count()):
            ext = peer_cert.get_extension(i)
            if ext.get_short_name() != cls.SUBJ_ALT_NAME_EXT_NAME:
                continue

            # PyOpenSSL returns extension data in ASN.1 encoded form
            ext_dat = ext.get_data()
            decoded_dat = der_decoder.decode(ext_dat,
                                             asn1Spec=general_names)

            # Only the SubjectAltName items of the decode result hold names
            for name in decoded_dat:
                if isinstance(name, SubjectAltName):
                    for entry in range(len(name)):
                        component = name.getComponentByPosition(entry)
                        dns_name.append(str(component.getComponent()))

        return dns_name

    def _getCertDN(self):
        """@rtype: list or None
        @return: expected server DN as a sorted list of (field name, field
        value) tuples, or None if not set
        """
        return self.__certDN

    def _setCertDN(self, val):
        """Set the expected server certificate DN

        @type val: string, list or tuple
        @param val: either a slash delimited DN string e.g.
        "/O=org/CN=host" (optionally enclosed in double quotes) or a sequence
        of two element (field name, field value) pairs
        @raise TypeError: if the string can't be parsed as a DN, if a
        sequence item doesn't have two elements or if val is of any other
        type
        """
        if isinstance(val, str):
            # Allow for quoted DN
            certDN = val.strip('"')

            dnFields = self.__class__.PARSER_RE.split(certDN)
            if len(dnFields) < 2:
                raise TypeError('Error parsing DN string: "%s"' % certDN)

            # Split output alternates captured field names and values after
            # the leading text before the first separator
            self.__certDN = list(zip(dnFields[1::2], dnFields[2::2]))
            self.__certDN.sort()

        elif isinstance(val, (list, tuple)):
            for i in val:
                if not len(i) == 2:
                    raise TypeError('Expecting list of two element DN field, '
                                    'DN field value pairs for "certDN" '
                                    'attribute')

            # Store in the same canonical form used for the peer certificate
            # DN so that the two can be compared directly
            self.__certDN = self._normalise_dn(val)
        else:
            raise TypeError('Expecting list or string type for "certDN" '
                            'attribute')

    certDN = property(fget=_getCertDN,
                      fset=_setCertDN,
                      doc="Distinguished Name for Server Certificate")

    # Get/Set Property methods
    def _getHostname(self):
        """@rtype: string or None
        @return: hostname to match against the peer certificate, or None if
        not set
        """
        return self.__hostname

    def _setHostname(self, val):
        """@type val: string
        @param val: hostname to match against the peer certificate
        @raise TypeError: if val is not a string
        """
        if not isinstance(val, str):
            raise TypeError("Expecting string type for hostname "
                            "attribute")
        self.__hostname = val

    hostname = property(fget=_getHostname,
                        fset=_setHostname,
                        doc="hostname of server")
239