1# Copyright (c) 2008-2014 by Vinay Sajip.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are met:
6#
7#    * Redistributions of source code must retain the above copyright notice,
8#      this list of conditions and the following disclaimer.
9#    * Redistributions in binary form must reproduce the above copyright notice,
10#      this list of conditions and the following disclaimer in the documentation
11#      and/or other materials provided with the distribution.
12#    * The name(s) of the copyright holder(s) may not be used to endorse or
13#      promote products derived from this software without specific prior
14#      written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) "AS IS" AND ANY EXPRESS OR
17# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19# EVENT SHALL THE COPYRIGHT HOLDER(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
20# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
24# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27""" A wrapper for the 'gpg' command::
28
29Portions of this module are derived from A.M. Kuchling's well-designed
30GPG.py, using Richard Jones' updated version 1.3, which can be found
31in the pycrypto CVS repository on Sourceforge:
32
33http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
34
35This module is *not* forward-compatible with amk's; some of the
36old interface has changed.  For instance, since I've added decrypt
37functionality, I elected to initialize with a 'gnupghome' argument
38instead of 'keyring', so that gpg can find both the public and secret
39keyrings.  I've also altered some of the returned objects in order for
40the caller to not have to know as much about the internals of the
41result classes.
42
43While the rest of ISconf is released under the GPL, I am releasing
44this single file under the same terms that A.M. Kuchling used for
45pycrypto.
46
47Steve Traugott, stevegt@terraluna.org
48Thu Jun 23 21:27:20 PDT 2005
49
50This version of the module has been modified from Steve Traugott's version
51(see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
52Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
53and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
54the previous versions.
55
56Modifications Copyright (C) 2008-2017 Vinay Sajip. All rights reserved.
57
58A unittest harness (test_gnupg.py) has also been added.
59"""
60
61__version__ = "0.4.1"
62__author__ = "Vinay Sajip"
63__date__  = "$07-Jul-2017 15:09:20$"
64
65try:
66    from io import StringIO
67except ImportError:  # pragma: no cover
68    from cStringIO import StringIO
69
70import codecs
71import locale
72import logging
73import os
74import re
75import socket
76from subprocess import Popen
77from subprocess import PIPE
78import sys
79import threading
80
# STARTUPINFO stays None unless we are on Windows and the subprocess module
# provides the names below; GPG._open_subprocess uses them (with SW_HIDE) when
# spawning the gpg child process.
STARTUPINFO = None
if os.name == 'nt':  # pragma: no cover
    try:
        from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
    except ImportError:
        # Names not available in this Python: fall back to no startup info.
        STARTUPINFO = None
87
# NullHandler is a class inside the logging module, not a submodule, so it has
# to be imported with ``from logging import ...``. The fallback class is kept
# for Python versions whose logging module does not provide it.
try:
    from logging import NullHandler
except ImportError:  # pragma: no cover
    class NullHandler(logging.Handler):
        """Logging handler that silently discards every record."""

        def handle(self, record):
            pass
# Python 2/3 compatibility aliases. The ``unicode`` and ``basestring``
# builtins only exist on 2.x; looking them up on 3.x raises NameError.
try:
    text_type = unicode
    string_types = basestring
except NameError:
    text_type = string_types = str
    _py3k = True
else:
    _py3k = False
103
# Module-level logger. The do-nothing NullHandler is attached only when the
# logger has no handlers yet.
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.addHandler(NullHandler())
107
# We use the test below because it works for Jython as well as CPython
if os.path.__name__ != 'ntpath':
    # Section copied from sarge

    # Any character outside this set may be unsafe in a shell, so input
    # containing one of them needs quoting
    UNSAFE = re.compile(r'[^\w%+,./:=@-]')

    def shell_quote(s):
        """
        Return a version of ``s`` that is safe to pass to a Posix shell.

        Text made up only of safe characters is returned unchanged, the
        empty string becomes ``''``, and anything else is wrapped in single
        quotes (so "*.py" becomes "'*.py'"), with embedded single quotes
        escaped.

        :param s: The value to quote
        :type s: str (or unicode on 2.x)
        :return: A safe version of the input, from the point of view of Posix
                 command shells
        :rtype: The passed-in type
        :raises TypeError: if ``s`` is not a string
        """
        if not isinstance(s, string_types):  # pragma: no cover
            raise TypeError('Expected string type, got %s' % type(s))
        if not s:
            return "''"
        if UNSAFE.search(s):
            return "'%s'" % s.replace("'", r"'\''")
        return s

    # end of sarge code
else:  # pragma: no cover
    # On Windows, we don't need shell quoting, other than worrying about
    # paths with spaces in them.
    def shell_quote(s):
        """Wrap ``s`` in double quotes."""
        return '"%s"' % s
145
146# Now that we use shell=False, we shouldn't need to quote arguments.
147# Use no_quote instead of shell_quote to remind us of where quoting
148# was needed. However, note that we still need, on 2.x, to encode any
149# Unicode argument with the file system encoding - see Issue #41 and
150# Python issue #1759845 ("subprocess.call fails with unicode strings in
151# command line").
152
# Encoding applied to Unicode command-line arguments on 2.x. It is kept as a
# module attribute so that it can be overridden in special cases.
fsencoding = sys.getfilesystemencoding()

def no_quote(s):
    """
    Return ``s`` without any shell quoting.

    On 2.x a Unicode value is encoded with ``fsencoding``; every other value
    (and any value on 3.x) is returned unchanged.
    """
    if _py3k or not isinstance(s, text_type):
        return s
    return s.encode(fsencoding)
161
def _copy_data(instream, outstream):
    """
    Copy ``instream`` to ``outstream`` in 1024-unit chunks, then close
    ``outstream``.

    Copying stops at end of input, when a read raises UnicodeError, or when a
    write fails. Failures are logged rather than raised, because 'broken
    pipe' errors can occur even when all the data has been sent.

    :param instream: object with a ``read(size)`` method
    :param outstream: object with ``write(data)`` and ``close()`` methods
    """
    sent = 0
    # sys.stdin may lack an encoding attribute or report None; fall back to
    # ASCII so the re-encoding below always has a usable codec name.
    enc = getattr(sys.stdin, 'encoding', None) or 'ascii'
    while True:
        # See issue #39: read can fail when e.g. a text stream is provided
        # for what is actually a binary file
        try:
            data = instream.read(1024)
        except UnicodeError:
            logger.warning('Exception occurred while reading', exc_info=1)
            break
        if not data:
            break
        sent += len(data)
        # logger.debug("sending chunk (%d): %r", sent, data[:256])
        try:
            try:
                outstream.write(data)
            except UnicodeError:  # pragma: no cover
                # Text handed to a byte stream: retry with encoded data. A
                # failure of this retry is handled by the outer clause.
                outstream.write(data.encode(enc))
        except Exception:
            # Can sometimes get 'broken pipe' errors even when the data has all
            # been sent. Only Exception is caught so that KeyboardInterrupt
            # and SystemExit are not swallowed.
            logger.exception('Error sending data')
            break
    try:
        outstream.close()
    except IOError:  # pragma: no cover
        logger.warning('Exception occurred while closing: ignored', exc_info=1)
    logger.debug("closed output, %d bytes sent", sent)
195
def _threaded_copy_data(instream, outstream):
    """
    Start a daemon thread that runs ``_copy_data(instream, outstream)``.

    :return: the started ``threading.Thread``, so the caller can join it
    """
    wr = threading.Thread(target=_copy_data, args=(instream, outstream))
    # Use the ``daemon`` attribute: Thread.setDaemon() is deprecated.
    wr.daemon = True
    logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
    wr.start()
    return wr
202
def _write_passphrase(stream, passphrase, encoding):
    """
    Write ``passphrase`` followed by a newline to ``stream``, encoded with
    ``encoding``.
    """
    line = '%s\n' % passphrase
    stream.write(line.encode(encoding))
    logger.debug('Wrote passphrase')
208
def _is_sequence(instance):
    """Return True if ``instance`` is a list, tuple, set or frozenset."""
    collection_types = (list, tuple, set, frozenset)
    return isinstance(instance, collection_types)
211
def _make_memory_stream(s):
    """
    Return an in-memory stream initialised with ``s``.

    ``io.BytesIO`` is used when the ``io`` import succeeds, otherwise the
    module-level ``StringIO``.
    """
    try:
        from io import BytesIO as stream_class
    except ImportError:  # pragma: no cover
        stream_class = StringIO
    return stream_class(s)
219
def _make_binary_stream(s, encoding):
    """
    Return an in-memory stream for ``s``, encoding it with ``encoding`` first
    when it is text (``str`` on 3.x, anything that is not exactly ``str`` on
    2.x).
    """
    if _py3k:
        needs_encoding = isinstance(s, str)
    else:
        needs_encoding = type(s) is not str
    if needs_encoding:
        s = s.encode(encoding)
    return _make_memory_stream(s)
228
class Verify(object):
    "Handle status messages for --verify"

    (TRUST_UNDEFINED, TRUST_NEVER, TRUST_MARGINAL,
     TRUST_FULLY, TRUST_ULTIMATE) = range(5)

    TRUST_LEVELS = {
        "TRUST_UNDEFINED": TRUST_UNDEFINED,
        "TRUST_NEVER": TRUST_NEVER,
        "TRUST_MARGINAL": TRUST_MARGINAL,
        "TRUST_FULLY": TRUST_FULLY,
        "TRUST_ULTIMATE": TRUST_ULTIMATE,
    }

    # keyword -> (valid, status) for messages of the form "<key id> <user>"
    _SIGNATURE_VERDICTS = {
        "BADSIG": (False, 'signature bad'),
        "EXPSIG": (False, 'signature expired'),
        "GOODSIG": (True, 'signature good'),
    }

    # keyword -> status for failures whose whole value is stored as key_id
    _KEY_FAILURES = {
        "DECRYPTION_FAILED": 'decryption failed',
        "NO_PUBKEY": 'no public key',
    }

    # keyword -> key status for signatures made with an unusable key
    _SIGNING_KEY_PROBLEMS = {
        "EXPKEYSIG": 'signing key has expired',
        "REVKEYSIG": 'signing key was revoked',
    }

    # keywords that are recognised but need no action
    _NO_ACTION = ("DECRYPTION_INFO", "PLAINTEXT", "PLAINTEXT_LENGTH",
                  "NO_SECKEY", "BEGIN_SIGNING")

    def __init__(self, gpg):
        self.gpg = gpg
        self.valid = False
        self.status = None
        self.key_status = None
        self.key_id = None
        self.signature_id = None
        self.username = None
        self.fingerprint = None
        self.pubkey_fingerprint = None
        self.creation_date = None
        self.timestamp = None
        self.sig_timestamp = None
        self.expire_timestamp = None
        self.trust_text = None
        self.trust_level = None

    def __nonzero__(self):
        # The result is truthy only once a good signature has been seen.
        return self.valid

    __bool__ = __nonzero__

    def handle_status(self, key, value):
        """Update this result from one status keyword and its value text."""
        if key in self.TRUST_LEVELS:
            self.trust_text = key
            self.trust_level = self.TRUST_LEVELS[key]
            return
        if key in ("WARNING", "ERROR"):
            logger.warning('potential problem: %s: %s', key, value)
            return
        if key in self._SIGNATURE_VERDICTS:
            self.valid, self.status = self._SIGNATURE_VERDICTS[key]
            self.key_id, self.username = value.split(None, 1)
            return
        if key == "ERRSIG":  # pragma: no cover
            self.valid = False
            leading = value.split()[:5]
            self.key_id, _algo, _hash_algo, _cls, self.timestamp = leading
            self.status = 'signature error'
            return
        if key == "VALIDSIG":
            fields = value.split()
            (self.fingerprint, self.creation_date,
             self.sig_timestamp, self.expire_timestamp) = fields[:4]
            # may be different if signature is made with a subkey
            self.pubkey_fingerprint = fields[-1]
            self.status = 'signature valid'
            return
        if key == "SIG_ID":
            fields = value.split()
            self.signature_id, self.creation_date, self.timestamp = fields
            return
        if key in self._KEY_FAILURES:  # pragma: no cover
            self.valid = False
            self.key_id = value
            self.status = self._KEY_FAILURES[key]
            return
        if key in self._SIGNING_KEY_PROBLEMS:  # pragma: no cover
            # signed with expired or revoked key
            self.valid = False
            self.key_id = value.split()[0]
            self.key_status = self._SIGNING_KEY_PROBLEMS[key]
            self.status = self.key_status
            return
        if key in ("UNEXPECTED", "FAILURE"):  # pragma: no cover
            self.valid = False
            self.key_id = value
            if key == "UNEXPECTED":
                self.status = 'unexpected data'
            elif not self.status:
                # N.B. there might be other reasons
                self.status = 'incorrect passphrase'
            return
        if key in self._NO_ACTION:
            return
        logger.debug('message ignored: %s, %s', key, value)  # pragma: no cover
332
class ImportResult(object):
    "Handle status messages for --import"

    # Attribute names filled, in this order, from the IMPORT_RES fields.
    # NOTE(review): GnuPG's doc/DETAILS appears to list a skipped_new_keys
    # field before not_imported - confirm that this mapping is intended.
    counts = '''count no_user_id imported imported_rsa unchanged
            n_uids n_subk n_sigs n_revoc sec_read sec_imported
            sec_dups not_imported'''.split()

    def __init__(self, gpg):
        self.gpg = gpg
        self.results = []
        self.fingerprints = []
        # Every counter (including ``imported``) is None until an IMPORT_RES
        # message has been seen.
        for result in self.counts:
            setattr(self, result, None)

    def __nonzero__(self):
        # Truthy only if at least one key was imported OK and nothing was
        # reported as not imported.
        if self.not_imported: return False
        if not self.fingerprints: return False
        return True

    __bool__ = __nonzero__

    # IMPORT_OK reason codes; non-zero codes are bit flags
    ok_reason = {
        '0': 'Not actually changed',
        '1': 'Entirely new key',
        '2': 'New user IDs',
        '4': 'New signatures',
        '8': 'New subkeys',
        '16': 'Contains private key',
    }

    problem_reason = {
        '0': 'No specific reason given',
        '1': 'Invalid Certificate',
        '2': 'Issuer Certificate missing',
        '3': 'Certificate Chain too long',
        '4': 'Error storing certificate',
    }

    def _ok_reason_text(self, reason):
        # Translate an IMPORT_OK reason code into newline-terminated text.
        flags = int(reason)
        if flags == 0:
            # Zero means no flag is set; it is not a flag itself, so it must
            # not be reported alongside the real flags.
            reasons = [self.ok_reason['0']]
        else:
            reasons = []
            # Ascending numeric order keeps the text deterministic.
            for code in sorted(self.ok_reason, key=int):
                bit = int(code)
                if bit and (flags & bit) == bit:
                    reasons.append(self.ok_reason[code])
        return '\n'.join(reasons) + "\n"

    def handle_status(self, key, value):
        """
        Update this result from one status keyword and its value text.

        IMPORT_OK and the problem keywords each append a dict to
        ``self.results``; IMPORT_RES fills in the counters named in
        ``counts``.
        """
        if key in ("WARNING", "ERROR"):
            logger.warning('potential problem: %s: %s', key, value)
        elif key in ("IMPORTED", "KEY_CONSIDERED"):
            # this duplicates info we already see in import_ok & import_problem
            pass
        elif key == "NODATA":  # pragma: no cover
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'No valid data found'})
        elif key == "IMPORT_OK":
            parts = value.split()
            reason = parts[0]
            # Tolerate a message that carries no fingerprint.
            fingerprint = parts[1] if len(parts) > 1 else None
            self.results.append({'fingerprint': fingerprint,
                'ok': reason, 'text': self._ok_reason_text(reason)})
            if fingerprint is not None:
                self.fingerprints.append(fingerprint)
        elif key == "IMPORT_PROBLEM":  # pragma: no cover
            try:
                reason, fingerprint = value.split()
            except ValueError:
                # Not exactly "<reason> <fingerprint>": keep the raw value.
                reason = value
                fingerprint = '<unknown>'
            text = self.problem_reason.get(reason,
                                           'Unknown problem: %r' % reason)
            self.results.append({'fingerprint': fingerprint,
                'problem': reason, 'text': text})
        elif key == "IMPORT_RES":
            # zip() stops at the shorter sequence, so a message with fewer
            # fields than expected leaves the remaining counters untouched.
            for count, number in zip(self.counts, value.split()):
                setattr(self, count, int(number))
        elif key == "KEYEXPIRED":  # pragma: no cover
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'Key expired'})
        elif key == "SIGEXPIRED":  # pragma: no cover
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'Signature expired'})
        elif key == "FAILURE":  # pragma: no cover
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'Other failure'})
        else:  # pragma: no cover
            logger.debug('message ignored: %s, %s', key, value)

    def summary(self):
        """Return a short text such as "2 imported, 1 not imported"."""
        # The counters are None before IMPORT_RES arrives; report that as 0.
        l = ['%d imported' % (self.imported or 0)]
        if self.not_imported:  # pragma: no cover
            l.append('%d not imported' % self.not_imported)
        return ', '.join(l)
420
# Matches a literal backslash-x followed by two hex digits (in either case);
# the captured group is the two digits. Used by SearchKeys.uid to turn such
# sequences back into single characters.
ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
# Maps two-character backslash sequences (the raw-string keys) to the control
# characters they stand for. SearchKeys.uid applies these replacements after
# ESCAPE_PATTERN.
BASIC_ESCAPES = {
    r'\n': '\n',
    r'\r': '\r',
    r'\f': '\f',
    r'\v': '\v',
    r'\b': '\b',
    r'\0': '\0',
}
430
class SendResult(object):
    # Result handler that only logs the status messages it receives; it is
    # registered under 'send' in GPG.result_map (presumably for --send-keys;
    # confirm against the caller).
    def __init__(self, gpg):
        self.gpg = gpg

    def handle_status(self, key, value):
        # No state is kept: every status message is logged at debug level.
        logger.debug('SendResult: %s: %s', key, value)
437
class SearchKeys(list):
    ''' Handle status messages for --search-keys.

        Handle pub and uid (relating the latter to the former).

        Don't care about the rest
    '''

    # Position of the user ID within the fields passed to uid()
    UID_INDEX = 1
    # Names given, in order, to the fields of a key line
    FIELDS = 'type keyid algo length date expires'.split()

    def __init__(self, gpg):
        self.gpg = gpg
        self.curkey = None
        self.fingerprints = []
        self.uids = []

    def get_fields(self, args):
        """
        Build a key dict from ``args``: one entry per name in FIELDS (with
        'unavailable' for fields that ``args`` is too short to supply) plus
        empty 'uids' and 'sigs' lists.
        """
        wanted = len(self.FIELDS)
        values = list(args[:wanted])
        values.extend(['unavailable'] * (wanted - len(values)))
        record = dict(zip(self.FIELDS, values))
        record['uids'] = []
        record['sigs'] = []
        return record

    def pub(self, args):
        """Start a new current key from a pub line and add it to the list."""
        record = self.get_fields(args)
        self.curkey = record
        self.append(record)

    def uid(self, args):
        """Unescape the user ID in ``args`` and record it on the current key."""
        def unhex(match):
            return chr(int(match.group(1), 16))

        text = ESCAPE_PATTERN.sub(unhex, args[self.UID_INDEX])
        for escaped, literal in BASIC_ESCAPES.items():
            text = text.replace(escaped, literal)
        self.curkey['uids'].append(text)
        self.uids.append(text)

    def handle_status(self, key, value):  # pragma: no cover
        # Status messages are not used by this handler.
        pass
480
class ListKeys(SearchKeys):
    ''' Handle status messages for --list-keys, --list-sigs.

        Handle pub and uid (relating the latter to the former).

        Don't care about (info from src/DETAILS):

        crt = X.509 certificate
        crs = X.509 certificate and private key available
        uat = user attribute (same as user id except for field 10).
        sig = signature
        rev = revocation signature
        pkd = public key data (special field format, see below)
        grp = reserved for gpgsm
        rvk = revocation key
    '''

    UID_INDEX = 9
    FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid sig'.split()

    def __init__(self, gpg):
        super(ListKeys, self).__init__(gpg)
        # Maps every fingerprint seen (key or subkey) to its key dict
        self.key_map = {}
        # True while the most recent key line was a subkey
        self.in_subkey = False

    def key(self, args):
        """Start a new current key from a pub or sec line."""
        record = self.get_fields(args)
        self.curkey = record
        # A user ID given on the key line itself is moved into 'uids'.
        inline_uid = record['uid']
        if inline_uid:
            record['uids'].append(inline_uid)
        del record['uid']
        record['subkeys'] = []
        self.append(record)
        self.in_subkey = False

    pub = sec = key

    def fpr(self, args):
        """Attach a fingerprint to the current key or to its latest subkey."""
        fp = args[9]
        if fp in self.key_map:  # pragma: no cover
            raise ValueError('Unexpected fingerprint collision: %s' % fp)
        if self.in_subkey:
            self.curkey['subkeys'][-1].append(fp)
        else:
            self.curkey['fingerprint'] = fp
            self.fingerprints.append(fp)
        self.key_map[fp] = self.curkey

    def sub(self, args):
        """Record a subkey as [keyid, type] on the current key."""
        entry = [args[4], args[11]]    # keyid, type
        self.curkey['subkeys'].append(entry)
        self.in_subkey = True

    def ssb(self, args):
        """Record a secret subkey as [keyid, None] on the current key."""
        entry = [args[4], None]    # keyid, type
        self.curkey['subkeys'].append(entry)
        self.in_subkey = True

    def sig(self, args):
        """Record a signature as (keyid, uid, sigclass) on the current key."""
        signatures = self.curkey['sigs']
        signatures.append((args[4], args[9], args[10]))
542
class ScanKeys(ListKeys):
    ''' Handle status messages for --with-fingerprint.'''

    def sub(self, args):
        """Record a subkey as [keyid, type] on the current key."""
        # --with-fingerprint --with-colons somehow outputs fewer colons,
        # use the last value args[-1] instead of args[11]
        keyid, key_type = args[4], args[-1]
        self.curkey['subkeys'].append([keyid, key_type])
        self.in_subkey = True
552
class TextHandler(object):
    # Mixin for results that hold their output in ``self.data`` and keep a
    # reference to the GPG object in ``self.gpg``.
    def _as_text(self):
        # Decode the stored data using the GPG object's encoding settings.
        raw = self.data
        return raw.decode(self.gpg.encoding, self.gpg.decode_errors)

    if not _py3k:
        __unicode__ = _as_text

        def __str__(self):
            return self.data
    else:
        __str__ = _as_text
564
565
class Crypt(Verify, TextHandler):
    "Handle status messages for --encrypt and --decrypt"

    # Keywords whose status text is the keyword itself, lower-cased and with
    # underscores turned into spaces
    _KEYWORD_STATUSES = ("NEED_PASSPHRASE", "BAD_PASSPHRASE",
                         "GOOD_PASSPHRASE", "MISSING_PASSPHRASE",
                         "DECRYPTION_FAILED", "KEY_NOT_CREATED",
                         "NEED_PASSPHRASE_PIN")

    # Keywords with a fixed status text
    _FIXED_STATUSES = {
        "NODATA": "no data was provided",
        "NEED_PASSPHRASE_SYM": 'need symmetric passphrase',
        "BEGIN_DECRYPTION": 'decryption incomplete',
        "BEGIN_ENCRYPTION": 'encryption incomplete',
        "DECRYPTION_OKAY": 'decryption ok',
        "END_ENCRYPTION": 'encryption ok',
        "INV_RECP": 'invalid recipient',
        "KEYEXPIRED": 'key expired',
        "SIG_CREATED": 'sig created',
        "SIGEXPIRED": 'sig expired',
    }

    # Keywords that additionally mark the operation as successful
    _SUCCESS_KEYWORDS = ("DECRYPTION_OKAY", "END_ENCRYPTION")

    # Keywords that are recognised but need no action
    _IGNORED_KEYWORDS = ("ENC_TO", "USERID_HINT", "GOODMDC",
                         "END_DECRYPTION", "CARDCTRL", "BADMDC",
                         "SC_OP_FAILURE", "SC_OP_SUCCESS",
                         "PINENTRY_LAUNCHED", "KEY_CONSIDERED")

    def __init__(self, gpg):
        Verify.__init__(self, gpg)
        self.status = ''
        self.ok = False
        self.data = ''

    def __nonzero__(self):
        # Truthy once a successful encryption or decryption was reported.
        return bool(self.ok)

    __bool__ = __nonzero__

    def handle_status(self, key, value):
        """
        Update this result from one status keyword and its value text;
        keywords not handled here are passed on to Verify.handle_status.
        """
        if key in ("WARNING", "ERROR"):
            logger.warning('potential problem: %s: %s', key, value)
            return
        if key in self._IGNORED_KEYWORDS:
            return
        if key in self._KEYWORD_STATUSES:
            self.status = key.replace("_", " ").lower()
            return
        if key in self._FIXED_STATUSES:
            self.status = self._FIXED_STATUSES[key]
            if key in self._SUCCESS_KEYWORDS:
                self.ok = True
            return
        Verify.handle_status(self, key, value)
616
class GenKey(object):
    "Handle status messages for --gen-key"
    def __init__(self, gpg):
        self.gpg = gpg
        self.type = None
        self.fingerprint = None

    def __nonzero__(self):
        # Truthy once a key has been created and its fingerprint recorded.
        if self.fingerprint: return True
        return False

    __bool__ = __nonzero__

    def __str__(self):
        return self.fingerprint or ''

    def handle_status(self, key, value):
        """
        Update this result from one status keyword and its value text.

        KEY_CREATED stores the key type and fingerprint; anything after
        those two fields is ignored.
        """
        if key in ("WARNING", "ERROR"):  # pragma: no cover
            logger.warning('potential problem: %s: %s', key, value)
        elif key == "KEY_CREATED":
            # The value may carry an optional third "handle" field, so only
            # the first two fields are unpacked.
            (self.type, self.fingerprint) = value.split()[:2]
        elif key in ("PROGRESS", "GOOD_PASSPHRASE", "KEY_NOT_CREATED"):
            pass
        else:  # pragma: no cover
            logger.debug('message ignored: %s, %s', key, value)
642
class ExportResult(GenKey):
    """Handle status messages for --export[-secret-key].

    For now, just use an existing class to base it on - if needed, we
    can override handle_status for more specific message handling.
    """
    def handle_status(self, key, value):
        # EXPORTED and EXPORT_RES need no action; the base class deals with
        # everything else.
        if key not in ("EXPORTED", "EXPORT_RES"):
            super(ExportResult, self).handle_status(key, value)
654
class DeleteResult(object):
    "Handle status messages for --delete-key and --delete-secret-key"

    # DELETE_PROBLEM reason codes and their descriptions
    problem_reason = {
        '1': 'No such key',
        '2': 'Must delete secret key first',
        '3': 'Ambiguous specification',
    }

    def __init__(self, gpg):
        self.gpg = gpg
        # Stays 'ok' unless a DELETE_PROBLEM message is received.
        self.status = 'ok'

    def __str__(self):
        return self.status

    def __nonzero__(self):
        return self.status == 'ok'

    __bool__ = __nonzero__

    def handle_status(self, key, value):
        """Record the description of a DELETE_PROBLEM; log anything else."""
        if key != "DELETE_PROBLEM":  # pragma: no cover
            logger.debug('message ignored: %s, %s', key, value)
            return
        fallback = "Unknown error: %r" % value
        self.status = self.problem_reason.get(value, fallback)
681
682
class Sign(TextHandler):
    "Handle status messages for --sign"
    def __init__(self, gpg):
        self.gpg = gpg
        self.type = None
        self.hash_algo = None
        self.fingerprint = None
        self.status = None
        # Filled in by SIG_CREATED; initialised here so that the attribute
        # also exists when no signature was created.
        self.timestamp = None

    def __nonzero__(self):
        # Truthy once a signature has been created.
        return self.fingerprint is not None

    __bool__ = __nonzero__

    def handle_status(self, key, value):
        """
        Update this result from one status keyword and its value text.

        SIG_CREATED must carry exactly six whitespace-separated fields;
        the type, hash algorithm, timestamp and fingerprint are stored.
        """
        if key in ("WARNING", "ERROR", "FAILURE"):  # pragma: no cover
            logger.warning('potential problem: %s: %s', key, value)
        elif key in ("KEYEXPIRED", "SIGEXPIRED"):  # pragma: no cover
            self.status = 'key expired'
        elif key == "KEYREVOKED":  # pragma: no cover
            self.status = 'key revoked'
        elif key == "SIG_CREATED":
            # The algo and cls fields are unpacked but not kept.
            (self.type,
             algo, self.hash_algo, cls,
             self.timestamp, self.fingerprint
             ) = value.split()
            self.status = 'signature created'
        elif key in ("USERID_HINT", "NEED_PASSPHRASE", "GOOD_PASSPHRASE",
                     "BAD_PASSPHRASE", "BEGIN_SIGNING"):
            pass
        else:  # pragma: no cover
            logger.debug('message ignored: %s, %s', key, value)
715
# Bytes pattern for the "gpg (GnuPG) X.Y.Z" banner; group 1 is the dotted
# version number. GPG.__init__ matches it against the output of --version.
VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
# Matches text that ends in one or more hex digits (the pattern is anchored
# at the end only, so a full-string check depends on using match()).
HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I)
718
719class GPG(object):
720
721    decode_errors = 'strict'
722
723    result_map = {
724        'crypt': Crypt,
725        'delete': DeleteResult,
726        'generate': GenKey,
727        'import': ImportResult,
728        'send': SendResult,
729        'list': ListKeys,
730        'scan': ScanKeys,
731        'search': SearchKeys,
732        'sign': Sign,
733        'verify': Verify,
734        'export': ExportResult,
735    }
736
737    "Encapsulate access to the gpg executable"
738    def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
739                 use_agent=False, keyring=None, options=None,
740                 secret_keyring=None):
741        """Initialize a GPG process wrapper.  Options are:
742
743        gpgbinary -- full pathname for GPG binary.
744
745        gnupghome -- full pathname to where we can find the public and
746        private keyrings.  Default is whatever gpg defaults to.
747        keyring -- name of alternative keyring file to use, or list of such
748        keyrings. If specified, the default keyring is not used.
749        options =-- a list of additional options to pass to the GPG binary.
750        secret_keyring -- name of alternative secret keyring file to use, or
751        list of such keyrings.
752        """
753        self.gpgbinary = gpgbinary
754        self.gnupghome = gnupghome
755        if keyring:
756            # Allow passing a string or another iterable. Make it uniformly
757            # a list of keyring filenames
758            if isinstance(keyring, string_types):
759                keyring = [keyring]
760        self.keyring = keyring
761        if secret_keyring:
762            # Allow passing a string or another iterable. Make it uniformly
763            # a list of keyring filenames
764            if isinstance(secret_keyring, string_types):
765                secret_keyring = [secret_keyring]
766        self.secret_keyring = secret_keyring
767        self.verbose = verbose
768        self.use_agent = use_agent
769        if isinstance(options, str):  # pragma: no cover
770            options = [options]
771        self.options = options
772        self.on_data = None  # or a callable - will be called with data chunks
773        # Changed in 0.3.7 to use Latin-1 encoding rather than
774        # locale.getpreferredencoding falling back to sys.stdin.encoding
775        # falling back to utf-8, because gpg itself uses latin-1 as the default
776        # encoding.
777        self.encoding = 'latin-1'
778        if gnupghome and not os.path.isdir(self.gnupghome):
779            os.makedirs(self.gnupghome,0x1C0)
780        try:
781            p = self._open_subprocess(["--version"])
782        except OSError:
783            msg = 'Unable to run gpg - it may not be available.'
784            logger.exception(msg)
785            raise OSError(msg)
786        result = self.result_map['verify'](self) # any result will do for this
787        self._collect_output(p, result, stdin=p.stdin)
788        if p.returncode != 0:  # pragma: no cover
789            raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
790                                                             result.stderr))
791        m = VERSION_RE.match(result.data)
792        if not m:  # pragma: no cover
793            self.version = None
794        else:
795            dot = '.'.encode('ascii')
796            self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
797
798    def make_args(self, args, passphrase):
799        """
800        Make a list of command line elements for GPG. The value of ``args``
801        will be appended. The ``passphrase`` argument needs to be True if
802        a passphrase will be sent to GPG, else False.
803        """
804        cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty']
805        cmd.extend(['--debug', 'ipc'])
806        if passphrase and hasattr(self, 'version'):
807            if self.version >= (2, 1):
808                cmd[1:1] = ['--pinentry-mode', 'loopback']
809        cmd.extend(['--fixed-list-mode', '--batch', '--with-colons'])
810        if self.gnupghome:
811            cmd.extend(['--homedir',  no_quote(self.gnupghome)])
812        if self.keyring:
813            cmd.append('--no-default-keyring')
814            for fn in self.keyring:
815                cmd.extend(['--keyring', no_quote(fn)])
816        if self.secret_keyring:
817            for fn in self.secret_keyring:
818                cmd.extend(['--secret-keyring', no_quote(fn)])
819        if passphrase:
820            cmd.extend(['--passphrase-fd', '0'])
821        if self.use_agent:  # pragma: no cover
822            cmd.append('--use-agent')
823        if self.options:
824            cmd.extend(self.options)
825        cmd.extend(args)
826        return cmd
827
828    def _open_subprocess(self, args, passphrase=False):
829        # Internal method: open a pipe to a GPG subprocess and return
830        # the file objects for communicating with it.
831
832        # def debug_print(cmd):
833            # result = []
834            # for c in cmd:
835                # if ' ' not in c:
836                    # result.append(c)
837                # else:
838                    # if '"' not in c:
839                        # result.append('"%s"' % c)
840                    # elif "'" not in c:
841                        # result.append("'%s'" % c)
842                    # else:
843                        # result.append(c)  # give up
844            # return ' '.join(cmd)
845        from subprocess import list2cmdline as debug_print
846
847        cmd = self.make_args(args, passphrase)
848        if self.verbose:  # pragma: no cover
849            print(debug_print(cmd))
850        if not STARTUPINFO:
851            si = None
852        else:  # pragma: no cover
853            si = STARTUPINFO()
854            si.dwFlags = STARTF_USESHOWWINDOW
855            si.wShowWindow = SW_HIDE
856        result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE,
857                       startupinfo=si)
858        logger.debug("%s: %s", result.pid, debug_print(cmd))
859        return result
860
861    def _read_response(self, stream, result):
862        # Internal method: reads all the stderr output from GPG, taking notice
863        # only of lines that begin with the magic [GNUPG:] prefix.
864        #
865        # Calls methods on the response object for each valid token found,
866        # with the arg being the remainder of the status line.
867        lines = []
868        while True:
869            line = stream.readline()
870            if len(line) == 0:
871                break
872            lines.append(line)
873            line = line.rstrip()
874            if self.verbose:  # pragma: no cover
875                print(line)
876            logger.debug("%s", line)
877            if line[0:9] == '[GNUPG:] ':
878                # Chop off the prefix
879                line = line[9:]
880                L = line.split(None, 1)
881                keyword = L[0]
882                if len(L) > 1:
883                    value = L[1]
884                else:
885                    value = ""
886                result.handle_status(keyword, value)
887        result.stderr = ''.join(lines)
888
889    def _read_data(self, stream, result, on_data=None):
890        # Read the contents of the file from GPG's stdout
891        chunks = []
892        while True:
893            data = stream.read(1024)
894            if len(data) == 0:
895                break
896            logger.debug("chunk: %r" % data[:256])
897            chunks.append(data)
898            if on_data:
899                on_data(data)
900        if _py3k:
901            # Join using b'' or '', as appropriate
902            result.data = type(data)().join(chunks)
903        else:
904            result.data = ''.join(chunks)
905
906    def _collect_output(self, process, result, writer=None, stdin=None):
907        """
908        Drain the subprocesses output streams, writing the collected output
909        to the result. If a writer thread (writing to the subprocess) is given,
910        make sure it's joined before returning. If a stdin stream is given,
911        close it before returning.
912        """
913        stderr = codecs.getreader(self.encoding)(process.stderr)
914        rr = threading.Thread(target=self._read_response, args=(stderr, result))
915        rr.setDaemon(True)
916        logger.debug('stderr reader: %r', rr)
917        rr.start()
918
919        stdout = process.stdout
920        dr = threading.Thread(target=self._read_data, args=(stdout, result, self.on_data))
921        dr.setDaemon(True)
922        logger.debug('stdout reader: %r', dr)
923        dr.start()
924
925        dr.join()
926        rr.join()
927        if writer is not None:
928            writer.join()
929        process.wait()
930        if stdin is not None:
931            try:
932                stdin.close()
933            except IOError:  # pragma: no cover
934                pass
935        stderr.close()
936        stdout.close()
937
938    def _handle_io(self, args, fileobj, result, passphrase=None, binary=False):
939        "Handle a call to GPG - pass input data, collect output data"
940        # Handle a basic data call - pass data to GPG, handle the output
941        # including status information. Garbage In, Garbage Out :)
942        p = self._open_subprocess(args, passphrase is not None)
943        if not binary:  # pragma: no cover
944            stdin = codecs.getwriter(self.encoding)(p.stdin)
945        else:
946            stdin = p.stdin
947        if passphrase:
948            _write_passphrase(stdin, passphrase, self.encoding)
949        writer = _threaded_copy_data(fileobj, stdin)
950        self._collect_output(p, result, writer, stdin)
951        return result
952
953    #
954    # SIGNATURE METHODS
955    #
956    def sign(self, message, **kwargs):
957        """sign message"""
958        f = _make_binary_stream(message, self.encoding)
959        result = self.sign_file(f, **kwargs)
960        f.close()
961        return result
962
963    def set_output_without_confirmation(self, args, output):
964        "If writing to a file which exists, avoid a confirmation message."
965        if os.path.exists(output):
966            # We need to avoid an overwrite confirmation message
967            args.extend(['--yes'])
968        args.extend(['--output', no_quote(output)])
969
970    def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
971                  detach=False, binary=False, output=None, extra_args=None):
972        """sign file"""
973        logger.debug("sign_file: %s", file)
974        if binary:  # pragma: no cover
975            args = ['-s']
976        else:
977            args = ['-sa']
978        # You can't specify detach-sign and clearsign together: gpg ignores
979        # the detach-sign in that case.
980        if detach:
981            args.append("--detach-sign")
982        elif clearsign:
983            args.append("--clearsign")
984        if keyid:
985            args.extend(['--default-key', no_quote(keyid)])
986        if output:  # write the output to a file with the specified name
987            self.set_output_without_confirmation(args, output)
988
989        if extra_args:
990            args.extend(extra_args)
991        result = self.result_map['sign'](self)
992        #We could use _handle_io here except for the fact that if the
993        #passphrase is bad, gpg bails and you can't write the message.
994        p = self._open_subprocess(args, passphrase is not None)
995        try:
996            stdin = p.stdin
997            if passphrase:
998                _write_passphrase(stdin, passphrase, self.encoding)
999            writer = _threaded_copy_data(file, stdin)
1000        except IOError:  # pragma: no cover
1001            logging.exception("error writing message")
1002            writer = None
1003        self._collect_output(p, result, writer, stdin)
1004        return result
1005
1006    def verify(self, data, **kwargs):
1007        """Verify the signature on the contents of the string 'data'
1008
1009        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1010        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
1011        >>> input = gpg.gen_key_input(passphrase='foo')
1012        >>> key = gpg.gen_key(input)
1013        >>> assert key
1014        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
1015        >>> assert not sig
1016        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
1017        >>> assert sig
1018        >>> verify = gpg.verify(sig.data)
1019        >>> assert verify
1020
1021        """
1022        f = _make_binary_stream(data, self.encoding)
1023        result = self.verify_file(f, **kwargs)
1024        f.close()
1025        return result
1026
1027    def verify_file(self, file, data_filename=None, close_file=True, extra_args=None):
1028        "Verify the signature on the contents of the file-like object 'file'"
1029        logger.debug('verify_file: %r, %r', file, data_filename)
1030        result = self.result_map['verify'](self)
1031        args = ['--verify']
1032        if extra_args:
1033            args.extend(extra_args)
1034        if data_filename is None:
1035            self._handle_io(args, file, result, binary=True)
1036        else:
1037            logger.debug('Handling detached verification')
1038            import tempfile
1039            fd, fn = tempfile.mkstemp(prefix='pygpg')
1040            s = file.read()
1041            if close_file:
1042                file.close()
1043            logger.debug('Wrote to temp file: %r', s)
1044            os.write(fd, s)
1045            os.close(fd)
1046            args.append(no_quote(fn))
1047            args.append(no_quote(data_filename))
1048            try:
1049                p = self._open_subprocess(args)
1050                self._collect_output(p, result, stdin=p.stdin)
1051            finally:
1052                os.unlink(fn)
1053        return result
1054
1055    def verify_data(self, sig_filename, data, extra_args=None):
1056        "Verify the signature in sig_filename against data in memory"
1057        logger.debug('verify_data: %r, %r ...', sig_filename, data[:16])
1058        result = self.result_map['verify'](self)
1059        args = ['--verify']
1060        if extra_args:
1061            args.extend(extra_args)
1062        args.extend([no_quote(sig_filename), '-'])
1063        stream = _make_memory_stream(data)
1064        self._handle_io(args, stream, result, binary=True)
1065        return result
1066
1067    #
1068    # KEY MANAGEMENT
1069    #
1070
1071    def import_keys(self, key_data):
1072        """
1073        Import the key_data into our keyring.
1074        """
1075        result = self.result_map['import'](self)
1076        logger.debug('import_keys: %r', key_data[:256])
1077        data = _make_binary_stream(key_data, self.encoding)
1078        self._handle_io(['--import'], data, result, binary=True)
1079        logger.debug('import_keys result: %r', result.__dict__)
1080        data.close()
1081        return result
1082
1083    def recv_keys(self, keyserver, *keyids):
1084        """Import a key from a keyserver
1085
1086        >>> import shutil
1087        >>> shutil.rmtree("keys", ignore_errors=True)
1088        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1089        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
1090        >>> os.chmod('keys', 0x1C0)
1091        >>> result = gpg.recv_keys('pgp.mit.edu', '92905378')
1092        >>> assert result
1093
1094        """
1095        result = self.result_map['import'](self)
1096        logger.debug('recv_keys: %r', keyids)
1097        data = _make_binary_stream("", self.encoding)
1098        #data = ""
1099        args = ['--keyserver', no_quote(keyserver), '--recv-keys']
1100        args.extend([no_quote(k) for k in keyids])
1101        self._handle_io(args, data, result, binary=True)
1102        logger.debug('recv_keys result: %r', result.__dict__)
1103        data.close()
1104        return result
1105
1106    def send_keys(self, keyserver, *keyids):
1107        """Send a key to a keyserver.
1108
1109        Note: it's not practical to test this function without sending
1110        arbitrary data to live keyservers.
1111        """
1112        result = self.result_map['send'](self)
1113        logger.debug('send_keys: %r', keyids)
1114        data = _make_binary_stream('', self.encoding)
1115        #data = ""
1116        args = ['--keyserver', no_quote(keyserver), '--send-keys']
1117        args.extend([no_quote(k) for k in keyids])
1118        self._handle_io(args, data, result, binary=True)
1119        logger.debug('send_keys result: %r', result.__dict__)
1120        data.close()
1121        return result
1122
1123    def delete_keys(self, fingerprints, secret=False, passphrase=None):
1124        which='key'
1125        if secret:  # pragma: no cover
1126            if self.version >= (2, 1) and passphrase is None:
1127                raise ValueError('For GnuPG >= 2.1, deleting secret keys '
1128                                 'needs a passphrase to be provided')
1129            which='secret-key'
1130        if _is_sequence(fingerprints):  # pragma: no cover
1131            fingerprints = [no_quote(s) for s in fingerprints]
1132        else:
1133            fingerprints = [no_quote(fingerprints)]
1134        args = ['--delete-%s' % which]
1135        args.extend(fingerprints)
1136        result = self.result_map['delete'](self)
1137        if not secret or self.version < (2, 1):
1138            p = self._open_subprocess(args)
1139            self._collect_output(p, result, stdin=p.stdin)
1140        else:
1141            # Need to send in a passphrase.
1142            f = _make_binary_stream('', self.encoding)
1143            try:
1144                self._handle_io(args, f, result, passphrase=passphrase,
1145                                binary=True)
1146            finally:
1147                f.close()
1148        return result
1149
1150    def export_keys(self, keyids, secret=False, armor=True, minimal=False,
1151                    passphrase=None):
1152        """
1153        Export the indicated keys. A 'keyid' is anything gpg accepts.
1154
1155        Since GnuPG 2.1, you can't export secret keys without providing a
1156        passphrase.
1157        """
1158
1159        which=''
1160        if secret:
1161            which='-secret-key'
1162            if self.version >= (2, 1) and passphrase is None:
1163                raise ValueError('For GnuPG >= 2.1, exporting secret keys '
1164                                 'needs a passphrase to be provided')
1165        if _is_sequence(keyids):
1166            keyids = [no_quote(k) for k in keyids]
1167        else:
1168            keyids = [no_quote(keyids)]
1169        args = ['--export%s' % which]
1170        if armor:
1171            args.append('--armor')
1172        if minimal:  # pragma: no cover
1173            args.extend(['--export-options','export-minimal'])
1174        args.extend(keyids)
1175        # gpg --export produces no status-fd output; stdout will be
1176        # empty in case of failure
1177        #stdout, stderr = p.communicate()
1178        result = self.result_map['export'](self)
1179        if not secret or self.version < (2, 1):
1180            p = self._open_subprocess(args)
1181            self._collect_output(p, result, stdin=p.stdin)
1182        else:
1183            # Need to send in a passphrase.
1184            f = _make_binary_stream('', self.encoding)
1185            try:
1186                self._handle_io(args, f, result, passphrase=passphrase,
1187                                binary=True)
1188            finally:
1189                f.close()
1190        logger.debug('export_keys result: %r', result.data)
1191        # Issue #49: Return bytes if armor not specified, else text
1192        result = result.data
1193        if armor:
1194            result = result.decode(self.encoding, self.decode_errors)
1195        return result
1196
1197    def _get_list_output(self, p, kind):
1198        # Get the response information
1199        result = self.result_map[kind](self)
1200        self._collect_output(p, result, stdin=p.stdin)
1201        lines = result.data.decode(self.encoding,
1202                                   self.decode_errors).splitlines()
1203        valid_keywords = 'pub uid sec fpr sub ssb sig'.split()
1204        for line in lines:
1205            if self.verbose:  # pragma: no cover
1206                print(line)
1207            logger.debug("line: %r", line.rstrip())
1208            if not line:  # pragma: no cover
1209                break
1210            L = line.strip().split(':')
1211            if not L:  # pragma: no cover
1212                continue
1213            keyword = L[0]
1214            if keyword in valid_keywords:
1215                getattr(result, keyword)(L)
1216        return result
1217
1218    def list_keys(self, secret=False, keys=None, sigs=False):
1219        """ list the keys currently in the keyring
1220
1221        >>> import shutil
1222        >>> shutil.rmtree("keys", ignore_errors=True)
1223        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1224        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
1225        >>> input = gpg.gen_key_input(passphrase='foo')
1226        >>> result = gpg.gen_key(input)
1227        >>> fp1 = result.fingerprint
1228        >>> result = gpg.gen_key(input)
1229        >>> fp2 = result.fingerprint
1230        >>> pubkeys = gpg.list_keys()
1231        >>> assert fp1 in pubkeys.fingerprints
1232        >>> assert fp2 in pubkeys.fingerprints
1233
1234        """
1235
1236        if sigs:
1237            which = 'sigs'
1238        else:            which='keys'
1239        if secret:
1240            which='secret-keys'
1241        args = ['--list-%s' % which,
1242                '--fingerprint', '--fingerprint'] # get subkey FPs, too
1243        if keys:
1244            if isinstance(keys, string_types):
1245                keys = [keys]
1246            args.extend(keys)
1247        p = self._open_subprocess(args)
1248        return self._get_list_output(p, 'list')
1249
1250    def scan_keys(self, filename):
1251        """
1252        List details of an ascii armored or binary key file
1253        without first importing it to the local keyring.
1254
1255        The function achieves this on modern GnuPG by running:
1256
1257        $ gpg --dry-run --import-options import-show --import
1258
1259        On older versions, it does the *much* riskier:
1260
1261        $ gpg --with-fingerprint --with-colons filename
1262        """
1263        if self.version >= (2, 1):
1264            args = ['--dry-run', '--import-options', 'import-show', '--import']
1265        else:
1266            logger.warning('Trying to list packets, but if the file is not a '
1267                           'keyring, might accidentally decrypt')
1268            args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode']
1269        args.append(no_quote(filename))
1270        p = self._open_subprocess(args)
1271        return self._get_list_output(p, 'scan')
1272
1273    def search_keys(self, query, keyserver='pgp.mit.edu'):
1274        """ search keyserver by query (using --search-keys option)
1275
1276        >>> import shutil
1277        >>> shutil.rmtree('keys', ignore_errors=True)
1278        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1279        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
1280        >>> os.chmod('keys', 0x1C0)
1281        >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
1282        >>> assert result, 'Failed using default keyserver'
1283        >>> #keyserver = 'keyserver.ubuntu.com'
1284        >>> #result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
1285        >>> #assert result, 'Failed using keyserver.ubuntu.com'
1286
1287        """
1288        query = query.strip()
1289        if HEX_DIGITS_RE.match(query):
1290            query = '0x' + query
1291        args = ['--fingerprint',
1292                '--keyserver', no_quote(keyserver), '--search-keys',
1293                no_quote(query)]
1294        p = self._open_subprocess(args)
1295
1296        # Get the response information
1297        result = self.result_map['search'](self)
1298        self._collect_output(p, result, stdin=p.stdin)
1299        lines = result.data.decode(self.encoding,
1300                                   self.decode_errors).splitlines()
1301        valid_keywords = ['pub', 'uid']
1302        for line in lines:
1303            if self.verbose:  # pragma: no cover
1304                print(line)
1305            logger.debug('line: %r', line.rstrip())
1306            if not line:    # sometimes get blank lines on Windows
1307                continue
1308            L = line.strip().split(':')
1309            if not L:  # pragma: no cover
1310                continue
1311            keyword = L[0]
1312            if keyword in valid_keywords:
1313                getattr(result, keyword)(L)
1314        return result
1315
1316    def gen_key(self, input):
1317        """Generate a key; you might use gen_key_input() to create the
1318        control input.
1319
1320        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1321        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
1322        >>> input = gpg.gen_key_input(passphrase='foo')
1323        >>> result = gpg.gen_key(input)
1324        >>> assert result
1325        >>> result = gpg.gen_key('foo')
1326        >>> assert not result
1327
1328        """
1329        args = ["--gen-key"]
1330        result = self.result_map['generate'](self)
1331        f = _make_binary_stream(input, self.encoding)
1332        self._handle_io(args, f, result, binary=True)
1333        f.close()
1334        return result
1335
1336    def gen_key_input(self, **kwargs):
1337        """
1338        Generate --gen-key input per gpg doc/DETAILS
1339        """
1340        parms = {}
1341        for key, val in list(kwargs.items()):
1342            key = key.replace('_','-').title()
1343            if str(val).strip():    # skip empty strings
1344                parms[key] = val
1345        parms.setdefault('Key-Type','RSA')
1346        parms.setdefault('Key-Length',2048)
1347        parms.setdefault('Name-Real', "Autogenerated Key")
1348        logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or
1349                   'unspecified')
1350        hostname = socket.gethostname()
1351        parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'),
1352                                                  hostname))
1353        out = "Key-Type: %s\n" % parms.pop('Key-Type')
1354        for key, val in list(parms.items()):
1355            out += "%s: %s\n" % (key, val)
1356        out += "%commit\n"
1357        return out
1358
1359        # Key-Type: RSA
1360        # Key-Length: 1024
1361        # Name-Real: ISdlink Server on %s
1362        # Name-Comment: Created by %s
1363        # Name-Email: isdlink@%s
1364        # Expire-Date: 0
1365        # %commit
1366        #
1367        #
1368        # Key-Type: DSA
1369        # Key-Length: 1024
1370        # Subkey-Type: ELG-E
1371        # Subkey-Length: 1024
1372        # Name-Real: Joe Tester
1373        # Name-Comment: with stupid passphrase
1374        # Name-Email: joe@foo.bar
1375        # Expire-Date: 0
1376        # Passphrase: abc
1377        # %pubring foo.pub
1378        # %secring foo.sec
1379        # %commit
1380
1381    #
1382    # ENCRYPTION
1383    #
1384    def encrypt_file(self, file, recipients, sign=None,
1385            always_trust=False, passphrase=None,
1386            armor=True, output=None, symmetric=False, extra_args=None):
1387        "Encrypt the message read from the file-like object 'file'"
1388        args = ['--encrypt']
1389        if symmetric:
1390            # can't be False or None - could be True or a cipher algo value
1391            # such as AES256
1392            args = ['--symmetric']
1393            if symmetric is not True:
1394                args.extend(['--cipher-algo', no_quote(symmetric)])
1395            # else use the default, currently CAST5
1396        else:
1397            if not recipients:
1398                raise ValueError('No recipients specified with asymmetric '
1399                                 'encryption')
1400            if not _is_sequence(recipients):
1401                recipients = (recipients,)
1402            for recipient in recipients:
1403                args.extend(['--recipient', no_quote(recipient)])
1404        if armor:   # create ascii-armored output - False for binary output
1405            args.append('--armor')
1406        if output:  # write the output to a file with the specified name
1407            self.set_output_without_confirmation(args, output)
1408        if sign is True:  # pragma: no cover
1409            args.append('--sign')
1410        elif sign:  # pragma: no cover
1411            args.extend(['--sign', '--default-key', no_quote(sign)])
1412        if always_trust:  # pragma: no cover
1413            args.append('--always-trust')
1414        if extra_args:
1415            args.extend(extra_args)
1416        result = self.result_map['crypt'](self)
1417        self._handle_io(args, file, result, passphrase=passphrase, binary=True)
1418        logger.debug('encrypt result: %r', result.data)
1419        return result
1420
1421    def encrypt(self, data, recipients, **kwargs):
1422        """Encrypt the message contained in the string 'data'
1423
1424        >>> import shutil
1425        >>> if os.path.exists("keys"):
1426        ...     shutil.rmtree("keys", ignore_errors=True)
1427        >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
1428        >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
1429        >>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1')
1430        >>> result = gpg.gen_key(input)
1431        >>> fp1 = result.fingerprint
1432        >>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2')
1433        >>> result = gpg.gen_key(input)
1434        >>> fp2 = result.fingerprint
1435        >>> result = gpg.encrypt("hello",fp2)
1436        >>> message = str(result)
1437        >>> assert message != 'hello'
1438        >>> result = gpg.decrypt(message, passphrase='pp2')
1439        >>> assert result
1440        >>> str(result)
1441        'hello'
1442        >>> result = gpg.encrypt("hello again", fp1)
1443        >>> message = str(result)
1444        >>> result = gpg.decrypt(message, passphrase='bar')
1445        >>> result.status in ('decryption failed', 'bad passphrase')
1446        True
1447        >>> assert not result
1448        >>> result = gpg.decrypt(message, passphrase='pp1')
1449        >>> result.status == 'decryption ok'
1450        True
1451        >>> str(result)
1452        'hello again'
1453        >>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1')
1454        >>> result.status == 'encryption ok'
1455        True
1456        >>> message = str(result)
1457        >>> result = gpg.decrypt(message, passphrase='pp2')
1458        >>> result.status == 'decryption ok'
1459        True
1460        >>> assert result.fingerprint == fp1
1461
1462        """
1463        data = _make_binary_stream(data, self.encoding)
1464        result = self.encrypt_file(data, recipients, **kwargs)
1465        data.close()
1466        return result
1467
1468    def decrypt(self, message, **kwargs):
1469        data = _make_binary_stream(message, self.encoding)
1470        result = self.decrypt_file(data, **kwargs)
1471        data.close()
1472        return result
1473
1474    def decrypt_file(self, file, always_trust=False, passphrase=None,
1475                     output=None, extra_args=None):
1476        args = ["--decrypt"]
1477        if output:  # write the output to a file with the specified name
1478            self.set_output_without_confirmation(args, output)
1479        if always_trust:  # pragma: no cover
1480            args.append("--always-trust")
1481        if extra_args:
1482            args.extend(extra_args)
1483        result = self.result_map['crypt'](self)
1484        self._handle_io(args, file, result, passphrase, binary=True)
1485        logger.debug('decrypt result: %r', result.data)
1486        return result
1487