1#!/usr/bin/env python
2# This Source Code Form is subject to the terms of the Mozilla Public
3# License, v. 2.0. If a copy of the MPL was not distributed with this
4# file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6# This script exists to generate the Certificate Authority and server
7# certificates used for SSL testing in Mochitest. The already generated
8# certs are located at $topsrcdir/build/pgo/certs/ .
9
10import mozinfo
11import os
12import random
13import re
14import shutil
15import subprocess
16import sys
17import tempfile
18import distutils
19
20from mozbuild.base import MozbuildObject
21from mozfile import NamedTemporaryFile, TemporaryDirectory
22from mozprofile.permissions import ServerLocations
23
# Filename patterns that unlinkDbFiles() and dbFilesExist() treat as
# database files (presumably the cert/key/secmod databases written by
# certutil -- confirm).
# Raw strings are used so that the regex escape `\.` is not parsed as an
# (invalid) string escape sequence.
dbFiles = [
  re.compile(r"^cert[0-9]+\.db$"),
  re.compile(r"^key[0-9]+\.db$"),
  re.compile(r"^secmod\.db$"),
]
29
def unlinkDbFiles(path):
  """Delete every file below `path` whose name matches a `dbFiles` pattern.

  The directory tree is walked recursively; a matching name is only
  unlinked if os.path.exists() still reports it as present.
  """
  for dirpath, _subdirs, filenames in os.walk(path):
    for filename in filenames:
      if not any(pattern.match(filename) for pattern in dbFiles):
        continue
      candidate = os.path.join(dirpath, filename)
      if os.path.exists(candidate):
        os.unlink(candidate)
36
def dbFilesExist(path):
  """Return True if a file matching a `dbFiles` pattern exists below `path`.

  The directory tree is walked recursively; the search stops at the first
  matching name that os.path.exists() confirms. Returns False otherwise.
  """
  for dirpath, _subdirs, filenames in os.walk(path):
    for filename in filenames:
      if not any(pattern.match(filename) for pattern in dbFiles):
        continue
      if os.path.exists(os.path.join(dirpath, filename)):
        return True
  return False
44
def runUtil(util, args, inputdata=None, outputstream=None):
  """Run `util` with the argument list `args` and return its exit status.

  util         -- path of the executable to run.
  args         -- list of command line arguments passed after `util`.
  inputdata    -- data fed to the process on stdin, or None to leave stdin
                  untouched (inherited from this process).
  outputstream -- file object receiving the process' stdout, or None to
                  inherit stdout.

  The process is waited for; its return code is returned unchanged.
  """
  env = os.environ.copy()
  if mozinfo.os == "linux":
    # Prepend the directory containing the utility to the library search
    # path so shared libraries located next to it can be resolved.
    pathvar = "LD_LIBRARY_PATH"
    app_path = os.path.dirname(util)
    if pathvar in env:
      env[pathvar] = "%s%s%s" % (app_path, os.pathsep, env[pathvar])
    else:
      env[pathvar] = app_path
  # Test against None rather than truthiness: an empty `inputdata` must
  # still give the child a (closed, empty) stdin pipe instead of letting it
  # inherit -- and possibly block on -- the parent's stdin.
  stdin = subprocess.PIPE if inputdata is not None else None
  proc = subprocess.Popen([util] + args, env=env,
                          stdin=stdin,
                          stdout=outputstream)
  proc.communicate(inputdata)
  return proc.returncode
59
def createRandomFile(randomFile):
  """Write 2048 random characters to the file object `randomFile`.

  Each character is chr() of a value drawn with random.randint(0, 255).
  The data comes from the `random` module, so it is not suitable where
  cryptographic-quality randomness is required.
  """
  # `range` exists on both Python 2 and 3 (`xrange` is Python 2 only), and
  # a single joined write replaces 2048 one-character writes.
  randomFile.write("".join(chr(random.randint(0, 255)) for _ in range(2048)))
63
def writeCertspecForServerLocations(fd):
  """Write a certspec for the hosts in server-locations.txt to `fd`.

  Reads build/pgo/server-locations.txt below the source directory of the
  module-level `build` object. Every location that uses the "https" scheme,
  has no "nocert" option and no "cert=<name>" option contributes its host.
  The first host becomes the certificate subject and all hosts are listed
  in the subjectAlternativeName extension.

  fd -- writable text file object receiving the certspec.

  Raises Exception if no location qualifies, since a subject could not be
  chosen.
  """
  locations = ServerLocations(os.path.join(build.topsrcdir,
                                           "build", "pgo",
                                           "server-locations.txt"))
  # Compiled once, outside the loop; matches options such as "cert=foo".
  customCertRE = re.compile(r"^cert=(?:\w+)")
  SAN = [
    loc.host
    for loc in locations
    if loc.scheme == "https"
    and "nocert" not in loc.options
    and not any(customCertRE.match(option) for option in loc.options)
  ]

  if not SAN:
    # Fail with a clear message before anything is written, instead of an
    # IndexError on SAN[0] after a partial certspec has been emitted.
    raise Exception("server-locations.txt contains no https location that uses the default certificate")

  fd.write("issuer:printableString/CN=Temporary Certificate Authority/O=Mozilla Testing/OU=Profile Guided Optimization\n")
  fd.write("subject:{}\n".format(SAN[0]))
  fd.write("extension:subjectAlternativeName:{}\n".format(",".join(SAN)))
82
def constructCertDatabase(build, srcDir):
  """Regenerate the certificate database and key/cert files in `srcDir`.

  Steps:
    1. Delete existing database files (see `dbFiles`) below `srcDir`.
    2. Copy all *.certspec / *.keyspec files found below `srcDir` into a
       temporary directory and add a generated "pgoserver.certspec".
    3. For every certspec, run pycert.py to produce a PEM certificate and
       add it to the database in `srcDir` with certutil.
    4. For every keyspec, run pykey.py to produce a PEM key, bundle it with
       the matching certificate into a PKCS12 file using openssl, and import
       that with pk12util. "ca" certificates and "client" PKCS12 bundles are
       additionally copied into `srcDir`.

  build  -- object providing get_binary_path() and topsrcdir.
  srcDir -- directory holding the spec files; receives the generated output.

  Returns 0 on success, or the non-zero exit status of the first external
  tool that failed. Raises Exception for malformed keyspec filenames, a
  keyspec without a matching certificate, a pre-existing pgoserver.certspec,
  or a missing openssl executable.
  """
  # `import distutils` on its own does not load the `spawn` submodule, so
  # import the helper explicitly instead of relying on another module
  # having imported it as a side effect.
  from distutils.spawn import find_executable

  certutil = build.get_binary_path(what="certutil")
  pk12util = build.get_binary_path(what="pk12util")
  openssl = find_executable("openssl")
  pycert = os.path.join(build.topsrcdir, "security", "manager", "ssl", "tests",
                        "unit", "pycert.py")
  pykey = os.path.join(build.topsrcdir, "security", "manager", "ssl", "tests",
                        "unit", "pykey.py")

  with NamedTemporaryFile() as pwfile, TemporaryDirectory() as pemfolder:
    # The password file handed to certutil/pk12util/openssl contains only a
    # newline.
    pwfile.write("\n")
    pwfile.flush()

    if dbFilesExist(srcDir):
      # Make sure all DB files from src are really deleted
      unlinkDbFiles(srcDir)

    # Copy all .certspec and .keyspec files to a temporary directory
    for root, dirs, files in os.walk(srcDir):
      for spec in [i for i in files if i.endswith((".certspec", ".keyspec"))]:
        shutil.copyfile(os.path.join(root, spec), os.path.join(pemfolder, spec))

    # Write a certspec for the "server-locations.txt" file to that temporary directory
    pgoserver_certspec = os.path.join(pemfolder, "pgoserver.certspec")
    if os.path.exists(pgoserver_certspec):
      raise Exception("{} already exists, which isn't allowed".format(pgoserver_certspec))
    with open(pgoserver_certspec, "w") as fd:
      writeCertspecForServerLocations(fd)

    # Generate certs for all certspecs
    for root, dirs, files in os.walk(pemfolder):
      for certspec in [i for i in files if i.endswith(".certspec")]:
        name = certspec.split(".certspec")[0]
        pem = os.path.join(pemfolder, "{}.cert.pem".format(name))

        print("Generating public certificate {} (pem={})".format(name, pem))

        with open(os.path.join(root, certspec), "r") as certspec_file:
          certspec_data = certspec_file.read()
        # pycert.py reads the spec on stdin and writes the PEM to stdout.
        with open(pem, "w") as pem_file:
          status = runUtil(pycert, [], inputdata=certspec_data, outputstream=pem_file)
        if status:
          return status

        status = runUtil(certutil, ["-A", "-n", name, "-t", "P,,", "-i", pem, "-d", srcDir, "-f", pwfile.name])
        if status:
          return status

      # Keys are handled after all certificates of this directory, because
      # each key needs its "<name>.cert.pem" to exist already.
      for keyspec in [i for i in files if i.endswith(".keyspec")]:
        parts = keyspec.split(".")
        name = parts[0]
        key_type = parts[1]
        if key_type not in ["ca", "client", "server"]:
          raise Exception("{}: keyspec filenames must be of the form XXX.client.keyspec, XXX.ca.keyspec or XXX.server.keyspec (key_type={})".format(keyspec, key_type))
        key_pem = os.path.join(pemfolder, "{}.key.pem".format(name))

        print("Generating private key {} (pem={})".format(name, key_pem))

        with open(os.path.join(root, keyspec), "r") as keyspec_file:
          keyspec_data = keyspec_file.read()
        # pykey.py reads the spec on stdin and writes the PEM to stdout.
        with open(key_pem, "w") as pem_file:
          status = runUtil(pykey, [], inputdata=keyspec_data, outputstream=pem_file)
        if status:
          return status

        cert_pem = os.path.join(pemfolder, "{}.cert.pem".format(name))
        if not os.path.exists(cert_pem):
          raise Exception("There has to be a corresponding certificate named {} for the keyspec {}".format(cert_pem, keyspec))

        if openssl is None:
          # find_executable() returns None when nothing is found; report
          # that explicitly instead of failing inside subprocess.
          raise Exception("The openssl executable is required to convert {} to PKCS12 but was not found".format(key_pem))

        p12 = os.path.join(pemfolder, "{}.key.p12".format(name))
        print("Converting private key {} to PKCS12 (p12={})".format(key_pem, p12))
        status = runUtil(openssl, ["pkcs12", "-export", "-inkey", key_pem, "-in", cert_pem, "-name", name, "-out", p12, "-passout", "file:"+pwfile.name])
        if status:
          return status

        print("Importing private key {} to database".format(key_pem))
        status = runUtil(pk12util, ["-i", p12, "-d", srcDir, "-w", pwfile.name, "-k", pwfile.name])
        if status:
          return status

        if key_type == "ca":
          shutil.copyfile(cert_pem, os.path.join(srcDir, "{}.ca".format(name)))
        elif key_type == "client":
          shutil.copyfile(p12, os.path.join(srcDir, "{}.client".format(name)))
        elif key_type == "server":
          pass # Nothing to do for server keys
        else:
          # Defensive: key_type was validated above, so this is not expected
          # to be reached.
          raise Exception("State error: Unknown keyspec key_type: {}".format(key_type))

  return 0
177
# Script entry point: regenerate the certificates in build/pgo/certs and
# exit with the resulting status. `build` is also read as a module-level
# global by writeCertspecForServerLocations().
build = MozbuildObject.from_environment()
certdir = os.path.join(build.topsrcdir, "build", "pgo", "certs")
certificateStatus = constructCertDatabase(build, certdir)
if certificateStatus:
  # Parenthesized form matches the other print calls in this file and is
  # valid on both Python 2 and Python 3.
  print("TEST-UNEXPECTED-FAIL | SSL Server Certificate generation")
sys.exit(certificateStatus)
184