1# -*- test-case-name: twisted.conch.test.test_ckeygen -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Implementation module for the `ckeygen` command.
7"""
8
9
10import getpass
11import os
12import socket
13import sys
14from functools import wraps
15from imp import reload
16
17from twisted.conch.ssh import keys
18from twisted.python import failure, filepath, log, usage
19
# Only relevant when getpass picked its Unix implementation: check that the
# termios module is importable and exposes the two functions probed below.
if getpass.getpass == getpass.unix_getpass:  # type: ignore[attr-defined]
    try:
        import termios  # hack around broken termios

        # Bare attribute access on purpose: it raises AttributeError when
        # either function is missing.
        termios.tcgetattr, termios.tcsetattr
    except (ImportError, AttributeError):
        # A None entry in sys.modules makes any later "import termios" fail
        # with ImportError; getpass is then reloaded, presumably so that it
        # re-selects an implementation that does not need termios.
        # NOTE(review): `reload` is imported from the `imp` module at the top
        # of this file; `imp` was removed in Python 3.12 and
        # `importlib.reload` is its replacement.
        sys.modules["termios"] = None  # type: ignore[assignment]
        reload(getpass)

# Registry mapping a lower-case key type name (e.g. "rsa") to the function
# that generates a key of that type.  It is filled in by the _keyGenerator
# decorator and consulted by run().
supportedKeyTypes = dict()
30
31
32def _keyGenerator(keyType):
33    def assignkeygenerator(keygenerator):
34        @wraps(keygenerator)
35        def wrapper(*args, **kwargs):
36            return keygenerator(*args, **kwargs)
37
38        supportedKeyTypes[keyType] = wrapper
39        return wrapper
40
41    return assignkeygenerator
42
43
class GeneralOptions(usage.Options):
    """
    Command line options accepted by the C{ckeygen} command.
    """

    synopsis = """Usage:    ckeygen [options]
 """

    longdesc = "ckeygen manipulates public/private keys in various ways."

    # Each entry is [long name, short name, default, description].
    optParameters = [
        ["bits", "b", None, "Number of bits in the key to create."],
        ["filename", "f", None, "Filename of the key file."],
        ["type", "t", None, "Specify type of key to create."],
        ["comment", "C", None, "Provide new comment."],
        ["newpass", "N", None, "Provide new passphrase."],
        ["pass", "P", None, "Provide old passphrase."],
        ["format", "o", "sha256-base64", "Fingerprint format of key file."],
        [
            "private-key-subtype",
            None,
            None,
            'OpenSSH private key subtype to write ("PEM" or "v1").',
        ],
    ]

    # Each entry is [long name, short name, description].
    optFlags = [
        ["fingerprint", "l", "Show fingerprint of key file."],
        ["changepass", "p", "Change passphrase of private key file."],
        ["quiet", "q", "Quiet."],
        ["no-passphrase", None, "Create the key with no passphrase."],
        ["showpub", "y", "Read private key file and print public key."],
    ]

    # The key type names are spelled out on purpose.  This class body runs
    # before any generator function has been registered in supportedKeyTypes,
    # so reading that registry here would always produce an empty completion
    # list.  Keep the names in sync with the @_keyGenerator registrations.
    compData = usage.Completions(
        optActions={
            "type": usage.CompleteList(["rsa", "dsa", "ecdsa", "ed25519"]),
            "private-key-subtype": usage.CompleteList(["PEM", "v1"]),
        }
    )
80
81
def run():
    """
    Command line entry point of C{ckeygen}.

    Parse C{sys.argv} and carry out the first requested action, checked in
    this order: generate a key (when a key type is given), print a
    fingerprint, change a passphrase, print a public key.  When no action is
    requested the usage text is shown.
    """
    options = GeneralOptions()
    try:
        options.parseOptions(sys.argv[1:])
    except usage.UsageError as u:
        print("ERROR: %s" % u)
        options.opt_help()
        sys.exit(1)

    log.discardLogs()
    log.deferr = handleError  # HACK

    requestedType = options["type"]
    if requestedType:
        # Registered names are lower-case, so the lookup ignores case.
        generate = supportedKeyTypes.get(requestedType.lower())
        if generate is not None:
            print("Generating public/private %s key pair." % requestedType)
            generate(options)
        else:
            sys.exit(
                "Key type was %s, must be one of %s"
                % (requestedType, ", ".join(supportedKeyTypes.keys()))
            )
    else:
        # Flags are tried in order; only the first one that is set is acted on.
        fileActions = (
            ("fingerprint", printFingerprint),
            ("changepass", changePassPhrase),
            ("showpub", displayPublicKey),
        )
        for flagName, action in fileActions:
            if options[flagName]:
                action(options)
                break
        else:
            options.opt_help()
            sys.exit(1)
110
111
def enumrepresentation(options):
    """
    Replace the textual fingerprint format stored in C{options} with the
    matching L{keys.FingerprintFormats} constant.

    @param options: A mapping whose C{"format"} entry is C{"md5-hex"} or
        C{"sha256-base64"}.  The entry is overwritten in place.

    @return: The same C{options} object.

    @raise keys.BadFingerPrintFormat: If C{options["format"]} is neither of
        the two supported names.
    """
    requested = options["format"]
    if requested == "md5-hex":
        selected = keys.FingerprintFormats.MD5_HEX
    elif requested == "sha256-base64":
        selected = keys.FingerprintFormats.SHA256_BASE64
    else:
        raise keys.BadFingerPrintFormat(
            "Unsupported fingerprint format: {}".format(requested)
        )
    options["format"] = selected
    return options
123
124
def handleError():
    """
    Record a failing exit status, log the exception currently being handled
    and re-raise it.

    L{run} installs this function as C{log.deferr}.
    """
    global exitStatus
    exitStatus = 2
    # Failure() without arguments picks up the exception being handled.
    currentFailure = failure.Failure()
    log.err(currentFailure)
    raise
130
131
@_keyGenerator("rsa")
def generateRSAkey(options):
    """
    Generate an RSA key pair and save it with L{_saveKey}.

    @param options: The parsed command line options.  C{options["bits"]}
        selects the key size; when it is not set it is filled in with the
        default of 2048 bits.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import rsa

    if not options["bits"]:
        # 1024-bit RSA keys are no longer considered adequate, so a key
        # generated without an explicit size gets 2048 bits.
        options["bits"] = 2048
    keyPrimitive = rsa.generate_private_key(
        key_size=int(options["bits"]),
        public_exponent=65537,
        backend=default_backend(),
    )
    key = keys.Key(keyPrimitive)
    _saveKey(key, options)
146
147
@_keyGenerator("dsa")
def generateDSAkey(options):
    """
    Generate a DSA key pair and save it with L{_saveKey}.

    @param options: The parsed command line options.  C{options["bits"]}
        selects the key size; when it is not set it is filled in with the
        default of 1024 bits.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import dsa

    if not options["bits"]:
        options["bits"] = 1024
    requestedSize = int(options["bits"])
    privateKey = dsa.generate_private_key(
        key_size=requestedSize, backend=default_backend()
    )
    _saveKey(keys.Key(privateKey), options)
161
162
@_keyGenerator("ecdsa")
def generateECDSAkey(options):
    """
    Generate an ECDSA key pair and save it with L{_saveKey}.

    The curve is looked up in C{keys._curveTable} under the name
    C{ecdsa-sha2-nistp<bits>}.

    @param options: The parsed command line options.  C{options["bits"]}
        selects the curve size; when it is not set it is filled in with the
        default of 256.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import ec

    if not options["bits"]:
        options["bits"] = 256
    # OpenSSH supports only mandatory sections of RFC5656.
    # See https://www.openssh.com/txt/release-5.7
    sizeSuffix = str(options["bits"]).encode("ascii")
    curveName = b"ecdsa-sha2-nistp" + sizeSuffix
    selectedCurve = keys._curveTable[curveName]
    privateKey = ec.generate_private_key(
        curve=selectedCurve, backend=default_backend()
    )
    _saveKey(keys.Key(privateKey), options)
178
179
@_keyGenerator("ed25519")
def generateEd25519key(options):
    """
    Generate an Ed25519 key pair and save it with L{_saveKey}.

    @param options: The parsed command line options, handed on to
        L{_saveKey}.  No key size option is read here.
    """
    from cryptography.hazmat.primitives.asymmetric import ed25519

    privateKey = ed25519.Ed25519PrivateKey.generate()
    _saveKey(keys.Key(privateKey), options)
187
188
189def _defaultPrivateKeySubtype(keyType):
190    """
191    Return a reasonable default private key subtype for a given key type.
192
193    @type keyType: L{str}
194    @param keyType: A key type, as returned by
195        L{twisted.conch.ssh.keys.Key.type}.
196
197    @rtype: L{str}
198    @return: A private OpenSSH key subtype (C{'PEM'} or C{'v1'}).
199    """
200    if keyType == "Ed25519":
201        # No PEM format is defined for Ed25519 keys.
202        return "v1"
203    else:
204        return "PEM"
205
206
def printFingerprint(options):
    """
    Print the size, the fingerprint and the file name of a key.

    When no file name was given on the command line the user is asked for
    one; an empty answer selects the default shown in the prompt.  If a file
    with the same name plus C{.pub} exists, that file is read instead.

    @param options: The parsed command line options.  The C{"filename"} and
        C{"format"} entries are read and updated in place.

    @raise keys.BadFingerPrintFormat: If C{options["format"]} is not a
        supported fingerprint format name.
    @raise SystemExit: With the message C{"bad key"} when loading the file
        raises L{keys.BadKeyError}.
    """
    if not options["filename"]:
        defaultPath = os.path.expanduser("~/.ssh/id_rsa")
        answer = input("Enter file in which the key is (%s): " % defaultPath)
        # Just pressing enter accepts the default shown in the prompt.
        options["filename"] = answer.strip() or defaultPath
    if os.path.exists(options["filename"] + ".pub"):
        options["filename"] += ".pub"
    options = enumrepresentation(options)
    try:
        key = keys.Key.fromFile(options["filename"])
        print(
            "%s %s %s"
            % (
                key.size(),
                key.fingerprint(options["format"]),
                os.path.basename(options["filename"]),
            )
        )
    except keys.BadKeyError:
        sys.exit("bad key")
226
227
def changePassPhrase(options):
    """
    Re-write a private key file protected by a new passphrase.

    When no file name was given on the command line the user is asked for
    one; an empty answer selects the default shown in the prompt.  The old
    and the new passphrase are asked for interactively when they are needed
    and were not supplied as options.  The newly serialized key is parsed
    back once before the file is overwritten.

    @param options: The parsed command line options.  The C{"filename"},
        C{"pass"}, C{"newpass"} and C{"private-key-subtype"} entries are read
        and filled in when missing.

    @raise SystemExit: With a C{"Could not change passphrase: ..."} message
        when the key cannot be loaded, serialized or parsed back.
    """
    if not options["filename"]:
        defaultPath = os.path.expanduser("~/.ssh/id_rsa")
        answer = input("Enter file in which the key is (%s): " % defaultPath)
        # Just pressing enter accepts the default shown in the prompt.
        options["filename"] = answer.strip() or defaultPath
    try:
        key = keys.Key.fromFile(options["filename"])
    except keys.EncryptedKeyError:
        # Raised if password not supplied for an encrypted key
        if not options.get("pass"):
            options["pass"] = getpass.getpass("Enter old passphrase: ")
        try:
            key = keys.Key.fromFile(options["filename"], passphrase=options["pass"])
        except keys.BadKeyError:
            sys.exit("Could not change passphrase: old passphrase error")
        except keys.EncryptedKeyError as e:
            sys.exit(f"Could not change passphrase: {e}")
    except keys.BadKeyError as e:
        sys.exit(f"Could not change passphrase: {e}")

    if not options.get("newpass"):
        # Keep asking until both entries agree.
        while True:
            p1 = getpass.getpass("Enter new passphrase (empty for no passphrase): ")
            p2 = getpass.getpass("Enter same passphrase again: ")
            if p1 == p2:
                break
            print("Passphrases do not match.  Try again.")
        options["newpass"] = p1

    if options.get("private-key-subtype") is None:
        options["private-key-subtype"] = _defaultPrivateKeySubtype(key.type())

    try:
        newkeydata = key.toString(
            "openssh",
            subtype=options["private-key-subtype"],
            passphrase=options["newpass"],
        )
    except Exception as e:
        sys.exit(f"Could not change passphrase: {e}")

    # Make sure the new data can be read back with the new passphrase before
    # the existing file is replaced.
    try:
        keys.Key.fromString(newkeydata, passphrase=options["newpass"])
    except (keys.EncryptedKeyError, keys.BadKeyError) as e:
        sys.exit(f"Could not change passphrase: {e}")

    with open(options["filename"], "wb") as fd:
        fd.write(newkeydata)

    print("Your identification has been saved with the new passphrase.")
277
278
def displayPublicKey(options):
    """
    Read a private key file and print the matching public key in OpenSSH
    format.

    When no file name was given on the command line the user is asked for
    one; an empty answer selects the default shown in the prompt.  If
    loading the key raises L{keys.EncryptedKeyError}, the passphrase is taken
    from C{options["pass"]} or asked for interactively.

    @param options: The parsed command line options.  The C{"filename"} and
        C{"pass"} entries are read and filled in when missing.
    """
    if not options["filename"]:
        defaultPath = os.path.expanduser("~/.ssh/id_rsa")
        answer = input("Enter file in which the key is (%s): " % defaultPath)
        # Just pressing enter accepts the default shown in the prompt.
        options["filename"] = answer.strip() or defaultPath
    try:
        key = keys.Key.fromFile(options["filename"])
    except keys.EncryptedKeyError:
        if not options.get("pass"):
            options["pass"] = getpass.getpass("Enter passphrase: ")
        key = keys.Key.fromFile(options["filename"], passphrase=options["pass"])
    displayKey = key.public().toString("openssh").decode("ascii")
    print(displayKey)
291
292
def _inputSaveFile(prompt: str) -> str:
    """
    Prompt the user for the location where the key should be saved.

    Kept as a standalone function so that the unit test can patch it.

    @param prompt: The text shown to the user.

    @return: The text entered by the user.
    """
    answer = input(prompt)
    return answer
300
301
def _saveKey(key, options):
    """
    Persist a SSH key on local filesystem.

    The private key is written to C{options["filename"]} and the public key
    to the same path with C{.pub} appended.  Missing settings (file name,
    passphrase, private key subtype) are asked for or defaulted and stored
    back into C{options}.  A summary including the key fingerprint is
    printed at the end.

    @param key: Key which is persisted on local filesystem.
    @type key: C{keys.Key} implementation.

    @param options: The parsed command line options.  The entries
        C{"filename"}, C{"no-passphrase"}, C{"pass"},
        C{"private-key-subtype"}, C{"comment"} and C{"format"} are consulted.
    @type options: L{dict}

    @raise SystemExit: If the target file exists and the user does not
        confirm overwriting it.
    @raise keys.BadFingerPrintFormat: If C{options["format"]} is not a
        supported fingerprint format name; the key files have already been
        written at that point.
    """
    KeyTypeMapping = {"EC": "ecdsa", "Ed25519": "ed25519", "RSA": "rsa", "DSA": "dsa"}
    keyTypeName = KeyTypeMapping[key.type()]
    if not options["filename"]:
        defaultPath = os.path.expanduser(f"~/.ssh/id_{keyTypeName}")
        newPath = _inputSaveFile(
            f"Enter file in which to save the key ({defaultPath}): "
        )

        options["filename"] = newPath.strip() or defaultPath

    if os.path.exists(options["filename"]):
        print("{} already exists.".format(options["filename"]))
        yn = input("Overwrite (y/n)? ")
        # startswith() copes with an empty answer, which counts as "no".
        if not yn.lower().startswith("y"):
            sys.exit()

    if options.get("no-passphrase"):
        options["pass"] = b""
    elif not options["pass"]:
        # Keep asking until both entries agree.
        while True:
            p1 = getpass.getpass("Enter passphrase (empty for no passphrase): ")
            p2 = getpass.getpass("Enter same passphrase again: ")
            if p1 == p2:
                break
            print("Passphrases do not match.  Try again.")
        options["pass"] = p1

    if options.get("private-key-subtype") is None:
        options["private-key-subtype"] = _defaultPrivateKeySubtype(key.type())

    # Honour a comment given on the command line; otherwise fall back to
    # user@host.
    comment = options.get("comment") or f"{getpass.getuser()}@{socket.gethostname()}"

    filepath.FilePath(options["filename"]).setContent(
        key.toString(
            "openssh",
            subtype=options["private-key-subtype"],
            passphrase=options["pass"],
        )
    )
    # Private key: read/write for the owner only.
    os.chmod(options["filename"], 0o600)

    filepath.FilePath(options["filename"] + ".pub").setContent(
        key.public().toString("openssh", comment=comment)
    )
    options = enumrepresentation(options)

    print("Your identification has been saved in {}".format(options["filename"]))
    print("Your public key has been saved in {}.pub".format(options["filename"]))
    print("The key fingerprint in {} is:".format(options["format"]))
    print(key.fingerprint(options["format"]))
362
363
# Run the command when this module is executed directly as a script.
if __name__ == "__main__":
    run()
366