1#!/usr/local/bin/python3.8 2# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny 3import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging 4try: 5 from urllib.request import urlopen, Request # Python 3 6except ImportError: 7 from urllib2 import urlopen, Request # Python 2 8 9DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD 10DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory" 11 12LOGGER = logging.getLogger(__name__) 13LOGGER.addHandler(logging.StreamHandler()) 14LOGGER.setLevel(logging.INFO) 15 16def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None): 17 directory, acct_headers, alg, jwk = None, None, None, None # global variables 18 19 # helper functions - base64 encode for jose spec 20 def _b64(b): 21 return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") 22 23 # helper function - run external commands 24 def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): 25 proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 26 out, err = proc.communicate(cmd_input) 27 if proc.returncode != 0: 28 raise IOError("{0}\n{1}".format(err_msg, err)) 29 return out 30 31 # helper function - make request and automatically parse json response 32 def _do_request(url, data=None, err_msg="Error", depth=0): 33 try: 34 resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"})) 35 resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers 36 except IOError as e: 37 resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) 38 code, headers = getattr(e, "code", None), {} 39 try: 40 resp_data = json.loads(resp_data) # try to parse json results 41 except ValueError: 42 pass # ignore json parsing errors 43 if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce": 44 raise IndexError(resp_data) # allow 100 retrys for bad nonces 45 if code not in [200, 201, 204]: 46 raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data)) 47 return resp_data, code, headers 48 49 # helper function - make signed requests 50 def _send_signed_request(url, payload, err_msg, depth=0): 51 payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8')) 52 new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce'] 53 protected = {"url": url, "alg": alg, "nonce": new_nonce} 54 protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']}) 55 protected64 = _b64(json.dumps(protected).encode('utf8')) 56 protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8') 57 out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error") 58 data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)}) 59 try: 60 return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth) 61 except IndexError: # retry bad nonces (they raise IndexError) 62 return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) 63 64 # helper function - poll until complete 65 def _poll_until_not(url, pending_statuses, err_msg): 66 result, t0 = None, time.time() 67 while result is None or result['status'] in pending_statuses: 68 assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout 69 time.sleep(0 if result is None else 2) 70 result, _, _ = _send_signed_request(url, None, err_msg) 71 return result 72 73 # parse account key to get public key 74 log.info("Parsing account key...") 75 out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error") 76 pub_pattern = r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)" 77 pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups() 78 pub_exp = "{0:x}".format(int(pub_exp)) 79 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 80 alg = "RS256" 81 jwk = { 82 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), 83 "kty": "RSA", 84 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 85 } 86 accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':')) 87 thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest()) 88 89 # find domains 90 log.info("Parsing CSR...") 91 out = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {0}".format(csr)) 92 domains = set([]) 93 common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode('utf8')) 94 if common_name is not None: 95 domains.add(common_name.group(1)) 96 subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL) 97 if subject_alt_names is not None: 98 for san in subject_alt_names.group(1).split(", "): 99 if san.startswith("DNS:"): 100 domains.add(san[4:]) 101 log.info("Found domains: {0}".format(", ".join(domains))) 102 103 # get the ACME directory of urls 104 log.info("Getting directory...") 105 directory_url = CA + "/directory" if CA != DEFAULT_CA else directory_url # backwards compatibility with deprecated CA kwarg 106 directory, _, _ = _do_request(directory_url, err_msg="Error getting directory") 107 log.info("Directory found!") 108 109 # create account, update contact details (if any), and set the global key identifier 110 log.info("Registering account...") 111 reg_payload = {"termsOfServiceAgreed": True} 112 account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering") 113 log.info("Registered!" if code == 201 else "Already registered!") 114 if contact is not None: 115 account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details") 116 log.info("Updated contact details:\n{0}".format("\n".join(account['contact']))) 117 118 # create a new order 119 log.info("Creating new order...") 120 order_payload = {"identifiers": [{"type": "dns", "value": d} for d in domains]} 121 order, _, order_headers = _send_signed_request(directory['newOrder'], order_payload, "Error creating new order") 122 log.info("Order created!") 123 124 # get the authorizations that need to be completed 125 for auth_url in order['authorizations']: 126 authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges") 127 domain = authorization['identifier']['value'] 128 log.info("Verifying {0}...".format(domain)) 129 130 # find the http-01 challenge and write the challenge file 131 challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0] 132 token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) 133 keyauthorization = "{0}.{1}".format(token, thumbprint) 134 wellknown_path = os.path.join(acme_dir, token) 135 with open(wellknown_path, "w") as wellknown_file: 136 wellknown_file.write(keyauthorization) 137 138 # check that the file is in place 139 try: 140 wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) 141 assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization) 142 except (AssertionError, ValueError) as e: 143 raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) 144 145 # say the challenge is done 146 _send_signed_request(challenge['url'], {}, "Error submitting challenges: {0}".format(domain)) 147 authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain)) 148 if authorization['status'] != "valid": 149 raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization)) 150 os.remove(wellknown_path) 151 log.info("{0} verified!".format(domain)) 152 153 # finalize the order with the csr 154 log.info("Signing certificate...") 155 csr_der = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error") 156 _send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order") 157 158 # poll the order to monitor when it's done 159 order = _poll_until_not(order_headers['Location'], ["pending", "processing"], "Error checking order status") 160 if order['status'] != "valid": 161 raise ValueError("Order failed: {0}".format(order)) 162 163 # download the certificate 164 certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed") 165 log.info("Certificate signed!") 166 return certificate_pem 167 168def main(argv=None): 169 parser = argparse.ArgumentParser( 170 formatter_class=argparse.RawDescriptionHelpFormatter, 171 description=textwrap.dedent("""\ 172 This script automates the process of getting a signed TLS certificate from Let's Encrypt using 173 the ACME protocol. It will need to be run on your server and have access to your private 174 account key, so PLEASE READ THROUGH IT! It's only ~200 lines, so it won't take long. 175 176 Example Usage: 177 python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt 178 179 Example Crontab Renewal (once per month): 180 0 0 1 * * python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > /path/to/signed_chain.crt 2>> /var/log/acme_tiny.log 181 """) 182 ) 183 parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") 184 parser.add_argument("--csr", required=True, help="path to your certificate signing request") 185 parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory") 186 parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") 187 parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA") 188 parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") 189 parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!") 190 parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key") 191 192 args = parser.parse_args(argv) 193 LOGGER.setLevel(args.quiet or LOGGER.level) 194 signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact) 195 sys.stdout.write(signed_crt) 196 197if __name__ == "__main__": # pragma: no cover 198 main(sys.argv[1:]) 199