# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab

""" BinGPG is a "gpg" or "gpg2" command line wrapper which
implements all operations we need for Autocrypt usage.
It is not meant as a general wrapper outside Autocrypt
contexts.
"""

from __future__ import print_function, unicode_literals

import logging
import operator
import os
import re
import sys
import tempfile
from contextlib import contextmanager
from subprocess import PIPE, Popen

# the text type of this interpreter ("unicode" on py2, "str" on py3);
# thanks to unicode_literals an empty literal always has that type.
_text_type = type("")

try:
    import six
except ImportError:  # six is only needed for its text type alias
    six = None
else:
    _text_type = six.text_type

try:
    from distutils.version import LooseVersion as V
except ImportError:  # distutils was removed in Python 3.12 (PEP 632)
    class V(object):
        """ minimal stand-in for distutils' LooseVersion.

        A version string is split into numeric and alphabetic components
        which are then compared as a list, e.g. "2.1.18" -> [2, 1, 18].
        """
        _component_re = re.compile(r"(\d+|[a-z]+|\.)")

        def __init__(self, vstring):
            self.vstring = vstring
            self.version = [
                int(part) if part.isdigit() else part
                for part in self._component_re.split(vstring)
                if part and part != "."
            ]

        def __str__(self):
            return self.vstring

        def __repr__(self):
            return "LooseVersion ('%s')" % self.vstring

        def _compare(self, other, op):
            # accept both version objects and plain version strings
            if isinstance(other, V):
                theirs = other.version
            elif isinstance(other, _text_type):
                theirs = V(other).version
            else:
                return NotImplemented
            return op(self.version, theirs)

        def __eq__(self, other):
            return self._compare(other, operator.eq)

        def __ne__(self, other):
            return self._compare(other, operator.ne)

        def __lt__(self, other):
            return self._compare(other, operator.lt)

        def __le__(self, other):
            return self._compare(other, operator.le)

        def __gt__(self, other):
            return self._compare(other, operator.gt)

        def __ge__(self, other):
            return self._compare(other, operator.ge)


iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt')


def cached_property(f):
    """ return a property which lazily computes and caches ``f(self)``.

    The cache lives in the instance's ``_property_cache`` dict, keyed by
    the name of ``f``. The property also allows setting the value
    (before or after the first read), which overrides the computation.
    """
    key = f.__name__

    def get(self):
        propcache = self.__dict__.setdefault("_property_cache", {})
        try:
            return propcache[key]
        except KeyError:
            value = propcache[key] = f(self)
            return value

    def set_(self, val):
        self.__dict__.setdefault("_property_cache", {})[key] = val

    return property(get, set_, doc=f.__doc__)


class InvocationFailure(Exception):
    """ raised when a gpg invocation fails.

    :param ret: exit status of the process
    :param cmd: the command line as a single string
    :param out: captured stdout (text or bytes)
    :param err: captured stderr (text)
    :param extrainfo: optional additional text appended to the message
    """

    def __init__(self, ret, cmd, out, err, extrainfo=None):
        # populate Exception.args so that repr() and pickling work
        super(InvocationFailure, self).__init__(ret, cmd, out, err, extrainfo)
        self.ret = ret
        self.cmd = cmd
        self.out = out
        self.err = err
        self.extrainfo = extrainfo

    @staticmethod
    def _as_text(value):
        # stdout is bytes when the caller asked for encoding=None;
        # render it readably instead of as a b'...' repr.
        if value is None:
            return ""
        if isinstance(value, bytes):
            return value.decode("utf-8", "replace")
        return value if isinstance(value, _text_type) else str(value)

    def __str__(self):
        lines = ["GPG Command '%s' retcode=%d" % (self.cmd, self.ret)]
        for name, text in [("stdout:", self.out), ("stderr:", self.err)]:
            lines.append(name)
            for line in self._as_text(text).splitlines():
                lines.append(" " + line)
        if self.extrainfo:
            lines.append(self.extrainfo)
        return "\n".join(lines)


class BinGPG(object):
    """ basic wrapper for gpg command line invocations. """
    InvocationFailure = InvocationFailure

    # matches the English "encrypted with <bits>-bit <algo> key, ID <id>,
    # created <date>" message which gpg prints to stderr on decryption.
    _decrypt_key_re = re.compile(r"gpg.*with (\d+)-bit (\w+).*"
                                 r"ID (\w+).*created (.*)")

    def __init__(self, homedir=None, gpgpath="gpg"):
        """
        :type homedir: unicode or None
        :param homedir: gpg home directory, if None system gpg homedir is used.
        :type gpgpath: unicode
        :param gpgpath:
            If the path contains path separators and points
            to an existing file we use it directly.
            If it contains no path separators, we lookup
            the path to the binary under the system's PATH.
            If we can not determine an eventual binary
            we raise ValueError.
        """
        self.homedir = homedir
        p = find_executable(gpgpath)
        if p is None:
            raise ValueError("could not find binary for {!r}".format(gpgpath))
        self.gpgpath = p
        self._ensure_init()

    def __str__(self):
        return "BinGPG(gpgpath={gpgpath!r}, homedir={homedir!r})".format(
            gpgpath=self.gpgpath, homedir=self.homedir)

    @cached_property
    def _version_info(self):
        """ full text output of ``gpg --version``. """
        return self._gpg_out(['--version'])

    @cached_property
    def gpg_version(self):
        """ version object parsed from the third word of the first
        line of the ``--version`` output. """
        vline = self._version_info.split('\n', 1)[0]
        return V(vline.split(' ')[2])

    @cached_property
    def isgpg2(self, min_version=V("2.0")):
        """ True if the gpg binary is at least version 2.0. """
        return self.gpg_version >= min_version

    @cached_property
    def isgpg21(self, min_version=V("2.1")):
        """ True if the gpg binary is at least version 2.1. """
        return self.gpg_version >= min_version

    def _ensure_init(self):
        """ create and prepare the home directory (if one is configured). """
        if self.homedir is None:
            return

        if not os.path.exists(self.homedir):
            # create the directory (including missing parents) and make
            # it accessible to the owner only.
            os.makedirs(self.homedir)
            os.chmod(self.homedir, 0o700)

        # fix bad defaults for certain gpg2 versions
        if V("2.0") <= self.gpg_version < V("2.1.12"):
            p = os.path.join(self.homedir, "gpg-agent.conf")
            if not os.path.exists(p):
                with open(p, "w") as f:
                    f.write("allow-loopback-pinentry\n")

    def killagent(self):
        """ ask a running gpg-agent (gpg2 only) to terminate.

        This is best-effort: if the ``gpg-connect-agent`` binary cannot
        be found nothing is done.
        """
        if not self.isgpg2:
            return
        connect_agent = find_executable("gpg-connect-agent")
        if connect_agent is None:
            logging.debug("gpg-connect-agent not found, not killing agent")
            return
        args = [connect_agent, "--no-autostart"]
        args += self._homedirflags + ["KILLAGENT"]
        Popen(args).wait()

    @contextmanager
    def temp_written_file(self, data):
        """ context manager yielding the path of a temporary file which
        contains ``data`` (bytes). The file is removed on exit, also if
        writing it failed. """
        f = tempfile.NamedTemporaryFile(delete=False)
        try:
            with f:
                f.write(data)
            yield f.name
        finally:
            os.remove(f.name)

    @property
    def _homedirflags(self):
        return ["--homedir", self.homedir] if self.homedir else []

    @cached_property
    def _nopassphrase(self):
        # NOTE(review): a fixed dummy passphrase is passed on the command
        # line; secret keys handled here are effectively unprotected.
        opts = ["--passphrase", "123"]
        if self.isgpg21:
            opts.append("--pinentry-mode=loopback")
        return opts

    def _gpg_out(self, argv, input=None, strict=False, encoding="utf8"):
        """ like _gpg_outerr() but return only the stdout output. """
        return self._gpg_outerr(argv, input=input, strict=strict, encoding=encoding)[0]

    def _gpg_outerr(self, argv, input=None, strict=False, encoding="utf8"):
        """ return stdout and stderr output of invoking gpg with the
        specified parameters.

        If the invocation leads to a non-zero exit
        status an InvocationFailure exception is thrown. It is also
        thrown if strict is True and there was non-empty stderr output.
        stderr output will always be returned as a text type (utf8-decoded,
        undecodable bytes are replaced) while stdout output is returned
        decoded if encoding is set (default is "utf8").
        If you want binary stdout output specify encoding=None.

        :raises KeyboardInterrupt: if gpg exits with status 130.
        """
        assert input is None or isinstance(input, bytes)

        # make sure we use unicode for all provided arguments
        def ensure_unicode(x):
            return x.decode("utf8") if isinstance(x, bytes) else x

        args = [self.gpgpath, "--batch"] + self._homedirflags
        args.extend(ensure_unicode(arg) for arg in argv)

        # LANG/LANGUAGE are set to "C" while LC_ALL selects a UTF-8 locale;
        # presumably this keeps gpg's messages untranslated (the stderr
        # parsers in this class match English text) and its output UTF-8.
        env = os.environ.copy()
        env["LANG"] = "C"
        env["LANGUAGE"] = "C"
        env["LC_ALL"] = "en_US.UTF-8"

        # some debugging info
        gnupghome = os.environ.get("GNUPGHOME")
        extra = "" if not gnupghome else ("GNUPGHOME=" + gnupghome + " ")
        logging.debug("$ %s%s", extra, " ".join(args))

        popen = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
        out, err = popen.communicate(input=input)
        ret = popen.returncode  # communicate() already waited for the exit
        if ret == 130:
            raise KeyboardInterrupt("detected in gpg invocation")
        # never let undecodable diagnostics mask the actual gpg result
        err = err.decode("utf-8", "replace")
        if encoding:
            out = out.decode(encoding)
        if ret != 0 or (strict and err):
            raise self.InvocationFailure(ret, " ".join(args),
                                         out=out, err=err)
        return out, err

    def supports_eddsa(self):
        """ return True if the "Pubkey:" line of ``gpg --version``
        lists EDDSA among the supported algorithms. """
        for line in self._version_info.split('\n'):
            if line.startswith('Pubkey:'):
                algos = line.split(':', 1)[1].split(',')
                return 'eddsa' in (algo.strip().lower() for algo in algos)
        return False

    def gen_secret_key(self, emailadr):
        """ generate an RSA 3072 sign key with an RSA 3072 encryption
        subkey for the given e-mail address and return its key handle.

        :raises ValueError: if emailadr contains line breaks (which would
            allow injecting further key generation parameters).
        :raises InvocationFailure: if gpg fails; the used parameter
            spec is attached as ``extrainfo``.
        """
        if "\n" in emailadr or "\r" in emailadr:
            raise ValueError("invalid e-mail address {!r}".format(emailadr))
        spec = "\n".join([
            "Key-Type: RSA",
            "Key-Length: 3072",
            "Key-Usage: sign",
            "Subkey-Type: RSA",
            "Subkey-Length: 3072",
            "Subkey-Usage: encrypt",
            "Name-Email: " + emailadr,
            "Expire-Date: 0",
            "%commit"
        ]).encode("utf8")
        with self.temp_written_file(spec) as fn:
            try:
                out, err = self._gpg_outerr(self._nopassphrase + ["--gen-key", fn])
            except InvocationFailure as e:
                # the temp file holds exactly the spec we just encoded
                e.extrainfo = spec.decode("utf8")
                raise

        keyhandle = self._find_keyhandle(err)
        logging.debug("created secret key: %s", keyhandle)
        return keyhandle

    def list_secret_keyinfos(self, keyhandle=None):
        """ return KeyInfo objects for secret keys and subkeys,
        optionally restricted to those gpg finds for keyhandle. """
        args = ["--skip-verify", "--with-colons", "--list-secret-keys"]
        if keyhandle is not None:
            args.append(keyhandle)
        return self._parse_list(args, ("sec", "ssb"))

    def get_secret_keyhandle(self, keyhandle):
        """ return the id of the first secret key whose id or one of
        whose uids matches keyhandle, or None if there is no such key. """
        assert isinstance(keyhandle, _text_type)
        for k in self.list_secret_keyinfos(keyhandle):
            is_in_uids = any(keyhandle in uid for uid in k.uids)
            if is_in_uids or k.match(keyhandle):
                return k.id
        return None

    def list_public_keyinfos(self, keyhandle=None):
        """ return KeyInfo objects for public keys and subkeys,
        optionally restricted to those gpg finds for keyhandle. """
        args = ["--skip-verify", "--with-colons", "--list-public-keys"]
        if keyhandle is not None:
            args.append(keyhandle)
        return self._parse_list(args, ("pub", "sub"))

    def _parse_list(self, args, types):
        """ run gpg with args and parse its colon-separated listing.

        :param types: tuple of record types to turn into KeyInfo objects;
            the first entry is the primary key type to which subsequent
            "uid" records are attached.
        """
        out = self._gpg_out(args)
        keyinfos = []
        last_main_type_keyinfo = None
        for line in out.splitlines():
            parts = line.split(":")
            if len(parts) < 10:
                # too short to carry the fields we read (up to index 9)
                continue
            rectype = parts[0]
            if rectype in types:
                keyinfos.append(
                    KeyInfo(type=parts[3], bits=int(parts[2]), uid=parts[9],
                            id=parts[4], date_created=parts[5]))
                if rectype == types[0]:
                    last_main_type_keyinfo = keyinfos[-1]
            elif rectype == "uid" and last_main_type_keyinfo is not None:
                # uid records without a preceding primary key are ignored
                last_main_type_keyinfo.uids.append(parts[9])
        return keyinfos

    def _find_keyhandle(self, string, _pattern=re.compile("key (?:ID )?([0-9A-F]+)")):
        """ extract the key id mentioned in a gpg message.

        A short id (up to 8 hex digits) is resolved to the full id of
        the matching public key.

        :raises ValueError: if a short id matches no public key.
        """
        m = _pattern.search(string)
        assert m and len(m.groups()) == 1, string
        x = m.groups()[0]

        # now search the fingerprint if we only have a shortid
        if len(x) <= 8:  # a short keyid has 8 hex digits
            keyinfos = self.list_public_keyinfos(x)
            for k in keyinfos:
                if k.match(x):
                    return k.id
            raise ValueError("could not find fingerprint %r in %r" % (x, keyinfos))
        # note that this might be a 16-char fingerprint or a 40-char one (gpg-2.1.18)
        return x

    def list_secret_key_packets(self, keyhandle):
        return self.list_packets(self.get_secret_keydata(keyhandle))

    def list_public_key_packets(self, keyhandle):
        return self.list_packets(self.get_public_keydata(keyhandle))

    def list_packets(self, keydata):
        """ return a list of (pkgname, pkgvalue, lines) tuples parsed
        from the ``gpg --list-packets`` output for keydata.

        A line of the form ":name: value" starts a new packet, lines
        starting with "#" are skipped, all other lines are collected
        (stripped) into the current packet. Empty output yields [].
        """
        out = self._gpg_out(["--list-packets"], input=keydata)
        packets = []
        lines = []
        last_package_type = None
        for rawline in out.splitlines():
            line = rawline.strip()
            c = line[0:1]
            if c == "#":
                continue
            if c == ":":
                i = line[1:].find(c)
                if i != -1:
                    ptype = line[1: i + 1]
                    pvalue = line[i + 2:].strip()
                    if last_package_type is not None:
                        packets.append(last_package_type + (lines,))
                        lines = []
                    last_package_type = (ptype, pvalue)
            else:
                assert last_package_type, line
                lines.append(line)
        # flush the packet we were collecting (there is none on empty output)
        if last_package_type is not None:
            packets.append(last_package_type + (lines,))
        return packets

    def get_public_keydata(self, keyhandle, armor=False):
        """ return the minimal export of a public key as bytes
        (ASCII-armored if armor is True). """
        args = ["-a"] if armor else []
        args.extend(["--export-options=export-minimal", "--export", str(keyhandle)])
        return self._gpg_out(args, strict=True, encoding=None)

    def get_secret_keydata(self, keyhandle, armor=False):
        """ return the minimal export of a secret key as bytes
        (ASCII-armored if armor is True). """
        args = ["-a"] if armor else []
        args.extend(self._nopassphrase + ["--export-options=export-minimal",
                                          "--export-secret-key", keyhandle])
        return self._gpg_out(args, strict=True, encoding=None)

    def encrypt(self, data, recipients, signkey=None, text=False):
        """ return data (bytes) encrypted to all recipients, optionally
        signed with signkey and ASCII-armored if text is True. """
        opts = self._nopassphrase + ["--encrypt", "--always-trust"]
        for r in recipients:
            opts.extend(["-r", r])
        if signkey:
            opts.extend(["--sign", "-u", signkey])
        if text:
            opts.extend(["--armor"])
        return self._gpg_out(opts, input=data, encoding=None)

    def sign(self, data, keyhandle):
        """ return a detached signature (bytes) of data made with keyhandle. """
        args = self._nopassphrase + ["--detach-sign", "-u", keyhandle]
        return self._gpg_out(args, input=data, encoding=None)

    def verify(self, data, signature):
        """ verify the detached signature for data and return the key
        id which gpg reports for it. """
        with self.temp_written_file(signature) as sig_fn:
            out, err = self._gpg_outerr(["--verify", sig_fn, "-"], input=data)
        return self._find_keyhandle(err)

    def decrypt(self, enc_data):
        """ decrypt enc_data and return a (decrypted bytes, keyinfos) tuple.

        keyinfos contains a KeyInfo for each "encrypted with ... key"
        stderr message which is directly followed by a line starting
        with a space that carries the user id.
        """
        args = self._nopassphrase + ["--with-colons", "--decrypt"]
        out, err = self._gpg_outerr(args, input=enc_data, encoding=None)
        lines = err.splitlines()
        keyinfos = []
        pos = 0
        while pos < len(lines):
            m = self._decrypt_key_re.match(lines[pos])
            pos += 1
            if m is None:
                continue
            # only consume the following line if it really is the uid
            # continuation; otherwise it is examined on its own next round.
            if pos < len(lines) and lines[pos].startswith(" "):
                bits, keytype, keyid, date = m.groups()
                uid = lines[pos].strip().strip('"')
                pos += 1
                keyinfos.append(KeyInfo(keytype, bits, keyid, uid, date))
        return out, keyinfos

    def import_keydata(self, keydata, minimize=False):
        """ import keydata (bytes) and return the imported key's handle.

        If minimize is True the key is exported minimally, deleted and
        the minimal version is imported again.
        """
        out, err = self._gpg_outerr(["--skip-verify", "--import"], input=keydata)
        kh = self._find_keyhandle(err)
        if minimize:
            # get_public_keydata gets us a minimized key
            minimized_keydata = self.get_public_keydata(kh)
            self._gpg_outerr(["--yes", "--delete-key", kh])
            _, err = self._gpg_outerr(["--skip-verify", "--import"], input=minimized_keydata)
            min_kh = self._find_keyhandle(err)
            assert min_kh == kh
        return kh


class KeyInfo(object):
    """ description of a key: algorithm type, size in bits, key id,
    list of user ids and creation date. """

    def __init__(self, type, bits, id, uid, date_created):
        self.type = type
        self.bits = int(bits)
        self.id = id
        self.uids = [uid] if uid else []
        self.date_created = date_created

    def match(self, other_id):
        """ return True if the shorter of our id and other_id is a
        case-insensitive suffix of the longer one. An empty id only
        matches another empty id. """
        i = min(len(other_id), len(self.id))
        # for i == 0 the slices below are the full strings, so an empty
        # id is compared against the complete other one.
        return self.id[-i:].lower() == other_id[-i:].lower()

    def __str__(self):
        return "KeyInfo(id={id!r}, uids={uids!r}, bits={bits}, type={type})".format(
            **self.__dict__)

    __repr__ = __str__


def find_executable(name):
    """ return the path of the executable ``name`` or None.

    An absolute path is returned as is if it points to an existing file.
    A relative path containing path separators is used directly if it
    points to an existing file. Otherwise the name is looked up in the
    directories of the system's PATH specification (on Windows also
    trying the PATHEXT extensions). Copied and adapted from
    py.path.local.sysfind.
    """
    if os.path.isabs(name):
        return name if os.path.isfile(name) else None
    if os.path.dirname(name) and os.path.isfile(name):
        return name

    if iswin32:
        paths = os.environ.get('Path', '').split(';')
        if '' not in paths and '.' not in paths:
            paths.append('.')
        systemroot = os.environ.get('SYSTEMROOT')
        if systemroot is not None:
            # plain replacement: the value contains backslashes which a
            # regex substitution would interpret as escape sequences.
            paths = [path.replace('%SystemRoot%', systemroot)
                     for path in paths]
        tryadd = [ext for ext in
                  os.environ.get('PATHEXT', '').split(os.pathsep) if ext]
    else:
        paths = os.environ.get('PATH', os.defpath).split(':')
        tryadd = []
    tryadd.append("")

    for directory in paths:
        for addext in tryadd:
            p = os.path.join(directory, name) + addext
            try:
                if os.path.isfile(p):
                    return p
            except (OSError, ValueError):
                # unusable PATH entry, just try the next candidate
                continue
    return None