1__author__ = 'haho0032'
2
3import base64
4import datetime
5import dateutil.parser
6import pytz
7import six
8from OpenSSL import crypto
9from os.path import join
10from os import remove
11
12import saml2.cryptography.pki
13
14
class WrongInput(Exception):
    """Raised when supplied certificate information is malformed.

    OpenSSLWrapper.create_certificate raises it when the country code is
    not exactly two characters long.
    """
17
18
class CertificateError(Exception):
    """Raised when a certificate or key cannot be serialized or written.

    OpenSSLWrapper.create_certificate wraps the underlying exception in
    this type and passes it along as the second argument.
    """
21
22
class PayloadError(Exception):
    """Payload related error.

    Not raised by OpenSSLWrapper itself; presumably kept for code that
    imports it from this module.
    """
25
26
27class OpenSSLWrapper(object):
28    def __init__(self):
29        pass
30
31    def create_certificate(self, cert_info, request=False, valid_from=0,
32                           valid_to=315360000, sn=1, key_length=1024,
33                           hash_alg="sha256", write_to_file=False, cert_dir="",
34                           cipher_passphrase=None):
35        """
36        Can create certificate requests, to be signed later by another
37        certificate with the method
38        create_cert_signed_certificate. If request is True.
39
40        Can also create self signed root certificates if request is False.
41        This is default behaviour.
42
43        :param cert_info:         Contains information about the certificate.
44                                  Is a dictionary that must contain the keys:
45                                  cn                = Common name. This part
46                                  must match the host being authenticated
47                                  country_code      = Two letter description
48                                  of the country.
49                                  state             = State
50                                  city              = City
51                                  organization      = Organization, can be a
52                                  company name.
53                                  organization_unit = A unit at the
54                                  organization, can be a department.
55                                  Example:
56                                                    cert_info_ca = {
57                                                        "cn": "company.com",
58                                                        "country_code": "se",
59                                                        "state": "AC",
60                                                        "city": "Dorotea",
61                                                        "organization":
62                                                        "Company",
63                                                        "organization_unit":
64                                                        "Sales"
65                                                    }
66        :param request:           True if this is a request for certificate,
67                                  that should be signed.
68                                  False if this is a self signed certificate,
69                                  root certificate.
70        :param valid_from:        When the certificate starts to be valid.
71                                  Amount of seconds from when the
72                                  certificate is generated.
73        :param valid_to:          How long the certificate will be valid from
74                                  when it is generated.
75                                  The value is in seconds. Default is
76                                  315360000 seconds, a.k.a 10 years.
77        :param sn:                Serial number for the certificate. Default
78                                  is 1.
79        :param key_length:        Length of the key to be generated. Defaults
80                                  to 1024.
81        :param hash_alg:          Hash algorithm to use for the key. Default
82                                  is sha256.
83        :param write_to_file:     True if you want to write the certificate
84                                  to a file. The method will then return
85                                  a tuple with path to certificate file and
86                                  path to key file.
87                                  False if you want to get the result as
88                                  strings. The method will then return a tuple
89                                  with the certificate string and the key as
90                                  string.
91                                  WILL OVERWRITE ALL EXISTING FILES WITHOUT
92                                  ASKING!
93        :param cert_dir:          Where to save the files if write_to_file is
94                                  true.
95        :param cipher_passphrase  A dictionary with cipher and passphrase.
96        Example::
97                {"cipher": "blowfish", "passphrase": "qwerty"}
98
99        :return:                  string representation of certificate,
100                                  string representation of private key
101                                  if write_to_file parameter is False otherwise
102                                  path to certificate file, path to private
103                                  key file
104        """
105        cn = cert_info["cn"]
106
107        c_f = None
108        k_f = None
109
110        if write_to_file:
111            cert_file = "%s.crt" % cn
112            key_file = "%s.key" % cn
113            try:
114                remove(cert_file)
115            except:
116                pass
117            try:
118                remove(key_file)
119            except:
120                pass
121            c_f = join(cert_dir, cert_file)
122            k_f = join(cert_dir, key_file)
123
124
125        # create a key pair
126        k = crypto.PKey()
127        k.generate_key(crypto.TYPE_RSA, key_length)
128
129        # create a self-signed cert
130        cert = crypto.X509()
131
132        if request:
133            cert = crypto.X509Req()
134
135        if (len(cert_info["country_code"]) != 2):
136            raise WrongInput("Country code must be two letters!")
137        cert.get_subject().C = cert_info["country_code"]
138        cert.get_subject().ST = cert_info["state"]
139        cert.get_subject().L = cert_info["city"]
140        cert.get_subject().O = cert_info["organization"]
141        cert.get_subject().OU = cert_info["organization_unit"]
142        cert.get_subject().CN = cn
143        if not request:
144            cert.set_serial_number(sn)
145            cert.gmtime_adj_notBefore(valid_from)  #Valid before present time
146            cert.gmtime_adj_notAfter(valid_to)  #3 650 days
147            cert.set_issuer(cert.get_subject())
148        cert.set_pubkey(k)
149        cert.sign(k, hash_alg)
150
151        try:
152            if request:
153                tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM,
154                                                           cert)
155            else:
156                tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
157            tmp_key = None
158            if cipher_passphrase is not None:
159                passphrase = cipher_passphrase["passphrase"]
160                if isinstance(cipher_passphrase["passphrase"],
161                              six.string_types):
162                    passphrase = passphrase.encode('utf-8')
163                tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k,
164                                                 cipher_passphrase["cipher"],
165                                                 passphrase)
166            else:
167                tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
168            if write_to_file:
169                with open(c_f, 'wt') as fc:
170                    fc.write(tmp_cert.decode('utf-8'))
171                with open(k_f, 'wt') as fk:
172                    fk.write(tmp_key.decode('utf-8'))
173                return c_f, k_f
174            return tmp_cert, tmp_key
175        except Exception as ex:
176            raise CertificateError("Certificate cannot be generated.", ex)
177
178    def write_str_to_file(self, file, str_data):
179        with open(file, 'wt') as f:
180            f.write(str_data)
181
182    def read_str_from_file(self, file, type="pem"):
183        with open(file, 'rb') as f:
184            str_data = f.read()
185
186        if type == "pem":
187            return str_data
188
189        if type in ["der", "cer", "crt"]:
190            return base64.b64encode(str(str_data))
191
192
193    def create_cert_signed_certificate(self, sign_cert_str, sign_key_str,
194                                       request_cert_str, hash_alg="sha256",
195                                       valid_from=0, valid_to=315360000, sn=1,
196                                       passphrase=None):
197
198        """
199        Will sign a certificate request with a give certificate.
200        :param sign_cert_str:     This certificate will be used to sign with.
201                                  Must be a string representation of
202                                  the certificate. If you only have a file
203                                  use the method read_str_from_file to
204                                  get a string representation.
205        :param sign_key_str:        This is the key for the ca_cert_str
206                                  represented as a string.
207                                  If you only have a file use the method
208                                  read_str_from_file to get a string
209                                  representation.
210        :param request_cert_str:  This is the prepared certificate to be
211                                  signed. Must be a string representation of
212                                  the requested certificate. If you only have
213                                  a file use the method read_str_from_file
214                                  to get a string representation.
215        :param hash_alg:          Hash algorithm to use for the key. Default
216                                  is sha256.
217        :param valid_from:        When the certificate starts to be valid.
218                                  Amount of seconds from when the
219                                  certificate is generated.
220        :param valid_to:          How long the certificate will be valid from
221                                  when it is generated.
222                                  The value is in seconds. Default is
223                                  315360000 seconds, a.k.a 10 years.
224        :param sn:                Serial number for the certificate. Default
225                                  is 1.
226        :param passphrase:        Password for the private key in sign_key_str.
227        :return:                  String representation of the signed
228                                  certificate.
229        """
230        ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
231        ca_key = None
232        if passphrase is not None:
233            ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str,
234                                            passphrase)
235        else:
236            ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
237        req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM,
238                                                   request_cert_str)
239
240        cert = crypto.X509()
241        cert.set_subject(req_cert.get_subject())
242        cert.set_serial_number(sn)
243        cert.gmtime_adj_notBefore(valid_from)
244        cert.gmtime_adj_notAfter(valid_to)
245        cert.set_issuer(ca_cert.get_subject())
246        cert.set_pubkey(req_cert.get_pubkey())
247        cert.sign(ca_key, hash_alg)
248
249        cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
250        if isinstance(cert_dump, six.string_types):
251            return cert_dump
252        return cert_dump.decode('utf-8')
253
254    def verify_chain(self, cert_chain_str_list, cert_str):
255        """
256
257        :param cert_chain_str_list: Must be a list of certificate strings,
258        where the first certificate to be validate
259        is in the beginning and the root certificate is last.
260        :param cert_str: The certificate to be validated.
261        :return:
262        """
263        for tmp_cert_str in cert_chain_str_list:
264            valid, message = self.verify(tmp_cert_str, cert_str)
265            if not valid:
266                return False, message
267            else:
268                cert_str = tmp_cert_str
269            return (True,
270                    "Signed certificate is valid and correctly signed by CA "
271                    "certificate.")
272
273    def certificate_not_valid_yet(self, cert):
274        starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
275        now = pytz.UTC.localize(datetime.datetime.utcnow())
276        if starts_to_be_valid < now:
277            return False
278        return True
279
280
281    def verify(self, signing_cert_str, cert_str):
282        """
283        Verifies if a certificate is valid and signed by a given certificate.
284
285        :param signing_cert_str: This certificate will be used to verify the
286                                  signature. Must be a string representation
287                                 of the certificate. If you only have a file
288                                 use the method read_str_from_file to
289                                 get a string representation.
290        :param cert_str:         This certificate will be verified if it is
291                                  correct. Must be a string representation
292                                 of the certificate. If you only have a file
293                                 use the method read_str_from_file to
294                                 get a string representation.
295        :return:                 Valid, Message
296                                 Valid = True if the certificate is valid,
297                                 otherwise false.
298                                 Message = Why the validation failed.
299        """
300        try:
301            ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
302                                              signing_cert_str)
303            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
304
305            if self.certificate_not_valid_yet(ca_cert):
306                return False, "CA certificate is not valid yet."
307
308            if ca_cert.has_expired() == 1:
309                return False, "CA certificate is expired."
310
311            if cert.has_expired() == 1:
312                return False, "The signed certificate is expired."
313
314            if self.certificate_not_valid_yet(cert):
315                return False, "The signed certificate is not valid yet."
316
317            if ca_cert.get_subject().CN == cert.get_subject().CN:
318                return False, ("CN may not be equal for CA certificate and the "
319                               "signed certificate.")
320
321            cert_algorithm = cert.get_signature_algorithm()
322            if six.PY3:
323                cert_algorithm = cert_algorithm.decode('ascii')
324                cert_str = cert_str.encode('ascii')
325
326            cert_crypto = saml2.cryptography.pki.load_pem_x509_certificate(
327                    cert_str)
328
329            try:
330                crypto.verify(ca_cert, cert_crypto.signature,
331                              cert_crypto.tbs_certificate_bytes,
332                              cert_algorithm)
333                return True, "Signed certificate is valid and correctly signed by CA certificate."
334            except crypto.Error as e:
335                return False, "Certificate is incorrectly signed."
336        except Exception as e:
337            return False, "Certificate is not valid for an unknown reason. %s" % str(e)
338