__author__ = 'haho0032'

import base64
import datetime
import dateutil.parser
import pytz
import six
from OpenSSL import crypto
from os.path import join
from os import remove

import saml2.cryptography.pki


class WrongInput(Exception):
    """Raised when caller-supplied certificate information is invalid."""


class CertificateError(Exception):
    """Raised when a generated certificate/key cannot be dumped or written."""


class PayloadError(Exception):
    """Payload-related error; not raised by OpenSSLWrapper itself."""


class OpenSSLWrapper(object):
    """Thin helper around pyOpenSSL for creating, signing and verifying
    X.509 certificates."""

    def __init__(self):
        pass

    def create_certificate(self, cert_info, request=False, valid_from=0,
                           valid_to=315360000, sn=1, key_length=1024,
                           hash_alg="sha256", write_to_file=False, cert_dir="",
                           cipher_passphrase=None):
        """
        Can create certificate requests, to be signed later by another
        certificate with the method
        create_cert_signed_certificate. If request is True.

        Can also create self signed root certificates if request is False.
        This is default behaviour.

        :param cert_info:         Contains information about the certificate.
                                  Is a dictionary that must contain the keys:
                                  cn                = Common name. This part
                                  must match the host being authenticated
                                  country_code      = Two letter description
                                  of the country.
                                  state             = State
                                  city              = City
                                  organization      = Organization, can be a
                                  company name.
                                  organization_unit = A unit at the
                                  organization, can be a department.
                                  Example:
                                                    cert_info_ca = {
                                                        "cn": "company.com",
                                                        "country_code": "se",
                                                        "state": "AC",
                                                        "city": "Dorotea",
                                                        "organization":
                                                        "Company",
                                                        "organization_unit":
                                                        "Sales"
                                                    }
        :param request:           True if this is a request for certificate,
                                  that should be signed.
                                  False if this is a self signed certificate,
                                  root certificate.
        :param valid_from:        When the certificate starts to be valid.
                                  Amount of seconds from when the
                                  certificate is generated.
        :param valid_to:          How long the certificate will be valid from
                                  when it is generated.
                                  The value is in seconds. Default is
                                  315360000 seconds, a.k.a 10 years.
        :param sn:                Serial number for the certificate. Default
                                  is 1.
        :param key_length:        Length in bits of the RSA key to be
                                  generated. Defaults to 1024.
                                  NOTE: 1024-bit RSA is generally considered
                                  weak; pass 2048 or more for new keys.
        :param hash_alg:          Digest used when signing the certificate or
                                  request. Default is sha256.
        :param write_to_file:     True if you want to write the certificate
                                  to a file. The method will then return
                                  a tuple with path to certificate file and
                                  path to key file.
                                  False if you want to get the result as
                                  strings. The method will then return a tuple
                                  with the certificate string and the key as
                                  string.
                                  WILL OVERWRITE ALL EXISTING FILES WITHOUT
                                  ASKING!
        :param cert_dir:          Where to save the files if write_to_file is
                                  true.
        :param cipher_passphrase: A dictionary with cipher and passphrase.
                                  Example::
                                  {"cipher": "blowfish", "passphrase": "qwerty"}

        :return:                  string representation of certificate,
                                  string representation of private key
                                  if write_to_file parameter is False otherwise
                                  path to certificate file, path to private
                                  key file
        :raise WrongInput:        If the country code is not two characters.
        :raise CertificateError:  If the certificate or key cannot be
                                  serialized or written.
        """
        cn = cert_info["cn"]

        # Validate before doing any expensive or destructive work (key
        # generation, removing existing files).
        if len(cert_info["country_code"]) != 2:
            raise WrongInput("Country code must be two letters!")

        c_f = None
        k_f = None

        if write_to_file:
            c_f = join(cert_dir, "%s.crt" % cn)
            k_f = join(cert_dir, "%s.key" % cn)
            # Best-effort removal of the previous output files. The paths
            # removed are the ones written below (inside cert_dir), not
            # same-named files in the current working directory.
            for stale_path in (c_f, k_f):
                try:
                    remove(stale_path)
                except (OSError, ValueError):
                    # Missing or unremovable file: open() below reports any
                    # real problem as a CertificateError.
                    pass

        # create a key pair
        k = crypto.PKey()
        k.generate_key(crypto.TYPE_RSA, key_length)

        # A signing request and a self-signed certificate share the subject,
        # public key and signature handling below.
        cert = crypto.X509Req() if request else crypto.X509()

        subject = cert.get_subject()
        subject.C = cert_info["country_code"]
        subject.ST = cert_info["state"]
        subject.L = cert_info["city"]
        subject.O = cert_info["organization"]
        subject.OU = cert_info["organization_unit"]
        subject.CN = cn
        if not request:
            # Only real certificates carry a serial number, a validity
            # window and an issuer; self-signed means issuer == subject.
            cert.set_serial_number(sn)
            cert.gmtime_adj_notBefore(valid_from)
            cert.gmtime_adj_notAfter(valid_to)
            cert.set_issuer(cert.get_subject())
        cert.set_pubkey(k)
        cert.sign(k, hash_alg)

        try:
            if request:
                tmp_cert = crypto.dump_certificate_request(
                    crypto.FILETYPE_PEM, cert)
            else:
                tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)

            if cipher_passphrase is not None:
                passphrase = cipher_passphrase["passphrase"]
                # pyOpenSSL wants a byte string; only text needs encoding
                # (byte strings are passed through untouched).
                if isinstance(passphrase, six.text_type):
                    passphrase = passphrase.encode('utf-8')
                tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k,
                                                 cipher_passphrase["cipher"],
                                                 passphrase)
            else:
                tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)

            if write_to_file:
                with open(c_f, 'wt') as fc:
                    fc.write(tmp_cert.decode('utf-8'))
                with open(k_f, 'wt') as fk:
                    fk.write(tmp_key.decode('utf-8'))
                return c_f, k_f
            return tmp_cert, tmp_key
        except Exception as ex:
            raise CertificateError("Certificate cannot be generated.", ex)

    def write_str_to_file(self, file, str_data):
        """
        Writes str_data to the given path in text mode, replacing any
        existing content.

        :param file:     Path of the file to write.
        :param str_data: The text to write.
        """
        with open(file, 'wt') as f:
            f.write(str_data)

    def read_str_from_file(self, file, type="pem"):
        """
        Reads a certificate or key file.

        :param file: Path of the file to read.
        :param type: "pem" returns the raw file content; "der", "cer" or
                     "crt" returns the file content base64 encoded.
        :return:     The content as a byte string, or None if type is not
                     one of the values above.
        """
        with open(file, 'rb') as f:
            str_data = f.read()

        if type == "pem":
            return str_data

        if type in ["der", "cer", "crt"]:
            # Encode the raw bytes directly: wrapping them in str() yields
            # text on Python 3, which b64encode rejects.
            return base64.b64encode(str_data)

        return None

    def create_cert_signed_certificate(self, sign_cert_str, sign_key_str,
                                       request_cert_str, hash_alg="sha256",
                                       valid_from=0, valid_to=315360000, sn=1,
                                       passphrase=None):

        """
        Will sign a certificate request with a given certificate.
        :param sign_cert_str:     This certificate will be used to sign with.
                                  Must be a string representation of
                                  the certificate. If you only have a file
                                  use the method read_str_from_file to
                                  get a string representation.
        :param sign_key_str:        This is the key for sign_cert_str
                                  represented as a string.
                                  If you only have a file use the method
                                  read_str_from_file to get a string
                                  representation.
        :param request_cert_str:  This is the prepared certificate to be
                                  signed. Must be a string representation of
                                  the requested certificate. If you only have
                                  a file use the method read_str_from_file
                                  to get a string representation.
        :param hash_alg:          Digest used when signing the certificate.
                                  Default is sha256.
        :param valid_from:        When the certificate starts to be valid.
                                  Amount of seconds from when the
                                  certificate is generated.
        :param valid_to:          How long the certificate will be valid from
                                  when it is generated.
                                  The value is in seconds. Default is
                                  315360000 seconds, a.k.a 10 years.
        :param sn:                Serial number for the certificate. Default
                                  is 1.
        :param passphrase:        Password for the private key in sign_key_str.
                                  Text is encoded as UTF-8 before use.
        :return:                  String representation of the signed
                                  certificate.
        """
        ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
        if passphrase is not None:
            # Same convention as create_certificate: hand pyOpenSSL bytes.
            if isinstance(passphrase, six.text_type):
                passphrase = passphrase.encode('utf-8')
            ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str,
                                            passphrase)
        else:
            ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
        req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM,
                                                   request_cert_str)

        # Subject and public key come from the request, issuer from the
        # signing certificate.
        cert = crypto.X509()
        cert.set_subject(req_cert.get_subject())
        cert.set_serial_number(sn)
        cert.gmtime_adj_notBefore(valid_from)
        cert.gmtime_adj_notAfter(valid_to)
        cert.set_issuer(ca_cert.get_subject())
        cert.set_pubkey(req_cert.get_pubkey())
        cert.sign(ca_key, hash_alg)

        cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
        if isinstance(cert_dump, six.string_types):
            return cert_dump
        return cert_dump.decode('utf-8')

    def verify_chain(self, cert_chain_str_list, cert_str):
        """
        Verifies a certificate against a chain of signing certificates.

        Each certificate in the chain is used to verify the current
        certificate; on success that chain certificate becomes the next
        certificate to be verified.

        :param cert_chain_str_list: Must be a list of certificate strings,
        where the first certificate to be validate
        is in the beginning and the root certificate is last.
        :param cert_str:            The certificate to be validated.
        :return:                    Valid, Message
                                    Valid   = True if every link verified,
                                              otherwise False.
                                    Message = The message from the first
                                              failing verification, or a
                                              success message.
        """
        for tmp_cert_str in cert_chain_str_list:
            valid, message = self.verify(tmp_cert_str, cert_str)
            if not valid:
                return False, message
            cert_str = tmp_cert_str
        return (True,
                "Signed certificate is valid and correctly signed by CA "
                "certificate.")

    def certificate_not_valid_yet(self, cert):
        """
        Tells whether the notBefore time of a certificate has not passed yet.

        :param cert: A certificate object exposing get_notBefore().
        :return:     False if notBefore is strictly earlier than the current
                     UTC time, otherwise True.
        """
        starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
        now = pytz.UTC.localize(datetime.datetime.utcnow())
        return not starts_to_be_valid < now

    def verify(self, signing_cert_str, cert_str):
        """
        Verifies if a certificate is valid and signed by a given certificate.

        :param signing_cert_str: This certificate will be used to verify the
                                 signature. Must be a string representation
                                 of the certificate. If you only have a file
                                 use the method read_str_from_file to
                                 get a string representation.
        :param cert_str:         This certificate will be verified if it is
                                 correct. Must be a string representation
                                 of the certificate. If you only have a file
                                 use the method read_str_from_file to
                                 get a string representation.
        :return:                 Valid, Message
                                 Valid = True if the certificate is valid,
                                 otherwise false.
                                 Message = Why the validation failed.
        """
        try:
            ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                              signing_cert_str)
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)

            if self.certificate_not_valid_yet(ca_cert):
                return False, "CA certificate is not valid yet."

            if ca_cert.has_expired() == 1:
                return False, "CA certificate is expired."

            if cert.has_expired() == 1:
                return False, "The signed certificate is expired."

            if self.certificate_not_valid_yet(cert):
                return False, "The signed certificate is not valid yet."

            if ca_cert.get_subject().CN == cert.get_subject().CN:
                return False, ("CN may not be equal for CA certificate and the "
                               "signed certificate.")

            cert_algorithm = cert.get_signature_algorithm()
            if six.PY3:
                cert_algorithm = cert_algorithm.decode('ascii')
                # Only text needs encoding; PEM data that is already bytes
                # (e.g. from read_str_from_file) is used as is.
                if isinstance(cert_str, six.text_type):
                    cert_str = cert_str.encode('ascii')

            cert_crypto = saml2.cryptography.pki.load_pem_x509_certificate(
                cert_str)

            try:
                crypto.verify(ca_cert, cert_crypto.signature,
                              cert_crypto.tbs_certificate_bytes,
                              cert_algorithm)
                return True, "Signed certificate is valid and correctly signed by CA certificate."
            except crypto.Error:
                return False, "Certificate is incorrectly signed."
        except Exception as e:
            return False, "Certificate is not valid for an unknown reason. %s" % str(e)