1# Majority of code shamelessly stolen from
2# http://www.v13.gr/blog/?p=303
3"""
4Authenticate via a PKI certificate.
5
6.. note::
7
8    This module is Experimental and should be used with caution
9
10Provides an authenticate function that will allow the caller to authenticate
11a user via their public cert against a pre-defined Certificate Authority.
12
13TODO: Add a 'ca_dir' option to configure a directory of CA files, a la Apache.
14
15:depends:    - pyOpenSSL module
16"""
17import logging
18
19import salt.utils.files
20
21# pylint: disable=import-error
22try:
23    try:
24        from M2Crypto import X509
25
26        HAS_M2 = True
27    except ImportError:
28        HAS_M2 = False
29        try:
30            from Cryptodome.Util import asn1
31        except ImportError:
32            from Crypto.Util import asn1  # nosec
33        import OpenSSL
34    HAS_DEPS = True
35except ImportError:
36    HAS_DEPS = False
37# pylint: enable=import-error
38
39
40log = logging.getLogger(__name__)
41
42
43def __virtual__():
44    """
45    Requires newer pycrypto and pyOpenSSL
46    """
47    if HAS_DEPS:
48        return True
49    return False
50
51
52def auth(username, password, **kwargs):
53    """
54    Returns True if the given user cert (password is the cert contents)
55    was issued by the CA and if cert's Common Name is equal to username.
56
57    Returns False otherwise.
58
59    ``username``: we need it to run the auth function from CLI/API;
60                  it should be in master config auth/acl
61    ``password``: contents of user certificate (pem-encoded user public key);
62                  why "password"? For CLI, it's the only available name
63
64    Configure the CA cert in the master config file:
65
66    .. code-block:: yaml
67
68        external_auth:
69          pki:
70            ca_file: /etc/pki/tls/ca_certs/trusted-ca.crt
71            your_user:
72              - .*
73    """
74    pem = password
75    cacert_file = __salt__["config.get"]("external_auth:pki:ca_file")
76
77    log.debug("Attempting to authenticate via pki.")
78    log.debug("Using CA file: %s", cacert_file)
79    log.debug("Certificate contents: %s", pem)
80
81    if HAS_M2:
82        cert = X509.load_cert_string(pem, X509.FORMAT_PEM)
83        cacert = X509.load_cert(cacert_file, X509.FORMAT_PEM)
84        if cert.verify(cacert.get_pubkey()):
85            log.info("Successfully authenticated certificate: %s", pem)
86            return True
87        else:
88            log.info("Failed to authenticate certificate: %s", pem)
89            return False
90
91    c = OpenSSL.crypto
92    cert = c.load_certificate(c.FILETYPE_PEM, pem)
93
94    with salt.utils.files.fopen(cacert_file) as f:
95        cacert = c.load_certificate(c.FILETYPE_PEM, f.read())
96
97    # Get the signing algorithm
98    algo = cert.get_signature_algorithm()
99
100    # Get the ASN1 format of the certificate
101    cert_asn1 = c.dump_certificate(c.FILETYPE_ASN1, cert)
102
103    # Decode the certificate
104    der = asn1.DerSequence()
105    der.decode(cert_asn1)
106
107    # The certificate has three parts:
108    # - certificate
109    # - signature algorithm
110    # - signature
111    # http://usefulfor.com/nothing/2009/06/10/x509-certificate-basics/
112    der_cert = der[0]
113    # der_algo = der[1]
114    der_sig = der[2]
115
116    # The signature is a BIT STRING (Type 3)
117    # Decode that as well
118    der_sig_in = asn1.DerObject()
119    der_sig_in.decode(der_sig)
120
121    # Get the payload
122    sig0 = der_sig_in.payload
123
124    # Do the following to see a validation error for tests
125    # der_cert=der_cert[:20]+'1'+der_cert[21:]
126
127    # First byte is the number of unused bits. This should be 0
128    # http://msdn.microsoft.com/en-us/library/windows/desktop/bb540792(v=vs.85).aspx
129    if sig0[0] != "\x00":
130        raise Exception("Number of unused bits is strange")
131    # Now get the signature itself
132    sig = sig0[1:]
133
134    # And verify the certificate
135    try:
136        c.verify(cacert, sig, der_cert, algo)
137        assert (
138            dict(cert.get_subject().get_components())["CN"] == username
139        ), "Certificate's CN should match the username"
140        log.info("Successfully authenticated certificate: %s", pem)
141        return True
142    except (OpenSSL.crypto.Error, AssertionError):
143        log.info("Failed to authenticate certificate: %s", pem)
144    return False
145