1# -*- coding: utf-8 -*-
2# vim:ts=4:sw=4:expandtab
3
4""" BinGPG is a "gpg" or "gpg2" command line wrapper which
5implements all operations we need for Autocrypt usage.
6It is not meant as a general wrapper outside Autocrypt
7contexts.
8"""
9
10from __future__ import print_function, unicode_literals
11import logging
12from distutils.version import LooseVersion as V
13import six
14import os
15import sys
16from subprocess import Popen, PIPE
17from contextlib import contextmanager
18import tempfile
19import re
# True when we run on Windows: either the interpreter reports "win32"
# directly or the os module exposes a private ``_name`` equal to 'nt'
# (presumably for runtimes such as Jython -- TODO confirm).
iswin32 = (getattr(os, '_name', False) == 'nt') or sys.platform == "win32"
21
22
def cached_property(f):
    """ return a read/write property which lazily calls ``f(self)``.

    The result of the first read is stored per instance in a
    ``_property_cache`` dict (kept in the instance ``__dict__``) under
    the name of ``f``.  Later reads return the stored value.  Assigning
    to the property stores the assigned value in that cache, whether or
    not the property was read before.
    """
    def _cache_for(instance):
        # create the per-instance cache on first use
        return instance.__dict__.setdefault("_property_cache", {})

    def fget(self):
        cache = _cache_for(self)
        name = f.__name__
        if name in cache:
            return cache[name]
        value = f(self)
        self._property_cache[name] = value
        return value

    def fset(self, val):
        _cache_for(self)[f.__name__] = val

    return property(fget, fset)
40
41
class InvocationFailure(Exception):
    """ raised when invoking the gpg command line failed.

    :param ret: exit status of the gpg process
    :param cmd: the executed command line as a single string
    :param out: captured stdout (text or bytes, may be None)
    :param err: captured stderr (text or bytes, may be None)
    :param extrainfo: optional extra text appended to the message
    """

    def __init__(self, ret, cmd, out, err, extrainfo=None):
        # pass the arguments on to Exception so that ``args`` is populated;
        # otherwise copying/pickling the exception fails because it is
        # re-created from ``args``.
        super(InvocationFailure, self).__init__(ret, cmd, out, err, extrainfo)
        self.ret = ret
        self.cmd = cmd
        self.out = out
        self.err = err
        self.extrainfo = extrainfo

    @staticmethod
    def _as_text(stream):
        """ return captured output as text suitable for line splitting. """
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            # binary output: never fail while formatting an error message
            return stream.decode("utf-8", "replace")
        return "{}".format(stream)

    def __str__(self):
        lines = ["GPG Command '%s' retcode=%d" % (self.cmd, self.ret)]
        for name, output in (("stdout:", self.out), ("stderr:", self.err)):
            lines.append(name)
            lines.extend("  " + line
                         for line in self._as_text(output).splitlines())
        if self.extrainfo:
            lines.append(self.extrainfo)
        return "\n".join(lines)
60
61
class BinGPG(object):
    """ basic wrapper for gpg command line invocations.

    Every operation runs the configured gpg binary as a subprocess in
    ``--batch`` mode, passing ``--homedir`` when a home directory was
    given.
    """
    InvocationFailure = InvocationFailure

    def __init__(self, homedir=None, gpgpath="gpg"):
        """
        :type homedir: unicode or None
        :param homedir: gpg home directory, if None system gpg homedir is used.
        :type gpgpath: unicode
        :param gpgpath:
            If this is an absolute path pointing to an existing
            file we use it directly.  Otherwise we lookup
            the path to the binary under the system's PATH.
            If we can not determine an eventual binary
            we raise ValueError.
        """
        self.homedir = homedir
        p = find_executable(gpgpath)
        if p is None:
            raise ValueError("could not find binary for {!r}".format(gpgpath))
        self.gpgpath = p
        self._ensure_init()

    def __str__(self):
        return "BinGPG(gpgpath={gpgpath!r}, homedir={homedir!r})".format(
            gpgpath=self.gpgpath, homedir=self.homedir)

    @cached_property
    def _version_info(self):
        """ the (cached) text output of ``gpg --version``. """
        return self._gpg_out(['--version'])

    @cached_property
    def gpg_version(self):
        """ version object parsed from the third space-separated token
        of the first line of the ``--version`` output. """
        vline = self._version_info.split('\n', 1)[0]
        return V(vline.split(' ')[2])

    @cached_property
    def isgpg2(self, min_version=V("2.0")):
        """ True if the gpg binary has at least version 2.0. """
        return self.gpg_version >= min_version

    @cached_property
    def isgpg21(self, min_version=V("2.1")):
        """ True if the gpg binary has at least version 2.1. """
        return self.gpg_version >= min_version

    def _ensure_init(self):
        """ prepare the home directory; a no-op if no homedir was given. """
        if self.homedir is None:
            return

        if not os.path.exists(self.homedir):
            # create the directory (including missing parent directories)
            # and restrict access to the owner
            os.makedirs(self.homedir)
            os.chmod(self.homedir, 0o700)

        # fix bad defaults for certain gpg2 versions
        if V("2.0") <= self.gpg_version < V("2.1.12"):
            p = os.path.join(self.homedir, "gpg-agent.conf")
            if not os.path.exists(p):
                with open(p, "w") as f:
                    f.write("allow-loopback-pinentry\n")

    def killagent(self):
        """ send KILLAGENT through gpg-connect-agent (gpg >= 2.0 only). """
        if not self.isgpg2:
            return
        connect_agent = find_executable("gpg-connect-agent")
        if connect_agent is None:
            # without the helper binary there is nothing we could invoke;
            # passing None on to Popen would only raise a TypeError
            logging.warning("gpg-connect-agent not found, cannot kill agent")
            return
        args = [connect_agent, "--no-autostart"]
        args += self._homedirflags + ["KILLAGENT"]
        popen = Popen(args)
        popen.wait()

    @contextmanager
    def temp_written_file(self, data):
        """ context manager yielding the path of a temporary file which
        contains the given bytes; the file is removed on exit. """
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(data)
        try:
            yield f.name
        finally:
            os.remove(f.name)

    @property
    def _homedirflags(self):
        """ ``--homedir`` arguments, or an empty list without homedir. """
        return ["--homedir", self.homedir] if self.homedir else []

    @cached_property
    def _nopassphrase(self):
        """ options supplying a fixed passphrase non-interactively.

        The list is cached; callers must build new lists from it
        instead of modifying it in place.
        """
        opts = ["--passphrase", "123"]
        if self.isgpg21:
            opts.append("--pinentry-mode=loopback")
        return opts

    def _gpg_out(self, argv, input=None, strict=False, encoding="utf8"):
        """ like _gpg_outerr() but return only the stdout output. """
        return self._gpg_outerr(argv, input=input, strict=strict, encoding=encoding)[0]

    def _gpg_outerr(self, argv, input=None, strict=False, encoding="utf8"):
        """ return stdout and stderr output of invoking gpg with the
        specified parameters.

        If the invocation leads to a non-zero exit
        status an InvocationFailure exception is thrown.  It is also
        thrown if strict is True and there was non-empty stderr output.
        An exit status of 130 is turned into a KeyboardInterrupt.
        stderr output will always be returned as a text type (utf8-decoded,
        undecodable bytes are replaced) while stdout output is returned
        decoded if encoding is set (default is "utf8").
        If you want binary stdout output specify encoding=None.
        """
        assert input is None or isinstance(input, bytes)

        # make sure we use unicode for all provided arguments
        def ensure_unicode(x):
            return x.decode("utf8") if isinstance(x, bytes) else x

        args = [self.gpgpath, "--batch"] + self._homedirflags
        args.extend(map(ensure_unicode, argv))

        # run the process with fixed locale settings and pipe everything;
        # presumably this keeps the messages we parse untranslated and
        # utf8-encoded -- TODO confirm
        env = os.environ.copy()
        env["LANG"] = "C"
        env["LANGUAGE"] = "C"
        env["LC_ALL"] = "en_US.UTF-8"

        # some debugging info
        G = os.environ.get("GNUPGHOME")
        extra = "" if not G else ("GNUPGHOME=" + G + " ")
        logging.debug("$ %s%s", extra, " ".join(args))

        popen = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
        out, err = popen.communicate(input=input)
        ret = popen.returncode
        if ret == 130:
            raise KeyboardInterrupt("detected in gpg invocation")
        # never let an undecodable diagnostic byte hide the actual result
        err = err.decode("utf-8", "replace")
        if encoding:
            out = out.decode(encoding)
        if ret != 0 or (strict and err):
            raise self.InvocationFailure(ret, " ".join(args),
                                         out=out, err=err)
        return out, err

    def supports_eddsa(self):
        """ return True if the 'Pubkey:' line of the version output
        lists eddsa (case-insensitively), False otherwise. """
        for line in self._version_info.split('\n'):
            if line.startswith('Pubkey:'):
                algos = line.split(':', 1)[1].split(',')
                return 'eddsa' in (algo.strip().lower() for algo in algos)
        return False

    def gen_secret_key(self, emailadr):
        """ generate an RSA 3072 sign key with an RSA 3072 encryption
        subkey for the given email address and return its keyhandle.

        On failure the raised InvocationFailure carries the used
        key parameter text as ``extrainfo``.
        """
        # NOTE(review): emailadr is inserted verbatim; a value containing
        # newlines would add further lines to the parameter file --
        # confirm that callers validate it.
        spec = "\n".join([
            "Key-Type: RSA",
            "Key-Length: 3072",
            "Key-Usage: sign",
            "Subkey-Type: RSA",
            "Subkey-Length: 3072",
            "Subkey-Usage: encrypt",
            "Name-Email: " + emailadr,
            "Expire-Date: 0",
            "%commit"
        ]).encode("utf8")
        with self.temp_written_file(spec) as fn:
            try:
                out, err = self._gpg_outerr(self._nopassphrase + ["--gen-key", fn])
            except self.InvocationFailure as e:
                # the file holds exactly ``spec``; reuse it instead of
                # re-opening the file (which leaked the file handle and
                # decoded with the locale encoding)
                e.extrainfo = spec.decode("utf8")
                raise

        keyhandle = self._find_keyhandle(err)
        logging.debug("created secret key: %s", keyhandle)
        return keyhandle

    def list_secret_keyinfos(self, keyhandle=None):
        """ return KeyInfo objects for secret keys and subkeys,
        optionally restricted to the given keyhandle. """
        args = ["--skip-verify", "--with-colons", "--list-secret-keys"]
        if keyhandle is not None:
            args.append(keyhandle)
        return self._parse_list(args, ("sec", "ssb"))

    def get_secret_keyhandle(self, keyhandle):
        """ return the id of the first listed secret key whose uids
        contain ``keyhandle`` or whose id matches it, else None. """
        assert isinstance(keyhandle, six.text_type)
        for k in self.list_secret_keyinfos(keyhandle):
            is_in_uids = any(keyhandle in uid for uid in k.uids)
            if is_in_uids or k.match(keyhandle):
                return k.id
        return None

    def list_public_keyinfos(self, keyhandle=None):
        """ return KeyInfo objects for public keys and subkeys,
        optionally restricted to the given keyhandle. """
        args = ["--skip-verify", "--with-colons", "--list-public-keys"]
        if keyhandle is not None:
            args.append(keyhandle)
        return self._parse_list(args, ("pub", "sub"))

    def _parse_list(self, args, types):
        """ run a ``--with-colons`` listing and return KeyInfo objects.

        :param args: gpg arguments producing the listing
        :param types: pair of record types (main key type, subkey type)
            to collect; "uid" records are attached to the most recent
            main key record.
        """
        out = self._gpg_out(args)
        keyinfos = []
        last_main_type_keyinfo = None
        for line in out.splitlines():
            parts = line.split(":")
            rectype = parts[0]
            if rectype in types:
                keyinfo = KeyInfo(type=parts[3], bits=int(parts[2]),
                                  uid=parts[9], id=parts[4],
                                  date_created=parts[5])
                keyinfos.append(keyinfo)
                if rectype == types[0]:
                    last_main_type_keyinfo = keyinfo
            elif rectype == "uid" and last_main_type_keyinfo is not None:
                # a uid record without a preceding main key has no owner
                # and is skipped instead of failing on None
                last_main_type_keyinfo.uids.append(parts[9])
        return keyinfos

    def _find_keyhandle(self, string, _pattern=re.compile("key (?:ID )?([0-9A-F]+)")):
        """ extract the key id mentioned in gpg output ``string``.

        A short id (up to 8 hex digits) is resolved to the id of the
        matching public key; ValueError is raised if none matches.
        """
        m = _pattern.search(string)
        assert m and len(m.groups()) == 1, string
        x = m.groups()[0]

        # now search the full id if we only have a shortid
        if len(x) <= 8:   # a short keyid has 8 hex digits
            keyinfos = self.list_public_keyinfos(x)
            for k in keyinfos:
                if k.match(x):
                    return k.id
            raise ValueError("could not find fingerprint %r in %r" % (x, keyinfos))
        # note that this might be a 16-char id or a 40-char one (gpg-2.1.18)
        return x

    def list_secret_key_packets(self, keyhandle):
        """ return the packet list of the exported secret key. """
        return self.list_packets(self.get_secret_keydata(keyhandle))

    def list_public_key_packets(self, keyhandle):
        """ return the packet list of the exported public key. """
        return self.list_packets(self.get_public_keydata(keyhandle))

    def list_packets(self, keydata):
        """ return a list of (pkgname, pkgvalue, lines) tuples parsed
        from ``gpg --list-packets`` output for the given key data.

        An output without any packet header yields an empty list.
        """
        out = self._gpg_out(["--list-packets"], input=keydata)
        packets = []
        lines = []
        last_package_type = None
        for rawline in out.splitlines():
            line = rawline.strip()
            c = line[0:1]
            if c == "#":
                continue
            if c == ":":
                # packet header of the form ":<name>:<value>"
                i = line[1:].find(c)
                if i != -1:
                    ptype = line[1: i + 1]
                    pvalue = line[i + 2:].strip()
                    if last_package_type is not None:
                        packets.append(last_package_type + (lines,))
                        lines = []
                    last_package_type = (ptype, pvalue)
            else:
                assert last_package_type, line
                lines.append(line)
        # flush the final packet; there is none if no header was seen
        if last_package_type is not None:
            packets.append(last_package_type + (lines,))
        return packets

    def get_public_keydata(self, keyhandle, armor=False):
        """ return the minimal export of the public key as bytes
        (ascii-armored if ``armor`` is true). """
        args = ["-a"] if armor else []
        args.extend(["--export-options=export-minimal", "--export", str(keyhandle)])
        out = self._gpg_out(args, strict=True, encoding=None)
        return out

    def get_secret_keydata(self, keyhandle, armor=False):
        """ return the minimal export of the secret key as bytes
        (ascii-armored if ``armor`` is true). """
        args = ["-a"] if armor else []
        args.extend(self._nopassphrase + ["--export-options=export-minimal",
                    "--export-secret-key", keyhandle])
        return self._gpg_out(args, strict=True, encoding=None)

    def encrypt(self, data, recipients, signkey=None, text=False):
        """ return ``data`` encrypted to all ``recipients`` as bytes.

        :param signkey: if given, additionally sign with this key
        :param text: if true, produce ascii-armored output
        """
        opts = self._nopassphrase + ["--encrypt", "--always-trust"]
        for r in recipients:
            opts.extend(["-r", r])
        if signkey:
            opts.extend(["--sign", "-u", signkey])
        if text:
            opts.extend(["--armor"])
        return self._gpg_out(opts, input=data, encoding=None)

    def sign(self, data, keyhandle):
        """ return a detached signature of ``data`` made with the key. """
        args = self._nopassphrase + ["--detach-sign", "-u", keyhandle]
        return self._gpg_out(args, input=data, encoding=None)

    def verify(self, data, signature):
        """ verify the detached ``signature`` over ``data`` and return
        the keyhandle found in gpg's output. """
        with self.temp_written_file(signature) as sig_fn:
            out, err = self._gpg_outerr(["--verify", sig_fn, "-"], input=data)
        return self._find_keyhandle(err)

    def decrypt(self, enc_data):
        """ decrypt ``enc_data`` and return a (plaintext bytes, keyinfos)
        tuple where keyinfos describe the keys gpg reported on stderr.

        A key line may be followed by an indented line carrying the
        quoted user id; without such a line the KeyInfo has no uid.
        """
        args = self._nopassphrase + ["--with-colons", "--decrypt"]
        out, err = self._gpg_outerr(args, input=enc_data, encoding=None)
        lines = err.splitlines()
        keyinfos = []
        pos = 0
        while pos < len(lines):
            m = re.match(r"gpg.*with (\d+)-bit (\w+).*"
                         r"ID (\w+).*created (.*)", lines[pos])
            pos += 1
            if not m:
                continue
            bits, keytype, keyid, date = m.groups()
            # only consume the following line if it really is the
            # indented uid line; otherwise leave it for the next round
            uid = None
            if pos < len(lines) and lines[pos].startswith("    "):
                uid = lines[pos].strip().strip('"')
                pos += 1
            keyinfos.append(KeyInfo(keytype, bits, keyid, uid, date))
        return out, keyinfos

    def import_keydata(self, keydata, minimize=False):
        """ import ``keydata`` and return the keyhandle of the key.

        If ``minimize`` is true the key is exported in minimal form,
        deleted and re-imported from that minimal export.
        """
        out, err = self._gpg_outerr(["--skip-verify", "--import"], input=keydata)
        kh = self._find_keyhandle(err)
        if minimize:
            # get_public_keydata gets us a minimized key
            minimized_keydata = self.get_public_keydata(kh)
            self._gpg_outerr(["--yes", "--delete-key", kh])
            _, err = self._gpg_outerr(["--skip-verify", "--import"], input=minimized_keydata)
            min_kh = self._find_keyhandle(err)
            assert min_kh == kh
        return kh
370
371
class KeyInfo:
    """ plain record describing one key.

    Holds the key ``type``, the number of ``bits`` (converted to int),
    the key ``id``, the list of ``uids`` (empty if no uid was given)
    and the ``date_created`` value as passed in.
    """

    def __init__(self, type, bits, id, uid, date_created):
        self.type = type
        self.bits = int(bits)
        self.id = id
        self.uids = []
        if uid:
            self.uids.append(uid)
        self.date_created = date_created

    def match(self, other_id):
        """ return True if our id and ``other_id`` agree, ignoring case,
        on the trailing characters over the length of the shorter one. """
        common = min(len(self.id), len(other_id))
        own_tail = self.id[-common:]
        other_tail = other_id[-common:]
        return own_tail.lower() == other_tail.lower()

    def __str__(self):
        template = "KeyInfo(id={id!r}, uids={uids!r}, bits={bits}, type={type})"
        return template.format(**vars(self))

    __repr__ = __str__
389
390
def find_executable(name):
    """ return a path object found by looking at the systems
        underlying PATH specification.  If an executable
        cannot be found, None is returned. copied and adapted
        from py.path.local.sysfind.

        An absolute ``name`` is returned as is if it points to an
        existing file.  Any other name is joined onto each search
        path entry (on Windows additionally trying each PATHEXT
        suffix) and the first existing file is returned.  Missing
        environment variables do not raise.
    """
    if os.path.isabs(name):
        return name if os.path.isfile(name) else None

    if iswin32:
        searchpath = os.environ.get('Path', os.environ.get('PATH', ''))
        paths = searchpath.split(';')
        if '' not in paths and '.' not in paths:
            paths.append('.')
        systemroot = os.environ.get('SYSTEMROOT')
        if systemroot is not None:
            # plain string replacement: with re.sub the backslashes of a
            # Windows path would be interpreted as replacement escapes
            paths = [path.replace('%SystemRoot%', systemroot)
                     for path in paths]
        tryadd = [ext for ext in
                  os.environ.get('PATHEXT', '').split(os.pathsep) if ext]
    else:
        # fall back to the platform default search path if PATH is unset
        paths = os.environ.get('PATH', os.defpath).split(':')
        tryadd = []
    # finally try the bare name without any suffix
    tryadd.append("")

    for x in paths:
        for addext in tryadd:
            p = os.path.join(x, name) + addext
            try:
                if os.path.isfile(p):
                    return p
            except (OSError, ValueError):
                # an unusable search path entry must not abort the lookup
                continue
    return None
427