1#!/usr/bin/env python
2# This Source Code Form is subject to the terms of the Mozilla Public
3# License, v. 2.0. If a copy of the MPL was not distributed with this
4# file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6# This script exists to generate the Certificate Authority and server
7# certificates used for SSL testing in Mochitest. The already generated
8# certs are located at $topsrcdir/build/pgo/certs/ .
9
10import mozinfo
11import os
12import random
13import re
14import shutil
15import subprocess
16import sys
17
18from mozbuild.base import MozbuildObject, BinaryNotFoundException
19from mozfile import NamedTemporaryFile, TemporaryDirectory
20from mozprofile.permissions import ServerLocations
21from distutils.spawn import find_executable
22
23dbFiles = [
24    re.compile("^cert[0-9]+\.db$"),
25    re.compile("^key[0-9]+\.db$"),
26    re.compile("^secmod\.db$"),
27]
28
29
30def unlinkDbFiles(path):
31    for root, dirs, files in os.walk(path):
32        for name in files:
33            for dbFile in dbFiles:
34                if dbFile.match(name) and os.path.exists(os.path.join(root, name)):
35                    os.unlink(os.path.join(root, name))
36
37
38def dbFilesExist(path):
39    for root, dirs, files in os.walk(path):
40        for name in files:
41            for dbFile in dbFiles:
42                if dbFile.match(name) and os.path.exists(os.path.join(root, name)):
43                    return True
44    return False
45
46
47def runUtil(util, args, inputdata=None, outputstream=None):
48    env = os.environ.copy()
49    if mozinfo.os == "linux":
50        pathvar = "LD_LIBRARY_PATH"
51        app_path = os.path.dirname(util)
52        if pathvar in env:
53            env[pathvar] = "%s%s%s" % (app_path, os.pathsep, env[pathvar])
54        else:
55            env[pathvar] = app_path
56    proc = subprocess.Popen(
57        [util] + args,
58        env=env,
59        stdin=subprocess.PIPE if inputdata else None,
60        stdout=outputstream,
61        universal_newlines=True,
62    )
63    proc.communicate(inputdata)
64    return proc.returncode
65
66
67def createRandomFile(randomFile):
68    for count in xrange(0, 2048):
69        randomFile.write(chr(random.randint(0, 255)))
70
71
72def writeCertspecForServerLocations(fd):
73    locations = ServerLocations(
74        os.path.join(build.topsrcdir, "build", "pgo", "server-locations.txt")
75    )
76    SAN = []
77    for loc in [
78        i for i in iter(locations) if i.scheme == "https" and "nocert" not in i.options
79    ]:
80        customCertOption = False
81        customCertRE = re.compile("^cert=(?:\w+)")
82        for _ in [i for i in loc.options if customCertRE.match(i)]:
83            customCertOption = True
84            break
85
86        if "ipV4Address" in loc.options:
87            loc.host = "ip4:" + loc.host
88
89        if not customCertOption:
90            SAN.append(loc.host)
91
92    fd.write(
93        "issuer:printableString/CN=Temporary Certificate Authority/O=Mozilla Testing/OU=Profile Guided Optimization\n"  # NOQA: E501
94    )
95    fd.write("subject:{}\n".format(SAN[0]))
96    fd.write("extension:subjectAlternativeName:{}\n".format(",".join(SAN)))
97
98
99def constructCertDatabase(build, srcDir):
100    try:
101        certutil = build.get_binary_path(what="certutil")
102        pk12util = build.get_binary_path(what="pk12util")
103    except BinaryNotFoundException as e:
104        print("{}\n\n{}\n".format(e, e.help()))
105        return 1
106    openssl = find_executable("openssl")
107    pycert = os.path.join(build.topsrcdir, "security", "manager", "tools", "pycert.py")
108    pykey = os.path.join(build.topsrcdir, "security", "manager", "tools", "pykey.py")
109
110    with NamedTemporaryFile(mode="wt+") as pwfile, TemporaryDirectory() as pemfolder:
111        pwfile.write("\n")
112        pwfile.flush()
113
114        if dbFilesExist(srcDir):
115            # Make sure all DB files from src are really deleted
116            unlinkDbFiles(srcDir)
117
118        # Copy  all .certspec and .keyspec files to a temporary directory
119        for root, dirs, files in os.walk(srcDir):
120            for spec in [
121                i for i in files if i.endswith(".certspec") or i.endswith(".keyspec")
122            ]:
123                shutil.copyfile(os.path.join(root, spec), os.path.join(pemfolder, spec))
124
125        # Write a certspec for the "server-locations.txt" file to that temporary directory
126        pgoserver_certspec = os.path.join(pemfolder, "pgoserver.certspec")
127        if os.path.exists(pgoserver_certspec):
128            raise Exception(
129                "{} already exists, which isn't allowed".format(pgoserver_certspec)
130            )
131        with open(pgoserver_certspec, "w") as fd:
132            writeCertspecForServerLocations(fd)
133
134        # Generate certs for all certspecs
135        for root, dirs, files in os.walk(pemfolder):
136            for certspec in [i for i in files if i.endswith(".certspec")]:
137                name = certspec.split(".certspec")[0]
138                pem = os.path.join(pemfolder, "{}.cert.pem".format(name))
139
140                print("Generating public certificate {} (pem={})".format(name, pem))
141
142                with open(os.path.join(root, certspec), "r") as certspec_file:
143                    certspec_data = certspec_file.read()
144                    with open(pem, "w") as pem_file:
145                        status = runUtil(
146                            pycert, [], inputdata=certspec_data, outputstream=pem_file
147                        )
148                        if status:
149                            return status
150
151                status = runUtil(
152                    certutil,
153                    [
154                        "-A",
155                        "-n",
156                        name,
157                        "-t",
158                        "P,,",
159                        "-i",
160                        pem,
161                        "-d",
162                        srcDir,
163                        "-f",
164                        pwfile.name,
165                    ],
166                )
167                if status:
168                    return status
169
170            for keyspec in [i for i in files if i.endswith(".keyspec")]:
171                parts = keyspec.split(".")
172                name = parts[0]
173                key_type = parts[1]
174                if key_type not in ["ca", "client", "server"]:
175                    raise Exception(
176                        "{}: keyspec filenames must be of the form XXX.client.keyspec "
177                        "or XXX.ca.keyspec (key_type={})".format(keyspec, key_type)
178                    )
179                key_pem = os.path.join(pemfolder, "{}.key.pem".format(name))
180
181                print("Generating private key {} (pem={})".format(name, key_pem))
182
183                with open(os.path.join(root, keyspec), "r") as keyspec_file:
184                    keyspec_data = keyspec_file.read()
185                    with open(key_pem, "w") as pem_file:
186                        status = runUtil(
187                            pykey, [], inputdata=keyspec_data, outputstream=pem_file
188                        )
189                        if status:
190                            return status
191
192                cert_pem = os.path.join(pemfolder, "{}.cert.pem".format(name))
193                if not os.path.exists(cert_pem):
194                    raise Exception(
195                        "There has to be a corresponding certificate named {} for "
196                        "the keyspec {}".format(cert_pem, keyspec)
197                    )
198
199                p12 = os.path.join(pemfolder, "{}.key.p12".format(name))
200                print(
201                    "Converting private key {} to PKCS12 (p12={})".format(key_pem, p12)
202                )
203                status = runUtil(
204                    openssl,
205                    [
206                        "pkcs12",
207                        "-export",
208                        "-inkey",
209                        key_pem,
210                        "-in",
211                        cert_pem,
212                        "-name",
213                        name,
214                        "-out",
215                        p12,
216                        "-passout",
217                        "file:" + pwfile.name,
218                    ],
219                )
220                if status:
221                    return status
222
223                print("Importing private key {} to database".format(key_pem))
224                status = runUtil(
225                    pk12util,
226                    ["-i", p12, "-d", srcDir, "-w", pwfile.name, "-k", pwfile.name],
227                )
228                if status:
229                    return status
230
231                if key_type == "ca":
232                    shutil.copyfile(
233                        cert_pem, os.path.join(srcDir, "{}.ca".format(name))
234                    )
235                elif key_type == "client":
236                    shutil.copyfile(p12, os.path.join(srcDir, "{}.client".format(name)))
237                elif key_type == "server":
238                    pass  # Nothing to do for server keys
239                else:
240                    raise Exception(
241                        "State error: Unknown keyspec key_type: {}".format(key_type)
242                    )
243
244    return 0
245
246
247build = MozbuildObject.from_environment()
248certdir = os.path.join(build.topsrcdir, "build", "pgo", "certs")
249certificateStatus = constructCertDatabase(build, certdir)
250if certificateStatus:
251    print("TEST-UNEXPECTED-FAIL | SSL Server Certificate generation")
252sys.exit(certificateStatus)
253