1# Copyright (C) 2016 g10 Code GmbH 2# 3# This file is part of GPGME. 4# 5# GPGME is free software; you can redistribute it and/or modify it 6# under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# GPGME is distributed in the hope that it will be useful, but WITHOUT 11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 12# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General 13# Public License for more details. 14# 15# You should have received a copy of the GNU Lesser General Public 16# License along with this program; if not, see <https://www.gnu.org/licenses/>. 17 18from __future__ import absolute_import, print_function, unicode_literals 19 20import contextlib 21import shutil 22import sys 23import os 24import re 25import tempfile 26import time 27import gpg 28 29del absolute_import, print_function, unicode_literals 30 31 32def assert_gpg_version(version=(2, 1, 0)): 33 with gpg.Context() as c: 34 clean_version = re.match(r'\d+\.\d+\.\d+', 35 c.engine_info.version).group(0) 36 if tuple(map(int, clean_version.split('.'))) < version: 37 print("GnuPG too old: have {0}, need {1}.".format( 38 c.engine_info.version, '.'.join(map(str, version)))) 39 sys.exit(77) 40 41def is_gpg_version(version): 42 with gpg.Context() as c: 43 clean_version = re.match(r'\d+\.\d+\.\d+', 44 c.engine_info.version).group(0) 45 return tuple(map(int, clean_version.split('.'))) == version 46 47 48def have_tofu_support(ctx, some_uid): 49 keys = list( 50 ctx.keylist( 51 some_uid, 52 mode=(gpg.constants.keylist.mode.LOCAL | 53 gpg.constants.keylist.mode.WITH_TOFU))) 54 return len(keys) > 0 55 56 57# Skip the Python tests for GnuPG < 2.1.12. Prior versions do not 58# understand the command line flags that we assume exist. C.f. issue 59# 3008. 60assert_gpg_version((2, 1, 12)) 61 62# known keys 63alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734" 64bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2" 65encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C" 66sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D" 67no_such_key = "A" * 40 68 69 70def make_filename(name): 71 return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name) 72 73 74def in_srcdir(name): 75 return os.path.join(os.environ['srcdir'], name) 76 77 78verbose = int(os.environ.get('verbose', 0)) > 1 79 80 81def print_data(data): 82 if verbose: 83 try: 84 # See if it is a file-like object. 85 data.seek(0, os.SEEK_SET) 86 data = data.read() 87 except: 88 # Hope for the best. 89 pass 90 91 if hasattr(sys.stdout, "buffer"): 92 sys.stdout.buffer.write(data) 93 else: 94 sys.stdout.write(data) 95 96 97def mark_key_trusted(ctx, key): 98 class Editor(object): 99 def __init__(self): 100 self.steps = ["trust", "save"] 101 102 def edit(self, status, args, out): 103 if args == "keyedit.prompt": 104 result = self.steps.pop(0) 105 elif args == "edit_ownertrust.value": 106 result = "5" 107 elif args == "edit_ownertrust.set_ultimate.okay": 108 result = "Y" 109 elif args == "keyedit.save.okay": 110 result = "Y" 111 else: 112 result = None 113 return result 114 115 with gpg.Data() as sink: 116 ctx.op_edit(key, Editor().edit, sink, sink) 117 118 119# Python3.2 and up has tempfile.TemporaryDirectory, but we cannot use 120# that, because there shutil.rmtree is used without 121# ignore_errors=True, and that races against gpg-agent deleting its 122# sockets. 123class TemporaryDirectory(object): 124 def __enter__(self): 125 self.path = tempfile.mkdtemp() 126 return self.path 127 128 def __exit__(self, *args): 129 shutil.rmtree(self.path, ignore_errors=True) 130 131 132@contextlib.contextmanager 133def EphemeralContext(): 134 with TemporaryDirectory() as tmp: 135 home = os.environ['GNUPGHOME'] 136 shutil.copy(os.path.join(home, "gpg.conf"), tmp) 137 shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp) 138 139 with gpg.Context(home_dir=tmp) as ctx: 140 yield ctx 141 142 # Ask the agent to quit. 143 agent_socket = os.path.join(tmp, "S.gpg-agent") 144 ctx.protocol = gpg.constants.protocol.ASSUAN 145 ctx.set_engine_info(ctx.protocol, file_name=agent_socket) 146 try: 147 ctx.assuan_transact(["KILLAGENT"]) 148 except gpg.errors.GPGMEError as e: 149 if e.getcode() == gpg.errors.ASS_CONNECT_FAILED: 150 pass # the agent was not running 151 else: 152 raise 153 154 # Block until it is really gone. 155 while os.path.exists(agent_socket): 156 time.sleep(.01) 157