1# Copyright (c) 2008-2014 by Vinay Sajip. 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions are met: 6# 7# * Redistributions of source code must retain the above copyright notice, 8# this list of conditions and the following disclaimer. 9# * Redistributions in binary form must reproduce the above copyright notice, 10# this list of conditions and the following disclaimer in the documentation 11# and/or other materials provided with the distribution. 12# * The name(s) of the copyright holder(s) may not be used to endorse or 13# promote products derived from this software without specific prior 14# written permission. 15# 16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) "AS IS" AND ANY EXPRESS OR 17# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19# EVENT SHALL THE COPYRIGHT HOLDER(S) BE LIABLE FOR ANY DIRECT, INDIRECT, 20# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 24# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27""" A wrapper for the 'gpg' command:: 28 29Portions of this module are derived from A.M. Kuchling's well-designed 30GPG.py, using Richard Jones' updated version 1.3, which can be found 31in the pycrypto CVS repository on Sourceforge: 32 33http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py 34 35This module is *not* forward-compatible with amk's; some of the 36old interface has changed. For instance, since I've added decrypt 37functionality, I elected to initialize with a 'gnupghome' argument 38instead of 'keyring', so that gpg can find both the public and secret 39keyrings. I've also altered some of the returned objects in order for 40the caller to not have to know as much about the internals of the 41result classes. 42 43While the rest of ISconf is released under the GPL, I am releasing 44this single file under the same terms that A.M. Kuchling used for 45pycrypto. 46 47Steve Traugott, stevegt@terraluna.org 48Thu Jun 23 21:27:20 PDT 2005 49 50This version of the module has been modified from Steve Traugott's version 51(see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by 52Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() 53and so does not work on Windows). Renamed to gnupg.py to avoid confusion with 54the previous versions. 55 56Modifications Copyright (C) 2008-2017 Vinay Sajip. All rights reserved. 57 58A unittest harness (test_gnupg.py) has also been added. 59""" 60 61__version__ = "0.4.1" 62__author__ = "Vinay Sajip" 63__date__ = "$07-Jul-2017 15:09:20$" 64 65try: 66 from io import StringIO 67except ImportError: # pragma: no cover 68 from cStringIO import StringIO 69 70import codecs 71import locale 72import logging 73import os 74import re 75import socket 76from subprocess import Popen 77from subprocess import PIPE 78import sys 79import threading 80 81STARTUPINFO = None 82if os.name == 'nt': # pragma: no cover 83 try: 84 from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE 85 except ImportError: 86 STARTUPINFO = None 87 88try: 89 import logging.NullHandler as NullHandler 90except ImportError: 91 class NullHandler(logging.Handler): 92 def handle(self, record): 93 pass 94try: 95 unicode 96 _py3k = False 97 string_types = basestring 98 text_type = unicode 99except NameError: 100 _py3k = True 101 string_types = str 102 text_type = str 103 104logger = logging.getLogger(__name__) 105if not logger.handlers: 106 logger.addHandler(NullHandler()) 107 108# We use the test below because it works for Jython as well as CPython 109if os.path.__name__ == 'ntpath': # pragma: no cover 110 # On Windows, we don't need shell quoting, other than worrying about 111 # paths with spaces in them. 112 def shell_quote(s): 113 return '"%s"' % s 114else: 115 # Section copied from sarge 116 117 # This regex determines which shell input needs quoting 118 # because it may be unsafe 119 UNSAFE = re.compile(r'[^\w%+,./:=@-]') 120 121 def shell_quote(s): 122 """ 123 Quote text so that it is safe for Posix command shells. 124 125 For example, "*.py" would be converted to "'*.py'". If the text is 126 considered safe it is returned unquoted. 127 128 :param s: The value to quote 129 :type s: str (or unicode on 2.x) 130 :return: A safe version of the input, from the point of view of Posix 131 command shells 132 :rtype: The passed-in type 133 """ 134 if not isinstance(s, string_types): # pragma: no cover 135 raise TypeError('Expected string type, got %s' % type(s)) 136 if not s: 137 result = "''" 138 elif not UNSAFE.search(s): 139 result = s 140 else: 141 result = "'%s'" % s.replace("'", r"'\''") 142 return result 143 144 # end of sarge code 145 146# Now that we use shell=False, we shouldn't need to quote arguments. 147# Use no_quote instead of shell_quote to remind us of where quoting 148# was needed. However, note that we still need, on 2.x, to encode any 149# Unicode argument with the file system encoding - see Issue #41 and 150# Python issue #1759845 ("subprocess.call fails with unicode strings in 151# command line"). 152 153# Allows the encoding used to be overridden in special cases by setting 154# this module attribute appropriately. 155fsencoding = sys.getfilesystemencoding() 156 157def no_quote(s): 158 if not _py3k and isinstance(s, text_type): 159 s = s.encode(fsencoding) 160 return s 161 162def _copy_data(instream, outstream): 163 # Copy one stream to another 164 sent = 0 165 if hasattr(sys.stdin, 'encoding'): 166 enc = sys.stdin.encoding 167 else: # pragma: no cover 168 enc = 'ascii' 169 while True: 170 # See issue #39: read can fail when e.g. a text stream is provided 171 # for what is actually a binary file 172 try: 173 data = instream.read(1024) 174 except UnicodeError: 175 logger.warning('Exception occurred while reading', exc_info=1) 176 break 177 if not data: 178 break 179 sent += len(data) 180 # logger.debug("sending chunk (%d): %r", sent, data[:256]) 181 try: 182 outstream.write(data) 183 except UnicodeError: # pragma: no cover 184 outstream.write(data.encode(enc)) 185 except: 186 # Can sometimes get 'broken pipe' errors even when the data has all 187 # been sent 188 logger.exception('Error sending data') 189 break 190 try: 191 outstream.close() 192 except IOError: # pragma: no cover 193 logger.warning('Exception occurred while closing: ignored', exc_info=1) 194 logger.debug("closed output, %d bytes sent", sent) 195 196def _threaded_copy_data(instream, outstream): 197 wr = threading.Thread(target=_copy_data, args=(instream, outstream)) 198 wr.setDaemon(True) 199 logger.debug('data copier: %r, %r, %r', wr, instream, outstream) 200 wr.start() 201 return wr 202 203def _write_passphrase(stream, passphrase, encoding): 204 passphrase = '%s\n' % passphrase 205 passphrase = passphrase.encode(encoding) 206 stream.write(passphrase) 207 logger.debug('Wrote passphrase') 208 209def _is_sequence(instance): 210 return isinstance(instance, (list, tuple, set, frozenset)) 211 212def _make_memory_stream(s): 213 try: 214 from io import BytesIO 215 rv = BytesIO(s) 216 except ImportError: # pragma: no cover 217 rv = StringIO(s) 218 return rv 219 220def _make_binary_stream(s, encoding): 221 if _py3k: 222 if isinstance(s, str): 223 s = s.encode(encoding) 224 else: 225 if type(s) is not str: 226 s = s.encode(encoding) 227 return _make_memory_stream(s) 228 229class Verify(object): 230 "Handle status messages for --verify" 231 232 TRUST_UNDEFINED = 0 233 TRUST_NEVER = 1 234 TRUST_MARGINAL = 2 235 TRUST_FULLY = 3 236 TRUST_ULTIMATE = 4 237 238 TRUST_LEVELS = { 239 "TRUST_UNDEFINED" : TRUST_UNDEFINED, 240 "TRUST_NEVER" : TRUST_NEVER, 241 "TRUST_MARGINAL" : TRUST_MARGINAL, 242 "TRUST_FULLY" : TRUST_FULLY, 243 "TRUST_ULTIMATE" : TRUST_ULTIMATE, 244 } 245 246 def __init__(self, gpg): 247 self.gpg = gpg 248 self.valid = False 249 self.fingerprint = self.creation_date = self.timestamp = None 250 self.signature_id = self.key_id = None 251 self.username = None 252 self.key_status = None 253 self.status = None 254 self.pubkey_fingerprint = None 255 self.expire_timestamp = None 256 self.sig_timestamp = None 257 self.trust_text = None 258 self.trust_level = None 259 260 def __nonzero__(self): 261 return self.valid 262 263 __bool__ = __nonzero__ 264 265 def handle_status(self, key, value): 266 if key in self.TRUST_LEVELS: 267 self.trust_text = key 268 self.trust_level = self.TRUST_LEVELS[key] 269 elif key in ("WARNING", "ERROR"): 270 logger.warning('potential problem: %s: %s', key, value) 271 elif key == "BADSIG": # pragma: no cover 272 self.valid = False 273 self.status = 'signature bad' 274 self.key_id, self.username = value.split(None, 1) 275 elif key == "ERRSIG": # pragma: no cover 276 self.valid = False 277 (self.key_id, 278 algo, hash_algo, 279 cls, 280 self.timestamp) = value.split()[:5] 281 self.status = 'signature error' 282 elif key == "EXPSIG": # pragma: no cover 283 self.valid = False 284 self.status = 'signature expired' 285 self.key_id, self.username = value.split(None, 1) 286 elif key == "GOODSIG": 287 self.valid = True 288 self.status = 'signature good' 289 self.key_id, self.username = value.split(None, 1) 290 elif key == "VALIDSIG": 291 (self.fingerprint, 292 self.creation_date, 293 self.sig_timestamp, 294 self.expire_timestamp) = value.split()[:4] 295 # may be different if signature is made with a subkey 296 self.pubkey_fingerprint = value.split()[-1] 297 self.status = 'signature valid' 298 elif key == "SIG_ID": 299 (self.signature_id, 300 self.creation_date, self.timestamp) = value.split() 301 elif key == "DECRYPTION_FAILED": # pragma: no cover 302 self.valid = False 303 self.key_id = value 304 self.status = 'decryption failed' 305 elif key == "NO_PUBKEY": # pragma: no cover 306 self.valid = False 307 self.key_id = value 308 self.status = 'no public key' 309 elif key in ("EXPKEYSIG", "REVKEYSIG"): # pragma: no cover 310 # signed with expired or revoked key 311 self.valid = False 312 self.key_id = value.split()[0] 313 if key == "EXPKEYSIG": 314 self.key_status = 'signing key has expired' 315 else: 316 self.key_status = 'signing key was revoked' 317 self.status = self.key_status 318 elif key in ("UNEXPECTED", "FAILURE"): # pragma: no cover 319 self.valid = False 320 self.key_id = value 321 if key == "UNEXPECTED": 322 self.status = 'unexpected data' 323 else: 324 # N.B. there might be other reasons 325 if not self.status: 326 self.status = 'incorrect passphrase' 327 elif key in ("DECRYPTION_INFO", "PLAINTEXT", "PLAINTEXT_LENGTH", 328 "NO_SECKEY", "BEGIN_SIGNING"): 329 pass 330 else: # pragma: no cover 331 logger.debug('message ignored: %s, %s', key, value) 332 333class ImportResult(object): 334 "Handle status messages for --import" 335 336 counts = '''count no_user_id imported imported_rsa unchanged 337 n_uids n_subk n_sigs n_revoc sec_read sec_imported 338 sec_dups not_imported'''.split() 339 def __init__(self, gpg): 340 self.gpg = gpg 341 self.imported = [] 342 self.results = [] 343 self.fingerprints = [] 344 for result in self.counts: 345 setattr(self, result, None) 346 347 def __nonzero__(self): 348 if self.not_imported: return False 349 if not self.fingerprints: return False 350 return True 351 352 __bool__ = __nonzero__ 353 354 ok_reason = { 355 '0': 'Not actually changed', 356 '1': 'Entirely new key', 357 '2': 'New user IDs', 358 '4': 'New signatures', 359 '8': 'New subkeys', 360 '16': 'Contains private key', 361 } 362 363 problem_reason = { 364 '0': 'No specific reason given', 365 '1': 'Invalid Certificate', 366 '2': 'Issuer Certificate missing', 367 '3': 'Certificate Chain too long', 368 '4': 'Error storing certificate', 369 } 370 371 def handle_status(self, key, value): 372 if key in ("WARNING", "ERROR"): 373 logger.warning('potential problem: %s: %s', key, value) 374 elif key in ("IMPORTED", "KEY_CONSIDERED"): 375 # this duplicates info we already see in import_ok & import_problem 376 pass 377 elif key == "NODATA": # pragma: no cover 378 self.results.append({'fingerprint': None, 379 'problem': '0', 'text': 'No valid data found'}) 380 elif key == "IMPORT_OK": 381 reason, fingerprint = value.split() 382 reasons = [] 383 for code, text in list(self.ok_reason.items()): 384 if int(reason) | int(code) == int(reason): 385 reasons.append(text) 386 reasontext = '\n'.join(reasons) + "\n" 387 self.results.append({'fingerprint': fingerprint, 388 'ok': reason, 'text': reasontext}) 389 self.fingerprints.append(fingerprint) 390 elif key == "IMPORT_PROBLEM": # pragma: no cover 391 try: 392 reason, fingerprint = value.split() 393 except: 394 reason = value 395 fingerprint = '<unknown>' 396 self.results.append({'fingerprint': fingerprint, 397 'problem': reason, 'text': self.problem_reason[reason]}) 398 elif key == "IMPORT_RES": 399 import_res = value.split() 400 for i, count in enumerate(self.counts): 401 setattr(self, count, int(import_res[i])) 402 elif key == "KEYEXPIRED": # pragma: no cover 403 self.results.append({'fingerprint': None, 404 'problem': '0', 'text': 'Key expired'}) 405 elif key == "SIGEXPIRED": # pragma: no cover 406 self.results.append({'fingerprint': None, 407 'problem': '0', 'text': 'Signature expired'}) 408 elif key == "FAILURE": # pragma: no cover 409 self.results.append({'fingerprint': None, 410 'problem': '0', 'text': 'Other failure'}) 411 else: # pragma: no cover 412 logger.debug('message ignored: %s, %s', key, value) 413 414 def summary(self): 415 l = [] 416 l.append('%d imported' % self.imported) 417 if self.not_imported: # pragma: no cover 418 l.append('%d not imported' % self.not_imported) 419 return ', '.join(l) 420 421ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I) 422BASIC_ESCAPES = { 423 r'\n': '\n', 424 r'\r': '\r', 425 r'\f': '\f', 426 r'\v': '\v', 427 r'\b': '\b', 428 r'\0': '\0', 429} 430 431class SendResult(object): 432 def __init__(self, gpg): 433 self.gpg = gpg 434 435 def handle_status(self, key, value): 436 logger.debug('SendResult: %s: %s', key, value) 437 438class SearchKeys(list): 439 ''' Handle status messages for --search-keys. 440 441 Handle pub and uid (relating the latter to the former). 442 443 Don't care about the rest 444 ''' 445 446 UID_INDEX = 1 447 FIELDS = 'type keyid algo length date expires'.split() 448 449 def __init__(self, gpg): 450 self.gpg = gpg 451 self.curkey = None 452 self.fingerprints = [] 453 self.uids = [] 454 455 def get_fields(self, args): 456 result = {} 457 for i, var in enumerate(self.FIELDS): 458 if i < len(args): 459 result[var] = args[i] 460 else: 461 result[var] = 'unavailable' 462 result['uids'] = [] 463 result['sigs'] = [] 464 return result 465 466 def pub(self, args): 467 self.curkey = curkey = self.get_fields(args) 468 self.append(curkey) 469 470 def uid(self, args): 471 uid = args[self.UID_INDEX] 472 uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) 473 for k, v in BASIC_ESCAPES.items(): 474 uid = uid.replace(k, v) 475 self.curkey['uids'].append(uid) 476 self.uids.append(uid) 477 478 def handle_status(self, key, value): # pragma: no cover 479 pass 480 481class ListKeys(SearchKeys): 482 ''' Handle status messages for --list-keys, --list-sigs. 483 484 Handle pub and uid (relating the latter to the former). 485 486 Don't care about (info from src/DETAILS): 487 488 crt = X.509 certificate 489 crs = X.509 certificate and private key available 490 uat = user attribute (same as user id except for field 10). 491 sig = signature 492 rev = revocation signature 493 pkd = public key data (special field format, see below) 494 grp = reserved for gpgsm 495 rvk = revocation key 496 ''' 497 498 UID_INDEX = 9 499 FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid sig'.split() 500 501 def __init__(self, gpg): 502 super(ListKeys, self).__init__(gpg) 503 self.in_subkey = False 504 self.key_map = {} 505 506 def key(self, args): 507 self.curkey = curkey = self.get_fields(args) 508 if curkey['uid']: 509 curkey['uids'].append(curkey['uid']) 510 del curkey['uid'] 511 curkey['subkeys'] = [] 512 self.append(curkey) 513 self.in_subkey = False 514 515 pub = sec = key 516 517 def fpr(self, args): 518 fp = args[9] 519 if fp in self.key_map: # pragma: no cover 520 raise ValueError('Unexpected fingerprint collision: %s' % fp) 521 if not self.in_subkey: 522 self.curkey['fingerprint'] = fp 523 self.fingerprints.append(fp) 524 self.key_map[fp] = self.curkey 525 else: 526 self.curkey['subkeys'][-1].append(fp) 527 self.key_map[fp] = self.curkey 528 529 def sub(self, args): 530 subkey = [args[4], args[11]] # keyid, type 531 self.curkey['subkeys'].append(subkey) 532 self.in_subkey = True 533 534 def ssb(self, args): 535 subkey = [args[4], None] # keyid, type 536 self.curkey['subkeys'].append(subkey) 537 self.in_subkey = True 538 539 def sig(self, args): 540 # keyid, uid, sigclass 541 self.curkey['sigs'].append((args[4], args[9], args[10])) 542 543class ScanKeys(ListKeys): 544 ''' Handle status messages for --with-fingerprint.''' 545 546 def sub(self, args): 547 # --with-fingerprint --with-colons somehow outputs fewer colons, 548 # use the last value args[-1] instead of args[11] 549 subkey = [args[4], args[-1]] 550 self.curkey['subkeys'].append(subkey) 551 self.in_subkey = True 552 553class TextHandler(object): 554 def _as_text(self): 555 return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) 556 557 if _py3k: 558 __str__ = _as_text 559 else: 560 __unicode__ = _as_text 561 562 def __str__(self): 563 return self.data 564 565 566class Crypt(Verify, TextHandler): 567 "Handle status messages for --encrypt and --decrypt" 568 def __init__(self, gpg): 569 Verify.__init__(self, gpg) 570 self.data = '' 571 self.ok = False 572 self.status = '' 573 574 def __nonzero__(self): 575 if self.ok: return True 576 return False 577 578 __bool__ = __nonzero__ 579 580 def handle_status(self, key, value): 581 if key in ("WARNING", "ERROR"): 582 logger.warning('potential problem: %s: %s', key, value) 583 elif key == "NODATA": 584 self.status = "no data was provided" 585 elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", 586 "MISSING_PASSPHRASE", "DECRYPTION_FAILED", 587 "KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"): 588 self.status = key.replace("_", " ").lower() 589 elif key == "NEED_PASSPHRASE_SYM": 590 self.status = 'need symmetric passphrase' 591 elif key == "BEGIN_DECRYPTION": 592 self.status = 'decryption incomplete' 593 elif key == "BEGIN_ENCRYPTION": 594 self.status = 'encryption incomplete' 595 elif key == "DECRYPTION_OKAY": 596 self.status = 'decryption ok' 597 self.ok = True 598 elif key == "END_ENCRYPTION": 599 self.status = 'encryption ok' 600 self.ok = True 601 elif key == "INV_RECP": # pragma: no cover 602 self.status = 'invalid recipient' 603 elif key == "KEYEXPIRED": # pragma: no cover 604 self.status = 'key expired' 605 elif key == "SIG_CREATED": # pragma: no cover 606 self.status = 'sig created' 607 elif key == "SIGEXPIRED": # pragma: no cover 608 self.status = 'sig expired' 609 elif key in ("ENC_TO", "USERID_HINT", "GOODMDC", 610 "END_DECRYPTION", "CARDCTRL", "BADMDC", 611 "SC_OP_FAILURE", "SC_OP_SUCCESS", 612 "PINENTRY_LAUNCHED", "KEY_CONSIDERED"): 613 pass 614 else: 615 Verify.handle_status(self, key, value) 616 617class GenKey(object): 618 "Handle status messages for --gen-key" 619 def __init__(self, gpg): 620 self.gpg = gpg 621 self.type = None 622 self.fingerprint = None 623 624 def __nonzero__(self): 625 if self.fingerprint: return True 626 return False 627 628 __bool__ = __nonzero__ 629 630 def __str__(self): 631 return self.fingerprint or '' 632 633 def handle_status(self, key, value): 634 if key in ("WARNING", "ERROR"): # pragma: no cover 635 logger.warning('potential problem: %s: %s', key, value) 636 elif key == "KEY_CREATED": 637 (self.type,self.fingerprint) = value.split() 638 elif key in ("PROGRESS", "GOOD_PASSPHRASE", "KEY_NOT_CREATED"): 639 pass 640 else: # pragma: no cover 641 logger.debug('message ignored: %s, %s', key, value) 642 643class ExportResult(GenKey): 644 """Handle status messages for --export[-secret-key]. 645 646 For now, just use an existing class to base it on - if needed, we 647 can override handle_status for more specific message handling. 648 """ 649 def handle_status(self, key, value): 650 if key in ("EXPORTED", "EXPORT_RES"): 651 pass 652 else: 653 super(ExportResult, self).handle_status(key, value) 654 655class DeleteResult(object): 656 "Handle status messages for --delete-key and --delete-secret-key" 657 def __init__(self, gpg): 658 self.gpg = gpg 659 self.status = 'ok' 660 661 def __str__(self): 662 return self.status 663 664 problem_reason = { 665 '1': 'No such key', 666 '2': 'Must delete secret key first', 667 '3': 'Ambiguous specification', 668 } 669 670 def handle_status(self, key, value): 671 if key == "DELETE_PROBLEM": # pragma: no cover 672 self.status = self.problem_reason.get(value, 673 "Unknown error: %r" % value) 674 else: # pragma: no cover 675 logger.debug('message ignored: %s, %s', key, value) 676 677 def __nonzero__(self): 678 return self.status == 'ok' 679 680 __bool__ = __nonzero__ 681 682 683class Sign(TextHandler): 684 "Handle status messages for --sign" 685 def __init__(self, gpg): 686 self.gpg = gpg 687 self.type = None 688 self.hash_algo = None 689 self.fingerprint = None 690 self.status = None 691 692 def __nonzero__(self): 693 return self.fingerprint is not None 694 695 __bool__ = __nonzero__ 696 697 def handle_status(self, key, value): 698 if key in ("WARNING", "ERROR", "FAILURE"): # pragma: no cover 699 logger.warning('potential problem: %s: %s', key, value) 700 elif key in ("KEYEXPIRED", "SIGEXPIRED"): # pragma: no cover 701 self.status = 'key expired' 702 elif key == "KEYREVOKED": # pragma: no cover 703 self.status = 'key revoked' 704 elif key == "SIG_CREATED": 705 (self.type, 706 algo, self.hash_algo, cls, 707 self.timestamp, self.fingerprint 708 ) = value.split() 709 self.status = 'signature created' 710 elif key in ("USERID_HINT", "NEED_PASSPHRASE", "GOOD_PASSPHRASE", 711 "BAD_PASSPHRASE", "BEGIN_SIGNING"): 712 pass 713 else: # pragma: no cover 714 logger.debug('message ignored: %s, %s', key, value) 715 716VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I) 717HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I) 718 719class GPG(object): 720 721 decode_errors = 'strict' 722 723 result_map = { 724 'crypt': Crypt, 725 'delete': DeleteResult, 726 'generate': GenKey, 727 'import': ImportResult, 728 'send': SendResult, 729 'list': ListKeys, 730 'scan': ScanKeys, 731 'search': SearchKeys, 732 'sign': Sign, 733 'verify': Verify, 734 'export': ExportResult, 735 } 736 737 "Encapsulate access to the gpg executable" 738 def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, 739 use_agent=False, keyring=None, options=None, 740 secret_keyring=None): 741 """Initialize a GPG process wrapper. Options are: 742 743 gpgbinary -- full pathname for GPG binary. 744 745 gnupghome -- full pathname to where we can find the public and 746 private keyrings. Default is whatever gpg defaults to. 747 keyring -- name of alternative keyring file to use, or list of such 748 keyrings. If specified, the default keyring is not used. 749 options =-- a list of additional options to pass to the GPG binary. 750 secret_keyring -- name of alternative secret keyring file to use, or 751 list of such keyrings. 752 """ 753 self.gpgbinary = gpgbinary 754 self.gnupghome = gnupghome 755 if keyring: 756 # Allow passing a string or another iterable. Make it uniformly 757 # a list of keyring filenames 758 if isinstance(keyring, string_types): 759 keyring = [keyring] 760 self.keyring = keyring 761 if secret_keyring: 762 # Allow passing a string or another iterable. Make it uniformly 763 # a list of keyring filenames 764 if isinstance(secret_keyring, string_types): 765 secret_keyring = [secret_keyring] 766 self.secret_keyring = secret_keyring 767 self.verbose = verbose 768 self.use_agent = use_agent 769 if isinstance(options, str): # pragma: no cover 770 options = [options] 771 self.options = options 772 self.on_data = None # or a callable - will be called with data chunks 773 # Changed in 0.3.7 to use Latin-1 encoding rather than 774 # locale.getpreferredencoding falling back to sys.stdin.encoding 775 # falling back to utf-8, because gpg itself uses latin-1 as the default 776 # encoding. 777 self.encoding = 'latin-1' 778 if gnupghome and not os.path.isdir(self.gnupghome): 779 os.makedirs(self.gnupghome,0x1C0) 780 try: 781 p = self._open_subprocess(["--version"]) 782 except OSError: 783 msg = 'Unable to run gpg - it may not be available.' 784 logger.exception(msg) 785 raise OSError(msg) 786 result = self.result_map['verify'](self) # any result will do for this 787 self._collect_output(p, result, stdin=p.stdin) 788 if p.returncode != 0: # pragma: no cover 789 raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, 790 result.stderr)) 791 m = VERSION_RE.match(result.data) 792 if not m: # pragma: no cover 793 self.version = None 794 else: 795 dot = '.'.encode('ascii') 796 self.version = tuple([int(s) for s in m.groups()[0].split(dot)]) 797 798 def make_args(self, args, passphrase): 799 """ 800 Make a list of command line elements for GPG. The value of ``args`` 801 will be appended. The ``passphrase`` argument needs to be True if 802 a passphrase will be sent to GPG, else False. 803 """ 804 cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty'] 805 cmd.extend(['--debug', 'ipc']) 806 if passphrase and hasattr(self, 'version'): 807 if self.version >= (2, 1): 808 cmd[1:1] = ['--pinentry-mode', 'loopback'] 809 cmd.extend(['--fixed-list-mode', '--batch', '--with-colons']) 810 if self.gnupghome: 811 cmd.extend(['--homedir', no_quote(self.gnupghome)]) 812 if self.keyring: 813 cmd.append('--no-default-keyring') 814 for fn in self.keyring: 815 cmd.extend(['--keyring', no_quote(fn)]) 816 if self.secret_keyring: 817 for fn in self.secret_keyring: 818 cmd.extend(['--secret-keyring', no_quote(fn)]) 819 if passphrase: 820 cmd.extend(['--passphrase-fd', '0']) 821 if self.use_agent: # pragma: no cover 822 cmd.append('--use-agent') 823 if self.options: 824 cmd.extend(self.options) 825 cmd.extend(args) 826 return cmd 827 828 def _open_subprocess(self, args, passphrase=False): 829 # Internal method: open a pipe to a GPG subprocess and return 830 # the file objects for communicating with it. 831 832 # def debug_print(cmd): 833 # result = [] 834 # for c in cmd: 835 # if ' ' not in c: 836 # result.append(c) 837 # else: 838 # if '"' not in c: 839 # result.append('"%s"' % c) 840 # elif "'" not in c: 841 # result.append("'%s'" % c) 842 # else: 843 # result.append(c) # give up 844 # return ' '.join(cmd) 845 from subprocess import list2cmdline as debug_print 846 847 cmd = self.make_args(args, passphrase) 848 if self.verbose: # pragma: no cover 849 print(debug_print(cmd)) 850 if not STARTUPINFO: 851 si = None 852 else: # pragma: no cover 853 si = STARTUPINFO() 854 si.dwFlags = STARTF_USESHOWWINDOW 855 si.wShowWindow = SW_HIDE 856 result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, 857 startupinfo=si) 858 logger.debug("%s: %s", result.pid, debug_print(cmd)) 859 return result 860 861 def _read_response(self, stream, result): 862 # Internal method: reads all the stderr output from GPG, taking notice 863 # only of lines that begin with the magic [GNUPG:] prefix. 864 # 865 # Calls methods on the response object for each valid token found, 866 # with the arg being the remainder of the status line. 867 lines = [] 868 while True: 869 line = stream.readline() 870 if len(line) == 0: 871 break 872 lines.append(line) 873 line = line.rstrip() 874 if self.verbose: # pragma: no cover 875 print(line) 876 logger.debug("%s", line) 877 if line[0:9] == '[GNUPG:] ': 878 # Chop off the prefix 879 line = line[9:] 880 L = line.split(None, 1) 881 keyword = L[0] 882 if len(L) > 1: 883 value = L[1] 884 else: 885 value = "" 886 result.handle_status(keyword, value) 887 result.stderr = ''.join(lines) 888 889 def _read_data(self, stream, result, on_data=None): 890 # Read the contents of the file from GPG's stdout 891 chunks = [] 892 while True: 893 data = stream.read(1024) 894 if len(data) == 0: 895 break 896 logger.debug("chunk: %r" % data[:256]) 897 chunks.append(data) 898 if on_data: 899 on_data(data) 900 if _py3k: 901 # Join using b'' or '', as appropriate 902 result.data = type(data)().join(chunks) 903 else: 904 result.data = ''.join(chunks) 905 906 def _collect_output(self, process, result, writer=None, stdin=None): 907 """ 908 Drain the subprocesses output streams, writing the collected output 909 to the result. If a writer thread (writing to the subprocess) is given, 910 make sure it's joined before returning. If a stdin stream is given, 911 close it before returning. 912 """ 913 stderr = codecs.getreader(self.encoding)(process.stderr) 914 rr = threading.Thread(target=self._read_response, args=(stderr, result)) 915 rr.setDaemon(True) 916 logger.debug('stderr reader: %r', rr) 917 rr.start() 918 919 stdout = process.stdout 920 dr = threading.Thread(target=self._read_data, args=(stdout, result, self.on_data)) 921 dr.setDaemon(True) 922 logger.debug('stdout reader: %r', dr) 923 dr.start() 924 925 dr.join() 926 rr.join() 927 if writer is not None: 928 writer.join() 929 process.wait() 930 if stdin is not None: 931 try: 932 stdin.close() 933 except IOError: # pragma: no cover 934 pass 935 stderr.close() 936 stdout.close() 937 938 def _handle_io(self, args, fileobj, result, passphrase=None, binary=False): 939 "Handle a call to GPG - pass input data, collect output data" 940 # Handle a basic data call - pass data to GPG, handle the output 941 # including status information. Garbage In, Garbage Out :) 942 p = self._open_subprocess(args, passphrase is not None) 943 if not binary: # pragma: no cover 944 stdin = codecs.getwriter(self.encoding)(p.stdin) 945 else: 946 stdin = p.stdin 947 if passphrase: 948 _write_passphrase(stdin, passphrase, self.encoding) 949 writer = _threaded_copy_data(fileobj, stdin) 950 self._collect_output(p, result, writer, stdin) 951 return result 952 953 # 954 # SIGNATURE METHODS 955 # 956 def sign(self, message, **kwargs): 957 """sign message""" 958 f = _make_binary_stream(message, self.encoding) 959 result = self.sign_file(f, **kwargs) 960 f.close() 961 return result 962 963 def set_output_without_confirmation(self, args, output): 964 "If writing to a file which exists, avoid a confirmation message." 965 if os.path.exists(output): 966 # We need to avoid an overwrite confirmation message 967 args.extend(['--yes']) 968 args.extend(['--output', no_quote(output)]) 969 970 def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, 971 detach=False, binary=False, output=None, extra_args=None): 972 """sign file""" 973 logger.debug("sign_file: %s", file) 974 if binary: # pragma: no cover 975 args = ['-s'] 976 else: 977 args = ['-sa'] 978 # You can't specify detach-sign and clearsign together: gpg ignores 979 # the detach-sign in that case. 980 if detach: 981 args.append("--detach-sign") 982 elif clearsign: 983 args.append("--clearsign") 984 if keyid: 985 args.extend(['--default-key', no_quote(keyid)]) 986 if output: # write the output to a file with the specified name 987 self.set_output_without_confirmation(args, output) 988 989 if extra_args: 990 args.extend(extra_args) 991 result = self.result_map['sign'](self) 992 #We could use _handle_io here except for the fact that if the 993 #passphrase is bad, gpg bails and you can't write the message. 994 p = self._open_subprocess(args, passphrase is not None) 995 try: 996 stdin = p.stdin 997 if passphrase: 998 _write_passphrase(stdin, passphrase, self.encoding) 999 writer = _threaded_copy_data(file, stdin) 1000 except IOError: # pragma: no cover 1001 logging.exception("error writing message") 1002 writer = None 1003 self._collect_output(p, result, writer, stdin) 1004 return result 1005 1006 def verify(self, data, **kwargs): 1007 """Verify the signature on the contents of the string 'data' 1008 1009 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1010 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") 1011 >>> input = gpg.gen_key_input(passphrase='foo') 1012 >>> key = gpg.gen_key(input) 1013 >>> assert key 1014 >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') 1015 >>> assert not sig 1016 >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') 1017 >>> assert sig 1018 >>> verify = gpg.verify(sig.data) 1019 >>> assert verify 1020 1021 """ 1022 f = _make_binary_stream(data, self.encoding) 1023 result = self.verify_file(f, **kwargs) 1024 f.close() 1025 return result 1026 1027 def verify_file(self, file, data_filename=None, close_file=True, extra_args=None): 1028 "Verify the signature on the contents of the file-like object 'file'" 1029 logger.debug('verify_file: %r, %r', file, data_filename) 1030 result = self.result_map['verify'](self) 1031 args = ['--verify'] 1032 if extra_args: 1033 args.extend(extra_args) 1034 if data_filename is None: 1035 self._handle_io(args, file, result, binary=True) 1036 else: 1037 logger.debug('Handling detached verification') 1038 import tempfile 1039 fd, fn = tempfile.mkstemp(prefix='pygpg') 1040 s = file.read() 1041 if close_file: 1042 file.close() 1043 logger.debug('Wrote to temp file: %r', s) 1044 os.write(fd, s) 1045 os.close(fd) 1046 args.append(no_quote(fn)) 1047 args.append(no_quote(data_filename)) 1048 try: 1049 p = self._open_subprocess(args) 1050 self._collect_output(p, result, stdin=p.stdin) 1051 finally: 1052 os.unlink(fn) 1053 return result 1054 1055 def verify_data(self, sig_filename, data, extra_args=None): 1056 "Verify the signature in sig_filename against data in memory" 1057 logger.debug('verify_data: %r, %r ...', sig_filename, data[:16]) 1058 result = self.result_map['verify'](self) 1059 args = ['--verify'] 1060 if extra_args: 1061 args.extend(extra_args) 1062 args.extend([no_quote(sig_filename), '-']) 1063 stream = _make_memory_stream(data) 1064 self._handle_io(args, stream, result, binary=True) 1065 return result 1066 1067 # 1068 # KEY MANAGEMENT 1069 # 1070 1071 def import_keys(self, key_data): 1072 """ 1073 Import the key_data into our keyring. 1074 """ 1075 result = self.result_map['import'](self) 1076 logger.debug('import_keys: %r', key_data[:256]) 1077 data = _make_binary_stream(key_data, self.encoding) 1078 self._handle_io(['--import'], data, result, binary=True) 1079 logger.debug('import_keys result: %r', result.__dict__) 1080 data.close() 1081 return result 1082 1083 def recv_keys(self, keyserver, *keyids): 1084 """Import a key from a keyserver 1085 1086 >>> import shutil 1087 >>> shutil.rmtree("keys", ignore_errors=True) 1088 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1089 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") 1090 >>> os.chmod('keys', 0x1C0) 1091 >>> result = gpg.recv_keys('pgp.mit.edu', '92905378') 1092 >>> assert result 1093 1094 """ 1095 result = self.result_map['import'](self) 1096 logger.debug('recv_keys: %r', keyids) 1097 data = _make_binary_stream("", self.encoding) 1098 #data = "" 1099 args = ['--keyserver', no_quote(keyserver), '--recv-keys'] 1100 args.extend([no_quote(k) for k in keyids]) 1101 self._handle_io(args, data, result, binary=True) 1102 logger.debug('recv_keys result: %r', result.__dict__) 1103 data.close() 1104 return result 1105 1106 def send_keys(self, keyserver, *keyids): 1107 """Send a key to a keyserver. 1108 1109 Note: it's not practical to test this function without sending 1110 arbitrary data to live keyservers. 1111 """ 1112 result = self.result_map['send'](self) 1113 logger.debug('send_keys: %r', keyids) 1114 data = _make_binary_stream('', self.encoding) 1115 #data = "" 1116 args = ['--keyserver', no_quote(keyserver), '--send-keys'] 1117 args.extend([no_quote(k) for k in keyids]) 1118 self._handle_io(args, data, result, binary=True) 1119 logger.debug('send_keys result: %r', result.__dict__) 1120 data.close() 1121 return result 1122 1123 def delete_keys(self, fingerprints, secret=False, passphrase=None): 1124 which='key' 1125 if secret: # pragma: no cover 1126 if self.version >= (2, 1) and passphrase is None: 1127 raise ValueError('For GnuPG >= 2.1, deleting secret keys ' 1128 'needs a passphrase to be provided') 1129 which='secret-key' 1130 if _is_sequence(fingerprints): # pragma: no cover 1131 fingerprints = [no_quote(s) for s in fingerprints] 1132 else: 1133 fingerprints = [no_quote(fingerprints)] 1134 args = ['--delete-%s' % which] 1135 args.extend(fingerprints) 1136 result = self.result_map['delete'](self) 1137 if not secret or self.version < (2, 1): 1138 p = self._open_subprocess(args) 1139 self._collect_output(p, result, stdin=p.stdin) 1140 else: 1141 # Need to send in a passphrase. 1142 f = _make_binary_stream('', self.encoding) 1143 try: 1144 self._handle_io(args, f, result, passphrase=passphrase, 1145 binary=True) 1146 finally: 1147 f.close() 1148 return result 1149 1150 def export_keys(self, keyids, secret=False, armor=True, minimal=False, 1151 passphrase=None): 1152 """ 1153 Export the indicated keys. A 'keyid' is anything gpg accepts. 1154 1155 Since GnuPG 2.1, you can't export secret keys without providing a 1156 passphrase. 1157 """ 1158 1159 which='' 1160 if secret: 1161 which='-secret-key' 1162 if self.version >= (2, 1) and passphrase is None: 1163 raise ValueError('For GnuPG >= 2.1, exporting secret keys ' 1164 'needs a passphrase to be provided') 1165 if _is_sequence(keyids): 1166 keyids = [no_quote(k) for k in keyids] 1167 else: 1168 keyids = [no_quote(keyids)] 1169 args = ['--export%s' % which] 1170 if armor: 1171 args.append('--armor') 1172 if minimal: # pragma: no cover 1173 args.extend(['--export-options','export-minimal']) 1174 args.extend(keyids) 1175 # gpg --export produces no status-fd output; stdout will be 1176 # empty in case of failure 1177 #stdout, stderr = p.communicate() 1178 result = self.result_map['export'](self) 1179 if not secret or self.version < (2, 1): 1180 p = self._open_subprocess(args) 1181 self._collect_output(p, result, stdin=p.stdin) 1182 else: 1183 # Need to send in a passphrase. 1184 f = _make_binary_stream('', self.encoding) 1185 try: 1186 self._handle_io(args, f, result, passphrase=passphrase, 1187 binary=True) 1188 finally: 1189 f.close() 1190 logger.debug('export_keys result: %r', result.data) 1191 # Issue #49: Return bytes if armor not specified, else text 1192 result = result.data 1193 if armor: 1194 result = result.decode(self.encoding, self.decode_errors) 1195 return result 1196 1197 def _get_list_output(self, p, kind): 1198 # Get the response information 1199 result = self.result_map[kind](self) 1200 self._collect_output(p, result, stdin=p.stdin) 1201 lines = result.data.decode(self.encoding, 1202 self.decode_errors).splitlines() 1203 valid_keywords = 'pub uid sec fpr sub ssb sig'.split() 1204 for line in lines: 1205 if self.verbose: # pragma: no cover 1206 print(line) 1207 logger.debug("line: %r", line.rstrip()) 1208 if not line: # pragma: no cover 1209 break 1210 L = line.strip().split(':') 1211 if not L: # pragma: no cover 1212 continue 1213 keyword = L[0] 1214 if keyword in valid_keywords: 1215 getattr(result, keyword)(L) 1216 return result 1217 1218 def list_keys(self, secret=False, keys=None, sigs=False): 1219 """ list the keys currently in the keyring 1220 1221 >>> import shutil 1222 >>> shutil.rmtree("keys", ignore_errors=True) 1223 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1224 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") 1225 >>> input = gpg.gen_key_input(passphrase='foo') 1226 >>> result = gpg.gen_key(input) 1227 >>> fp1 = result.fingerprint 1228 >>> result = gpg.gen_key(input) 1229 >>> fp2 = result.fingerprint 1230 >>> pubkeys = gpg.list_keys() 1231 >>> assert fp1 in pubkeys.fingerprints 1232 >>> assert fp2 in pubkeys.fingerprints 1233 1234 """ 1235 1236 if sigs: 1237 which = 'sigs' 1238 else: which='keys' 1239 if secret: 1240 which='secret-keys' 1241 args = ['--list-%s' % which, 1242 '--fingerprint', '--fingerprint'] # get subkey FPs, too 1243 if keys: 1244 if isinstance(keys, string_types): 1245 keys = [keys] 1246 args.extend(keys) 1247 p = self._open_subprocess(args) 1248 return self._get_list_output(p, 'list') 1249 1250 def scan_keys(self, filename): 1251 """ 1252 List details of an ascii armored or binary key file 1253 without first importing it to the local keyring. 1254 1255 The function achieves this on modern GnuPG by running: 1256 1257 $ gpg --dry-run --import-options import-show --import 1258 1259 On older versions, it does the *much* riskier: 1260 1261 $ gpg --with-fingerprint --with-colons filename 1262 """ 1263 if self.version >= (2, 1): 1264 args = ['--dry-run', '--import-options', 'import-show', '--import'] 1265 else: 1266 logger.warning('Trying to list packets, but if the file is not a ' 1267 'keyring, might accidentally decrypt') 1268 args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode'] 1269 args.append(no_quote(filename)) 1270 p = self._open_subprocess(args) 1271 return self._get_list_output(p, 'scan') 1272 1273 def search_keys(self, query, keyserver='pgp.mit.edu'): 1274 """ search keyserver by query (using --search-keys option) 1275 1276 >>> import shutil 1277 >>> shutil.rmtree('keys', ignore_errors=True) 1278 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1279 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys') 1280 >>> os.chmod('keys', 0x1C0) 1281 >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>') 1282 >>> assert result, 'Failed using default keyserver' 1283 >>> #keyserver = 'keyserver.ubuntu.com' 1284 >>> #result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver) 1285 >>> #assert result, 'Failed using keyserver.ubuntu.com' 1286 1287 """ 1288 query = query.strip() 1289 if HEX_DIGITS_RE.match(query): 1290 query = '0x' + query 1291 args = ['--fingerprint', 1292 '--keyserver', no_quote(keyserver), '--search-keys', 1293 no_quote(query)] 1294 p = self._open_subprocess(args) 1295 1296 # Get the response information 1297 result = self.result_map['search'](self) 1298 self._collect_output(p, result, stdin=p.stdin) 1299 lines = result.data.decode(self.encoding, 1300 self.decode_errors).splitlines() 1301 valid_keywords = ['pub', 'uid'] 1302 for line in lines: 1303 if self.verbose: # pragma: no cover 1304 print(line) 1305 logger.debug('line: %r', line.rstrip()) 1306 if not line: # sometimes get blank lines on Windows 1307 continue 1308 L = line.strip().split(':') 1309 if not L: # pragma: no cover 1310 continue 1311 keyword = L[0] 1312 if keyword in valid_keywords: 1313 getattr(result, keyword)(L) 1314 return result 1315 1316 def gen_key(self, input): 1317 """Generate a key; you might use gen_key_input() to create the 1318 control input. 1319 1320 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1321 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") 1322 >>> input = gpg.gen_key_input(passphrase='foo') 1323 >>> result = gpg.gen_key(input) 1324 >>> assert result 1325 >>> result = gpg.gen_key('foo') 1326 >>> assert not result 1327 1328 """ 1329 args = ["--gen-key"] 1330 result = self.result_map['generate'](self) 1331 f = _make_binary_stream(input, self.encoding) 1332 self._handle_io(args, f, result, binary=True) 1333 f.close() 1334 return result 1335 1336 def gen_key_input(self, **kwargs): 1337 """ 1338 Generate --gen-key input per gpg doc/DETAILS 1339 """ 1340 parms = {} 1341 for key, val in list(kwargs.items()): 1342 key = key.replace('_','-').title() 1343 if str(val).strip(): # skip empty strings 1344 parms[key] = val 1345 parms.setdefault('Key-Type','RSA') 1346 parms.setdefault('Key-Length',2048) 1347 parms.setdefault('Name-Real', "Autogenerated Key") 1348 logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or 1349 'unspecified') 1350 hostname = socket.gethostname() 1351 parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), 1352 hostname)) 1353 out = "Key-Type: %s\n" % parms.pop('Key-Type') 1354 for key, val in list(parms.items()): 1355 out += "%s: %s\n" % (key, val) 1356 out += "%commit\n" 1357 return out 1358 1359 # Key-Type: RSA 1360 # Key-Length: 1024 1361 # Name-Real: ISdlink Server on %s 1362 # Name-Comment: Created by %s 1363 # Name-Email: isdlink@%s 1364 # Expire-Date: 0 1365 # %commit 1366 # 1367 # 1368 # Key-Type: DSA 1369 # Key-Length: 1024 1370 # Subkey-Type: ELG-E 1371 # Subkey-Length: 1024 1372 # Name-Real: Joe Tester 1373 # Name-Comment: with stupid passphrase 1374 # Name-Email: joe@foo.bar 1375 # Expire-Date: 0 1376 # Passphrase: abc 1377 # %pubring foo.pub 1378 # %secring foo.sec 1379 # %commit 1380 1381 # 1382 # ENCRYPTION 1383 # 1384 def encrypt_file(self, file, recipients, sign=None, 1385 always_trust=False, passphrase=None, 1386 armor=True, output=None, symmetric=False, extra_args=None): 1387 "Encrypt the message read from the file-like object 'file'" 1388 args = ['--encrypt'] 1389 if symmetric: 1390 # can't be False or None - could be True or a cipher algo value 1391 # such as AES256 1392 args = ['--symmetric'] 1393 if symmetric is not True: 1394 args.extend(['--cipher-algo', no_quote(symmetric)]) 1395 # else use the default, currently CAST5 1396 else: 1397 if not recipients: 1398 raise ValueError('No recipients specified with asymmetric ' 1399 'encryption') 1400 if not _is_sequence(recipients): 1401 recipients = (recipients,) 1402 for recipient in recipients: 1403 args.extend(['--recipient', no_quote(recipient)]) 1404 if armor: # create ascii-armored output - False for binary output 1405 args.append('--armor') 1406 if output: # write the output to a file with the specified name 1407 self.set_output_without_confirmation(args, output) 1408 if sign is True: # pragma: no cover 1409 args.append('--sign') 1410 elif sign: # pragma: no cover 1411 args.extend(['--sign', '--default-key', no_quote(sign)]) 1412 if always_trust: # pragma: no cover 1413 args.append('--always-trust') 1414 if extra_args: 1415 args.extend(extra_args) 1416 result = self.result_map['crypt'](self) 1417 self._handle_io(args, file, result, passphrase=passphrase, binary=True) 1418 logger.debug('encrypt result: %r', result.data) 1419 return result 1420 1421 def encrypt(self, data, recipients, **kwargs): 1422 """Encrypt the message contained in the string 'data' 1423 1424 >>> import shutil 1425 >>> if os.path.exists("keys"): 1426 ... shutil.rmtree("keys", ignore_errors=True) 1427 >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') 1428 >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") 1429 >>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1') 1430 >>> result = gpg.gen_key(input) 1431 >>> fp1 = result.fingerprint 1432 >>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2') 1433 >>> result = gpg.gen_key(input) 1434 >>> fp2 = result.fingerprint 1435 >>> result = gpg.encrypt("hello",fp2) 1436 >>> message = str(result) 1437 >>> assert message != 'hello' 1438 >>> result = gpg.decrypt(message, passphrase='pp2') 1439 >>> assert result 1440 >>> str(result) 1441 'hello' 1442 >>> result = gpg.encrypt("hello again", fp1) 1443 >>> message = str(result) 1444 >>> result = gpg.decrypt(message, passphrase='bar') 1445 >>> result.status in ('decryption failed', 'bad passphrase') 1446 True 1447 >>> assert not result 1448 >>> result = gpg.decrypt(message, passphrase='pp1') 1449 >>> result.status == 'decryption ok' 1450 True 1451 >>> str(result) 1452 'hello again' 1453 >>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1') 1454 >>> result.status == 'encryption ok' 1455 True 1456 >>> message = str(result) 1457 >>> result = gpg.decrypt(message, passphrase='pp2') 1458 >>> result.status == 'decryption ok' 1459 True 1460 >>> assert result.fingerprint == fp1 1461 1462 """ 1463 data = _make_binary_stream(data, self.encoding) 1464 result = self.encrypt_file(data, recipients, **kwargs) 1465 data.close() 1466 return result 1467 1468 def decrypt(self, message, **kwargs): 1469 data = _make_binary_stream(message, self.encoding) 1470 result = self.decrypt_file(data, **kwargs) 1471 data.close() 1472 return result 1473 1474 def decrypt_file(self, file, always_trust=False, passphrase=None, 1475 output=None, extra_args=None): 1476 args = ["--decrypt"] 1477 if output: # write the output to a file with the specified name 1478 self.set_output_without_confirmation(args, output) 1479 if always_trust: # pragma: no cover 1480 args.append("--always-trust") 1481 if extra_args: 1482 args.extend(extra_args) 1483 result = self.result_map['crypt'](self) 1484 self._handle_io(args, file, result, passphrase, binary=True) 1485 logger.debug('decrypt result: %r', result.data) 1486 return result 1487