1"""ndg_httpsclient - module containing SSL peer verification class. 2""" 3__author__ = "P J Kershaw (STFC)" 4__date__ = "09/12/11" 5__copyright__ = "(C) 2012 Science and Technology Facilities Council" 6__license__ = "BSD - see LICENSE file in top-level directory" 7__contact__ = "Philip.Kershaw@stfc.ac.uk" 8__revision__ = '$Id$' 9import re 10import logging 11log = logging.getLogger(__name__) 12 13try: 14 from ndg.httpsclient.subj_alt_name import SubjectAltName 15 from pyasn1.codec.der import decoder as der_decoder 16 SUBJ_ALT_NAME_SUPPORT = True 17 18except ImportError as e: 19 SUBJ_ALT_NAME_SUPPORT = False 20 SUBJ_ALT_NAME_SUPPORT_MSG = ( 21 'SubjectAltName support is disabled - check pyasn1 package ' 22 'installation to enable' 23 ) 24 import warnings 25 warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG) 26 27 28class ServerSSLCertVerification(object): 29 """Check server identity. If hostname doesn't match, allow match of 30 host's Distinguished Name against server DN setting""" 31 DN_LUT = { 32 'commonName': 'CN', 33 'organisationalUnitName': 'OU', 34 'organisation': 'O', 35 'countryName': 'C', 36 'emailAddress': 'EMAILADDRESS', 37 'localityName': 'L', 38 'stateOrProvinceName': 'ST', 39 'streetAddress': 'STREET', 40 'domainComponent': 'DC', 41 'userid': 'UID' 42 } 43 SUBJ_ALT_NAME_EXT_NAME = b'subjectAltName' 44 PARSER_RE_STR = '/(%s)=' % '|'.join(list(DN_LUT.keys()) + \ 45 list(DN_LUT.values())) 46 PARSER_RE = re.compile(PARSER_RE_STR) 47 48 __slots__ = ('__hostname', '__certDN', '__subj_alt_name_match') 49 50 def __init__(self, certDN=None, hostname=None, subj_alt_name_match=True): 51 """Override parent class __init__ to enable setting of certDN 52 setting 53 54 @type certDN: string 55 @param certDN: Set the expected Distinguished Name of the 56 server to avoid errors matching hostnames. This is useful 57 where the hostname is not fully qualified 58 @type hostname: string 59 @param hostname: hostname to match against peer certificate 60 subjectAltNames or subject common name 61 @type subj_alt_name_match: bool 62 @param subj_alt_name_match: flag to enable/disable matching of hostname 63 against peer certificate subjectAltNames. Nb. A setting of True will 64 be ignored if the pyasn1 package is not installed 65 """ 66 self.__certDN = None 67 self.__hostname = None 68 69 if certDN is not None: 70 self.certDN = certDN 71 72 if hostname is not None: 73 self.hostname = hostname 74 75 if subj_alt_name_match: 76 if not SUBJ_ALT_NAME_SUPPORT: 77 log.warning('Overriding "subj_alt_name_match" keyword setting: ' 78 'peer verification with subjectAltNames is disabled') 79 self.__subj_alt_name_match = False 80 else: 81 self.__subj_alt_name_match = True 82 else: 83 log.debug('Disabling peer verification with subject ' 84 'subjectAltNames!') 85 self.__subj_alt_name_match = False 86 87 def __call__(self, connection, peerCert, errorStatus, errorDepth, 88 preverifyOK): 89 """Verify server certificate 90 91 @type connection: OpenSSL.SSL.Connection 92 @param connection: SSL connection object 93 @type peerCert: basestring 94 @param peerCert: server host certificate as OpenSSL.crypto.X509 95 instance 96 @type errorStatus: int 97 @param errorStatus: error status passed from caller. This is the value 98 returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up 99 x509_vfy.h in the OpenSSL source to get the meanings of the different 100 codes. PyOpenSSL doesn't help you! 101 @type errorDepth: int 102 @param errorDepth: a non-negative integer representing where in the 103 certificate chain the error occurred. If it is zero it occured in the 104 end entity certificate, one if it is the certificate which signed the 105 end entity certificate and so on. 106 107 @type preverifyOK: int 108 @param preverifyOK: the error status - 0 = Error, 1 = OK of the current 109 SSL context irrespective of any verification checks done here. If this 110 function yields an OK status, it should enforce the preverifyOK value 111 so that any error set upstream overrides and is honoured. 112 @rtype: int 113 @return: status code - 0/False = Error, 1/True = OK 114 """ 115 if peerCert.has_expired(): 116 # Any expired certificate in the chain should result in an error 117 log.error('Certificate %r in peer certificate chain has expired', 118 peerCert.get_subject()) 119 120 return False 121 122 elif errorDepth == 0: 123 # Only interested in DN of last certificate in the chain - this must 124 # match the expected Server DN setting 125 peerCertSubj = peerCert.get_subject() 126 peerCertDN = peerCertSubj.get_components() 127 peerCertDN.sort() 128 129 if self.certDN is None: 130 # Check hostname against peer certificate CN field instead: 131 if self.hostname is None: 132 log.error('No "hostname" or "certDN" set to check peer ' 133 'certificate against') 134 return False 135 136 # Check for subject alternative names 137 if self.__subj_alt_name_match: 138 dns_names = self._get_subj_alt_name(peerCert) 139 if self.hostname in dns_names: 140 return preverifyOK 141 142 # If no subjectAltNames, default to check of subject Common Name 143 if peerCertSubj.commonName == self.hostname: 144 return preverifyOK 145 else: 146 log.error('Peer certificate CN %r doesn\'t match the ' 147 'expected CN %r', peerCertSubj.commonName, 148 self.hostname) 149 return False 150 else: 151 if peerCertDN == self.certDN: 152 return preverifyOK 153 else: 154 log.error('Peer certificate DN %r doesn\'t match the ' 155 'expected DN %r', peerCertDN, self.certDN) 156 return False 157 else: 158 return preverifyOK 159 160 def get_verify_server_cert_func(self): 161 def verify_server_cert(connection, peerCert, errorStatus, errorDepth, 162 preverifyOK): 163 return self.__call__(connection, peerCert, errorStatus, 164 errorDepth, preverifyOK) 165 166 return verify_server_cert 167 168 @classmethod 169 def _get_subj_alt_name(cls, peer_cert): 170 '''Extract subjectAltName DNS name settings from certificate extensions 171 172 @param peer_cert: peer certificate in SSL connection. subjectAltName 173 settings if any will be extracted from this 174 @type peer_cert: OpenSSL.crypto.X509 175 ''' 176 # Search through extensions 177 dns_name = [] 178 general_names = SubjectAltName() 179 for i in range(peer_cert.get_extension_count()): 180 ext = peer_cert.get_extension(i) 181 ext_name = ext.get_short_name() 182 if ext_name == cls.SUBJ_ALT_NAME_EXT_NAME: 183 # PyOpenSSL returns extension data in ASN.1 encoded form 184 ext_dat = ext.get_data() 185 decoded_dat = der_decoder.decode(ext_dat, 186 asn1Spec=general_names) 187 188 for name in decoded_dat: 189 if isinstance(name, SubjectAltName): 190 for entry in range(len(name)): 191 component = name.getComponentByPosition(entry) 192 dns_name.append(str(component.getComponent())) 193 194 return dns_name 195 196 def _getCertDN(self): 197 return self.__certDN 198 199 def _setCertDN(self, val): 200 if isinstance(val, str): 201 # Allow for quoted DN 202 certDN = val.strip('"') 203 204 dnFields = self.__class__.PARSER_RE.split(certDN) 205 if len(dnFields) < 2: 206 raise TypeError('Error parsing DN string: "%s"' % certDN) 207 208 self.__certDN = list(zip(dnFields[1::2], dnFields[2::2])) 209 self.__certDN.sort() 210 211 elif not isinstance(val, list): 212 for i in val: 213 if not len(i) == 2: 214 raise TypeError('Expecting list of two element DN field, ' 215 'DN field value pairs for "certDN" ' 216 'attribute') 217 self.__certDN = val 218 else: 219 raise TypeError('Expecting list or string type for "certDN" ' 220 'attribute') 221 222 certDN = property(fget=_getCertDN, 223 fset=_setCertDN, 224 doc="Distinguished Name for Server Certificate") 225 226 # Get/Set Property methods 227 def _getHostname(self): 228 return self.__hostname 229 230 def _setHostname(self, val): 231 if not isinstance(val, str): 232 raise TypeError("Expecting string type for hostname " 233 "attribute") 234 self.__hostname = val 235 236 hostname = property(fget=_getHostname, 237 fset=_setHostname, 238 doc="hostname of server") 239