1#!/usr/local/bin/python3.8
2# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny
3import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
4try:
5    from urllib.request import urlopen, Request # Python 3
6except ImportError:
7    from urllib2 import urlopen, Request # Python 2
8
9DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
10DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
11
12LOGGER = logging.getLogger(__name__)
13LOGGER.addHandler(logging.StreamHandler())
14LOGGER.setLevel(logging.INFO)
15
16def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None):
17    directory, acct_headers, alg, jwk = None, None, None, None # global variables
18
19    # helper functions - base64 encode for jose spec
20    def _b64(b):
21        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
22
23    # helper function - run external commands
24    def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
25        proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
26        out, err = proc.communicate(cmd_input)
27        if proc.returncode != 0:
28            raise IOError("{0}\n{1}".format(err_msg, err))
29        return out
30
31    # helper function - make request and automatically parse json response
32    def _do_request(url, data=None, err_msg="Error", depth=0):
33        try:
34            resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
35            resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
36        except IOError as e:
37            resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
38            code, headers = getattr(e, "code", None), {}
39        try:
40            resp_data = json.loads(resp_data) # try to parse json results
41        except ValueError:
42            pass # ignore json parsing errors
43        if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce":
44            raise IndexError(resp_data) # allow 100 retrys for bad nonces
45        if code not in [200, 201, 204]:
46            raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
47        return resp_data, code, headers
48
49    # helper function - make signed requests
50    def _send_signed_request(url, payload, err_msg, depth=0):
51        payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8'))
52        new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce']
53        protected = {"url": url, "alg": alg, "nonce": new_nonce}
54        protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']})
55        protected64 = _b64(json.dumps(protected).encode('utf8'))
56        protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8')
57        out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error")
58        data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)})
59        try:
60            return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth)
61        except IndexError: # retry bad nonces (they raise IndexError)
62            return _send_signed_request(url, payload, err_msg, depth=(depth + 1))
63
64    # helper function - poll until complete
65    def _poll_until_not(url, pending_statuses, err_msg):
66        result, t0 = None, time.time()
67        while result is None or result['status'] in pending_statuses:
68            assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout
69            time.sleep(0 if result is None else 2)
70            result, _, _ = _send_signed_request(url, None, err_msg)
71        return result
72
73    # parse account key to get public key
74    log.info("Parsing account key...")
75    out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error")
76    pub_pattern = r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
77    pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
78    pub_exp = "{0:x}".format(int(pub_exp))
79    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
80    alg = "RS256"
81    jwk = {
82        "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
83        "kty": "RSA",
84        "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
85    }
86    accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':'))
87    thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())
88
89    # find domains
90    log.info("Parsing CSR...")
91    out = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {0}".format(csr))
92    domains = set([])
93    common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode('utf8'))
94    if common_name is not None:
95        domains.add(common_name.group(1))
96    subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
97    if subject_alt_names is not None:
98        for san in subject_alt_names.group(1).split(", "):
99            if san.startswith("DNS:"):
100                domains.add(san[4:])
101    log.info("Found domains: {0}".format(", ".join(domains)))
102
103    # get the ACME directory of urls
104    log.info("Getting directory...")
105    directory_url = CA + "/directory" if CA != DEFAULT_CA else directory_url # backwards compatibility with deprecated CA kwarg
106    directory, _, _ = _do_request(directory_url, err_msg="Error getting directory")
107    log.info("Directory found!")
108
109    # create account, update contact details (if any), and set the global key identifier
110    log.info("Registering account...")
111    reg_payload = {"termsOfServiceAgreed": True}
112    account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
113    log.info("Registered!" if code == 201 else "Already registered!")
114    if contact is not None:
115        account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
116        log.info("Updated contact details:\n{0}".format("\n".join(account['contact'])))
117
118    # create a new order
119    log.info("Creating new order...")
120    order_payload = {"identifiers": [{"type": "dns", "value": d} for d in domains]}
121    order, _, order_headers = _send_signed_request(directory['newOrder'], order_payload, "Error creating new order")
122    log.info("Order created!")
123
124    # get the authorizations that need to be completed
125    for auth_url in order['authorizations']:
126        authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges")
127        domain = authorization['identifier']['value']
128        log.info("Verifying {0}...".format(domain))
129
130        # find the http-01 challenge and write the challenge file
131        challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0]
132        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
133        keyauthorization = "{0}.{1}".format(token, thumbprint)
134        wellknown_path = os.path.join(acme_dir, token)
135        with open(wellknown_path, "w") as wellknown_file:
136            wellknown_file.write(keyauthorization)
137
138        # check that the file is in place
139        try:
140            wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
141            assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization)
142        except (AssertionError, ValueError) as e:
143            raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e))
144
145        # say the challenge is done
146        _send_signed_request(challenge['url'], {}, "Error submitting challenges: {0}".format(domain))
147        authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain))
148        if authorization['status'] != "valid":
149            raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization))
150        os.remove(wellknown_path)
151        log.info("{0} verified!".format(domain))
152
153    # finalize the order with the csr
154    log.info("Signing certificate...")
155    csr_der = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error")
156    _send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order")
157
158    # poll the order to monitor when it's done
159    order = _poll_until_not(order_headers['Location'], ["pending", "processing"], "Error checking order status")
160    if order['status'] != "valid":
161        raise ValueError("Order failed: {0}".format(order))
162
163    # download the certificate
164    certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed")
165    log.info("Certificate signed!")
166    return certificate_pem
167
168def main(argv=None):
169    parser = argparse.ArgumentParser(
170        formatter_class=argparse.RawDescriptionHelpFormatter,
171        description=textwrap.dedent("""\
172            This script automates the process of getting a signed TLS certificate from Let's Encrypt using
173            the ACME protocol. It will need to be run on your server and have access to your private
174            account key, so PLEASE READ THROUGH IT! It's only ~200 lines, so it won't take long.
175
176            Example Usage:
177            python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt
178
179            Example Crontab Renewal (once per month):
180            0 0 1 * * python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > /path/to/signed_chain.crt 2>> /var/log/acme_tiny.log
181            """)
182    )
183    parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key")
184    parser.add_argument("--csr", required=True, help="path to your certificate signing request")
185    parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory")
186    parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
187    parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA")
188    parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt")
189    parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!")
190    parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key")
191
192    args = parser.parse_args(argv)
193    LOGGER.setLevel(args.quiet or LOGGER.level)
194    signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact)
195    sys.stdout.write(signed_crt)
196
197if __name__ == "__main__": # pragma: no cover
198    main(sys.argv[1:])
199