# Copyright (C) 2005, 2006, 2007, 2009, 2011, 2012, 2013, 2016 Canonical Ltd
# Authors: Robert Collins <robert.collins@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""GPG signing and checking logic."""

import os

from breezy.lazy_import import lazy_import
lazy_import(globals(), """
from breezy import (
    config,
    trace,
    ui,
    )
from breezy.i18n import (
    gettext,
    ngettext,
    )
""")

from . import (
    errors,
    )

# verification results
SIGNATURE_VALID = 0
SIGNATURE_KEY_MISSING = 1
SIGNATURE_NOT_VALID = 2
SIGNATURE_NOT_SIGNED = 3
SIGNATURE_EXPIRED = 4

# signing modes accepted by the strategies' sign() method
MODE_NORMAL = 0
MODE_DETACH = 1
MODE_CLEAR = 2


class GpgNotInstalled(errors.DependencyNotPresent):
    """Raised when the python-gpg bindings cannot be imported."""

    _fmt = ('python-gpg is not installed, it is needed to create or '
            'verify signatures. %(error)s')

    def __init__(self, error):
        errors.DependencyNotPresent.__init__(self, 'gpg', error)


class SigningFailed(errors.BzrError):
    """Raised when data could not be signed."""

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)


class SignatureVerificationFailed(errors.BzrError):
    """Raised when a signature could not be checked at all."""

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)


def bulk_verify_signatures(repository, revids, strategy,
                           process_events_callback=None):
    """Do verifications on a set of revisions

    :param repository: repository object
    :param revids: list of revision ids to verify
    :param strategy: GPG strategy to use
    :param process_events_callback: method to call for GUI frontends that
        want to keep their UI refreshed

    :return: count dictionary of results of each type,
             result list for each revision,
             boolean True if all results are verified successfully
    """
    count = {SIGNATURE_VALID: 0,
             SIGNATURE_KEY_MISSING: 0,
             SIGNATURE_NOT_VALID: 0,
             SIGNATURE_NOT_SIGNED: 0,
             SIGNATURE_EXPIRED: 0}
    result = []
    all_verifiable = True
    total = len(revids)
    with ui.ui_factory.nested_progress_bar() as pb:
        verifications = repository.verify_revision_signatures(
            revids, strategy)
        for i, (rev_id, verification_result, uid) in enumerate(
                verifications):
            pb.update("verifying signatures", i, total)
            result.append([rev_id, verification_result, uid])
            count[verification_result] += 1
            if verification_result != SIGNATURE_VALID:
                all_verifiable = False
            # Called once per revision so a GUI can refresh while we work.
            if process_events_callback is not None:
                process_events_callback()
    return (count, result, all_verifiable)


class DisabledGPGStrategy(object):
    """A GPG Strategy that makes everything fail."""

    @staticmethod
    def verify_signatures_available():
        return True

    def __init__(self, ignored):
        """Real strategies take a configuration."""

    def sign(self, content, mode):
        """Always raise SigningFailed."""
        raise SigningFailed('Signing is disabled.')

    def verify(self, signed_data, signature=None):
        """Always raise SignatureVerificationFailed."""
        raise SignatureVerificationFailed(
            'Signature verification is disabled.')

    def set_acceptable_keys(self, command_line_input):
        """Accept and ignore the key patterns."""
        pass


class LoopbackGPGStrategy(object):
    """A GPG Strategy that acts like 'cat' - data is just passed through.
    Used in tests.
    """

    @staticmethod
    def verify_signatures_available():
        return True

    def __init__(self, ignored):
        """Real strategies take a configuration."""

    def sign(self, content, mode):
        """Wrap ``content`` (bytes) in pseudo-signature marker lines.

        ``mode`` is ignored.
        """
        return (b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content
                + b"-----END PSEUDO-SIGNED CONTENT-----\n")

    def verify(self, signed_data, signature=None):
        """Strip the pseudo-signature markers and report the data as valid.

        :return: (SIGNATURE_VALID, None, data with the marker lines removed)
        """
        plain_text = signed_data.replace(
            b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n", b"")
        plain_text = plain_text.replace(
            b"-----END PSEUDO-SIGNED CONTENT-----\n", b"")
        return SIGNATURE_VALID, None, plain_text

    def set_acceptable_keys(self, command_line_input):
        """Record the comma separated patterns, dropping "unknown".

        Nothing is changed when ``command_line_input`` is None.
        """
        if command_line_input is not None:
            self.acceptable_keys = [
                pattern for pattern in command_line_input.split(",")
                if pattern != "unknown"]


def _set_gpg_tty():
    """Copy the TTY environment variable into GPG_TTY, when TTY is set."""
    tty = os.environ.get('TTY')
    if tty is not None:
        os.environ['GPG_TTY'] = tty
        trace.mutter('setting GPG_TTY=%s', tty)
    else:
        # This is not quite worthy of a warning, because some people
        # don't need GPG_TTY to be set. But it is worthy of a big mark
        # in brz.log, so that people can debug it if it happens to them
        trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
                     ' Is TTY exported?')


class GPGStrategy(object):
    """GPG Signing and checking facilities."""

    # None means "no restriction"; set_acceptable_keys() replaces it with a
    # list of key fingerprints.
    acceptable_keys = None

    def __init__(self, config_stack):
        """Create a strategy reading its settings from ``config_stack``.

        When python-gpg cannot be imported the instance is still created,
        but ``self.context`` is left unset.
        """
        self._config_stack = config_stack
        try:
            import gpg
            self.context = gpg.Context()
            self.context.armor = True
            self.context.signers = self._get_signing_keys()
        except ImportError:
            pass  # can't use verify()

    def _get_signing_keys(self):
        """Return the list of keys to sign with (empty: leave it to gpg)."""
        import gpg
        keyname = self._config_stack.get('gpg_signing_key')
        if keyname == 'default':
            # Leave things to gpg
            return []

        if keyname:
            try:
                return [self.context.get_key(keyname)]
            except gpg.errors.KeyNotFound:
                # Fall through and treat the name as a key-list pattern.
                pass

        if keyname is None:
            # not setting gpg_signing_key at all means we should
            # use the user email address
            keyname = config.extract_email_address(
                self._config_stack.get('email'))
        if keyname == 'default':
            return []
        possible_keys = self.context.keylist(keyname, secret=True)
        try:
            return [next(possible_keys)]
        except StopIteration:
            return []

    @staticmethod
    def verify_signatures_available():
        """
        check if this strategy can verify signatures

        :return: boolean if this strategy can verify signatures
        """
        try:
            import gpg  # noqa: F401
            return True
        except ImportError:
            return False

    def sign(self, content, mode):
        """Sign ``content`` and return the signed output.

        :param content: data to sign; must not be a ``str``
        :param mode: one of MODE_NORMAL, MODE_DETACH or MODE_CLEAR
        :return: the output produced by the gpg context
        :raises GpgNotInstalled: if python-gpg cannot be imported
        :raises errors.BzrBadParameterUnicode: if ``content`` is a ``str``
        :raises SigningFailed: if gpg reports an error or invalid signers
        """
        try:
            import gpg
        except ImportError:
            raise GpgNotInstalled(
                'Set create_signatures=no to disable creating signatures.')

        if isinstance(content, str):
            raise errors.BzrBadParameterUnicode('content')

        plain_text = gpg.Data(content)
        try:
            output, result = self.context.sign(
                plain_text, mode={
                    MODE_DETACH: gpg.constants.sig.mode.DETACH,
                    MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
                    MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
                    }[mode])
        except (gpg.errors.GPGMEError, gpg.errors.InvalidSigners) as error:
            raise SigningFailed(str(error))

        return output

    def verify(self, signed_data, signature=None):
        """Check content has a valid signature.

        :param signed_data: Signed data
        :param signature: optional signature (if detached)

        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if
            valid, plain text
        :raises GpgNotInstalled: if python-gpg cannot be imported
        :raises SignatureVerificationFailed: if gpg reports an error, or
            the result matches none of the cases handled below
        """
        try:
            import gpg
        except ImportError:
            raise GpgNotInstalled(
                'Set check_signatures=ignore to disable verifying signatures.')

        signed_data = gpg.Data(signed_data)
        if signature:
            signature = gpg.Data(signature)
        try:
            plain_output, result = self.context.verify(signed_data, signature)
        except gpg.errors.BadSignatures as error:
            bad_signature = error.result.signatures[0]
            fingerprint = bad_signature.fpr
            if bad_signature.summary & gpg.constants.SIGSUM_KEY_EXPIRED:
                expires = self.context.get_key(
                    bad_signature.fpr).subkeys[0].expires
                if expires > bad_signature.timestamp:
                    # The expired key was not expired at time of signing.
                    # test_verify_expired_but_valid()
                    return SIGNATURE_EXPIRED, fingerprint[-8:], None
                else:
                    # I can't work out how to create a test where the
                    # signature was expired at the time of signing.
                    return SIGNATURE_NOT_VALID, None, None

            # GPG does not know this key.
            # test_verify_unknown_key()
            if bad_signature.summary & gpg.constants.SIGSUM_KEY_MISSING:
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], None

            return SIGNATURE_NOT_VALID, None, None
        except gpg.errors.GPGMEError as error:
            # Wrap the message text, as sign() does for SigningFailed.
            raise SignatureVerificationFailed(str(error))

        # No result if input is invalid.
        # test_verify_invalid()
        if len(result.signatures) == 0:
            return SIGNATURE_NOT_VALID, None, plain_output

        # User has specified a list of acceptable keys, check our result is in
        # it. test_verify_unacceptable_key()
        first_signature = result.signatures[0]
        fingerprint = first_signature.fpr
        if self.acceptable_keys is not None:
            if fingerprint not in self.acceptable_keys:
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
        # Yay gpg set the valid bit.
        # Can't write a test for this one as you can't set a key to be
        # trusted using gpg.
        if first_signature.summary & gpg.constants.SIGSUM_VALID:
            key = self.context.get_key(fingerprint)
            name = key.uids[0].name
            if isinstance(name, bytes):
                name = name.decode('utf-8')
            email = key.uids[0].email
            if isinstance(email, bytes):
                email = email.decode('utf-8')
            return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
        # Sigsum_red indicates a problem, unfortunately I have not been able
        # to write any tests which actually set this.
        if first_signature.summary & gpg.constants.SIGSUM_RED:
            return SIGNATURE_NOT_VALID, None, plain_output
        # Summary isn't set if sig is valid but key is untrusted but if user
        # has explicitly set the key as acceptable we can validate it.
        if (first_signature.summary == 0 and
                self.acceptable_keys is not None):
            if fingerprint in self.acceptable_keys:
                # test_verify_untrusted_but_accepted()
                return SIGNATURE_VALID, None, plain_output
        # test_verify_valid_but_untrusted()
        if first_signature.summary == 0 and self.acceptable_keys is None:
            return SIGNATURE_NOT_VALID, None, plain_output
        # Other error types such as revoked keys should (I think) be caught by
        # SIGSUM_RED so anything else means something is buggy.
        raise SignatureVerificationFailed(
            "Unknown GnuPG key verification result")

    def set_acceptable_keys(self, command_line_input):
        """Set the acceptable keys for verifying with this GPGStrategy.

        :param command_line_input: comma separated list of patterns from
            command line
        :return: nothing
        :raises GpgNotInstalled: if there are patterns to look up but this
            strategy has no gpg context
        """
        patterns = self._config_stack.get('acceptable_keys')
        if command_line_input is not None:  # command line overrides config
            patterns = command_line_input.split(',')

        if not patterns:
            return

        try:
            context = self.context
        except AttributeError:
            # __init__ leaves ``context`` unset when python-gpg could not be
            # imported; report that the same way verify() does instead of
            # leaking an AttributeError.
            raise GpgNotInstalled(
                'Set check_signatures=ignore to disable verifying signatures.')

        self.acceptable_keys = []
        for pattern in patterns:
            found_key = False
            for key in context.keylist(pattern):
                found_key = True
                self.acceptable_keys.append(key.subkeys[0].fpr)
                trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
            if not found_key:
                trace.note(gettext(
                    "No GnuPG key results for pattern: {0}"
                    ).format(pattern))


def valid_commits_message(count):
    """returns message for number of commits"""
    # NOTE(review): unlike the sibling helpers this uses gettext rather than
    # ngettext, so the text is not pluralised for a count of 1. Left as is
    # because changing the msgid could affect existing translations --
    # confirm before changing.
    return gettext(u"{0} commits with valid signatures").format(
        count[SIGNATURE_VALID])


def unknown_key_message(count):
    """returns message for number of commits"""
    return ngettext(u"{0} commit with unknown key",
                    u"{0} commits with unknown keys",
                    count[SIGNATURE_KEY_MISSING]).format(
        count[SIGNATURE_KEY_MISSING])


def commit_not_valid_message(count):
    """returns message for number of commits"""
    return ngettext(u"{0} commit not valid",
                    u"{0} commits not valid",
                    count[SIGNATURE_NOT_VALID]).format(
        count[SIGNATURE_NOT_VALID])


def commit_not_signed_message(count):
    """returns message for number of commits"""
    return ngettext(u"{0} commit not signed",
                    u"{0} commits not signed",
                    count[SIGNATURE_NOT_SIGNED]).format(
        count[SIGNATURE_NOT_SIGNED])


def expired_commit_message(count):
    """returns message for number of commits"""
    return ngettext(u"{0} commit with key now expired",
                    u"{0} commits with key now expired",
                    count[SIGNATURE_EXPIRED]).format(
        count[SIGNATURE_EXPIRED])


def verbose_expired_key_message(result, repo):
    """takes a verify result and returns list of expired key info"""
    signers = {}
    fingerprint_to_authors = {}
    for rev_id, validity, fingerprint in result:
        if validity == SIGNATURE_EXPIRED:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers[fingerprint] = signers.get(fingerprint, 0) + 1
            # Only the authors of the last revision seen for a key are kept.
            fingerprint_to_authors[fingerprint] = authors
    messages = []
    for fingerprint, number in signers.items():
        messages.append(
            ngettext(u"{0} commit by author {1} with key {2} now expired",
                     u"{0} commits by author {1} with key {2} now expired",
                     number).format(
                number, fingerprint_to_authors[fingerprint], fingerprint))
    return messages


def verbose_valid_message(result):
    """takes a verify result and returns list of signed commits strings"""
    signers = {}
    for rev_id, validity, uid in result:
        if validity == SIGNATURE_VALID:
            signers[uid] = signers.get(uid, 0) + 1
    messages = []
    for uid, number in signers.items():
        messages.append(ngettext(u"{0} signed {1} commit",
                                 u"{0} signed {1} commits",
                                 number).format(uid, number))
    return messages


def _commits_by_author_messages(result, repo, wanted_validity):
    """Return one "N commit(s) by author X" string per distinct author string.

    Only entries of ``result`` whose validity equals ``wanted_validity`` are
    counted; the author string is the comma-joined apparent authors of the
    revision.
    """
    signers = {}
    for rev_id, validity, empty in result:
        if validity == wanted_validity:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers[authors] = signers.get(authors, 0) + 1
    messages = []
    for authors, number in signers.items():
        messages.append(ngettext(u"{0} commit by author {1}",
                                 u"{0} commits by author {1}",
                                 number).format(number, authors))
    return messages


def verbose_not_valid_message(result, repo):
    """takes a verify result and returns list of not valid commit info"""
    return _commits_by_author_messages(result, repo, SIGNATURE_NOT_VALID)


def verbose_not_signed_message(result, repo):
    """takes a verify result and returns list of not signed commit info"""
    return _commits_by_author_messages(result, repo, SIGNATURE_NOT_SIGNED)


def verbose_missing_key_message(result):
    """takes a verify result and returns list of missing key info"""
    signers = {}
    for rev_id, validity, fingerprint in result:
        if validity == SIGNATURE_KEY_MISSING:
            signers[fingerprint] = signers.get(fingerprint, 0) + 1
    messages = []
    for fingerprint, number in signers.items():
        messages.append(ngettext(u"Unknown key {0} signed {1} commit",
                                 u"Unknown key {0} signed {1} commits",
                                 number).format(fingerprint, number))
    return messages