1# Copyright (C) 2006-2012, 2016 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Tests for the formatting and construction of errors.""" 18 19import inspect 20import re 21import socket 22import sys 23 24from .. import ( 25 controldir, 26 errors, 27 osutils, 28 tests, 29 urlutils, 30 ) 31 32 33class TestErrors(tests.TestCase): 34 35 def test_no_arg_named_message(self): 36 """Ensure the __init__ and _fmt in errors do not have "message" arg. 37 38 This test fails if __init__ or _fmt in errors has an argument 39 named "message" as this can cause errors in some Python versions. 40 Python 2.5 uses a slot for StandardError.message. 41 See bug #603461 42 """ 43 fmt_pattern = re.compile("%\\(message\\)[sir]") 44 for c in errors.BzrError.__subclasses__(): 45 init = getattr(c, '__init__', None) 46 fmt = getattr(c, '_fmt', None) 47 if init: 48 args = inspect.getfullargspec(init)[0] 49 self.assertFalse('message' in args, 50 ('Argument name "message" not allowed for ' 51 '"errors.%s.__init__"' % c.__name__)) 52 if fmt and fmt_pattern.search(fmt): 53 self.assertFalse(True, ('"message" not allowed in ' 54 '"errors.%s._fmt"' % c.__name__)) 55 56 def test_bad_filename_encoding(self): 57 error = errors.BadFilenameEncoding(b'bad/filen\xe5me', 'UTF-8') 58 self.assertContainsRe( 59 str(error), 60 "^Filename b?'bad/filen\\\\xe5me' is not valid in your current" 61 " filesystem encoding UTF-8$") 62 63 def test_duplicate_help_prefix(self): 64 error = errors.DuplicateHelpPrefix('foo') 65 self.assertEqualDiff('The prefix foo is in the help search path twice.', 66 str(error)) 67 68 def test_ghost_revisions_have_no_revno(self): 69 error = errors.GhostRevisionsHaveNoRevno('target', 'ghost_rev') 70 self.assertEqualDiff("Could not determine revno for {target} because" 71 " its ancestry shows a ghost at {ghost_rev}", 72 str(error)) 73 74 def test_incompatibleVersion(self): 75 error = errors.IncompatibleVersion("module", [(4, 5, 6), (7, 8, 9)], 76 (1, 2, 3)) 77 self.assertEqualDiff( 78 'API module is not compatible; one of versions ' 79 '[(4, 5, 6), (7, 8, 9)] is required, but current version is ' 80 '(1, 2, 3).', 81 str(error)) 82 83 def test_inconsistent_delta(self): 84 error = errors.InconsistentDelta('path', 'file-id', 'reason for foo') 85 self.assertEqualDiff( 86 "An inconsistent delta was supplied involving 'path', 'file-id'\n" 87 "reason: reason for foo", 88 str(error)) 89 90 def test_inconsistent_delta_delta(self): 91 error = errors.InconsistentDeltaDelta([], 'reason') 92 self.assertEqualDiff( 93 "An inconsistent delta was supplied: []\nreason: reason", 94 str(error)) 95 96 def test_in_process_transport(self): 97 error = errors.InProcessTransport('fpp') 98 self.assertEqualDiff( 99 "The transport 'fpp' is only accessible within this process.", 100 str(error)) 101 102 def test_invalid_http_range(self): 103 error = errors.InvalidHttpRange('path', 104 'Content-Range: potatoes 0-00/o0oo0', 105 'bad range') 106 self.assertEqual("Invalid http range" 107 " 'Content-Range: potatoes 0-00/o0oo0'" 108 " for path: bad range", 109 str(error)) 110 111 def test_invalid_range(self): 112 error = errors.InvalidRange('path', 12, 'bad range') 113 self.assertEqual("Invalid range access in path at 12: bad range", 114 str(error)) 115 116 def test_jail_break(self): 117 error = errors.JailBreak("some url") 118 self.assertEqualDiff("An attempt to access a url outside the server" 119 " jail was made: 'some url'.", 120 str(error)) 121 122 def test_lock_active(self): 123 error = errors.LockActive("lock description") 124 self.assertEqualDiff("The lock for 'lock description' is in use and " 125 "cannot be broken.", 126 str(error)) 127 128 def test_lock_corrupt(self): 129 error = errors.LockCorrupt("corruption info") 130 self.assertEqualDiff("Lock is apparently held, but corrupted: " 131 "corruption info\n" 132 "Use 'brz break-lock' to clear it", 133 str(error)) 134 135 def test_medium_not_connected(self): 136 error = errors.MediumNotConnected("a medium") 137 self.assertEqualDiff( 138 "The medium 'a medium' is not connected.", str(error)) 139 140 def test_no_smart_medium(self): 141 error = errors.NoSmartMedium("a transport") 142 self.assertEqualDiff("The transport 'a transport' cannot tunnel the " 143 "smart protocol.", 144 str(error)) 145 146 def test_no_such_id(self): 147 error = errors.NoSuchId("atree", "anid") 148 self.assertEqualDiff("The file id \"anid\" is not present in the tree " 149 "atree.", 150 str(error)) 151 152 def test_no_such_revision_in_tree(self): 153 error = errors.NoSuchRevisionInTree("atree", "anid") 154 self.assertEqualDiff("The revision id {anid} is not present in the" 155 " tree atree.", str(error)) 156 self.assertIsInstance(error, errors.NoSuchRevision) 157 158 def test_not_stacked(self): 159 error = errors.NotStacked('a branch') 160 self.assertEqualDiff("The branch 'a branch' is not stacked.", 161 str(error)) 162 163 def test_not_write_locked(self): 164 error = errors.NotWriteLocked('a thing to repr') 165 self.assertEqualDiff("'a thing to repr' is not write locked but needs " 166 "to be.", 167 str(error)) 168 169 def test_lock_failed(self): 170 error = errors.LockFailed( 171 'http://canonical.com/', 'readonly transport') 172 self.assertEqualDiff("Cannot lock http://canonical.com/: readonly transport", 173 str(error)) 174 self.assertFalse(error.internal_error) 175 176 def test_too_many_concurrent_requests(self): 177 error = errors.TooManyConcurrentRequests("a medium") 178 self.assertEqualDiff("The medium 'a medium' has reached its concurrent " 179 "request limit. Be sure to finish_writing and finish_reading on " 180 "the currently open request.", 181 str(error)) 182 183 def test_unstackable_location(self): 184 error = errors.UnstackableLocationError('foo', 'bar') 185 self.assertEqualDiff("The branch 'foo' cannot be stacked on 'bar'.", 186 str(error)) 187 188 def test_unstackable_repository_format(self): 189 format = u'foo' 190 url = "/foo" 191 error = errors.UnstackableRepositoryFormat(format, url) 192 self.assertEqualDiff( 193 "The repository '/foo'(foo) is not a stackable format. " 194 "You will need to upgrade the repository to permit branch stacking.", 195 str(error)) 196 197 def test_up_to_date(self): 198 error = errors.UpToDateFormat("someformat") 199 self.assertEqualDiff( 200 "The branch format someformat is already at the most " 201 "recent format.", str(error)) 202 203 def test_read_error(self): 204 # a unicode path to check that %r is being used. 205 path = u'a path' 206 error = errors.ReadError(path) 207 self.assertContainsRe(str(error), "^Error reading from u?'a path'.$") 208 209 def test_bzrerror_from_literal_string(self): 210 # Some code constructs BzrError from a literal string, in which case 211 # no further formatting is done. (I'm not sure raising the base class 212 # is a great idea, but if the exception is not intended to be caught 213 # perhaps no more is needed.) 214 try: 215 raise errors.BzrError('this is my errors; %d is not expanded') 216 except errors.BzrError as e: 217 self.assertEqual('this is my errors; %d is not expanded', str(e)) 218 219 def test_reading_completed(self): 220 error = errors.ReadingCompleted("a request") 221 self.assertEqualDiff("The MediumRequest 'a request' has already had " 222 "finish_reading called upon it - the request has been completed and" 223 " no more data may be read.", 224 str(error)) 225 226 def test_writing_completed(self): 227 error = errors.WritingCompleted("a request") 228 self.assertEqualDiff("The MediumRequest 'a request' has already had " 229 "finish_writing called upon it - accept bytes may not be called " 230 "anymore.", 231 str(error)) 232 233 def test_writing_not_completed(self): 234 error = errors.WritingNotComplete("a request") 235 self.assertEqualDiff("The MediumRequest 'a request' has not has " 236 "finish_writing called upon it - until the write phase is complete" 237 " no data may be read.", 238 str(error)) 239 240 def test_transport_not_possible(self): 241 error = errors.TransportNotPossible('readonly', 'original error') 242 self.assertEqualDiff('Transport operation not possible:' 243 ' readonly original error', str(error)) 244 245 def assertSocketConnectionError(self, expected, *args, **kwargs): 246 """Check the formatting of a SocketConnectionError exception""" 247 e = errors.SocketConnectionError(*args, **kwargs) 248 self.assertEqual(expected, str(e)) 249 250 def test_socket_connection_error(self): 251 """Test the formatting of SocketConnectionError""" 252 253 # There should be a default msg about failing to connect 254 # we only require a host name. 255 self.assertSocketConnectionError( 256 'Failed to connect to ahost', 257 'ahost') 258 259 # If port is None, we don't put :None 260 self.assertSocketConnectionError( 261 'Failed to connect to ahost', 262 'ahost', port=None) 263 # But if port is supplied we include it 264 self.assertSocketConnectionError( 265 'Failed to connect to ahost:22', 266 'ahost', port=22) 267 268 # We can also supply extra information about the error 269 # with or without a port 270 self.assertSocketConnectionError( 271 'Failed to connect to ahost:22; bogus error', 272 'ahost', port=22, orig_error='bogus error') 273 self.assertSocketConnectionError( 274 'Failed to connect to ahost; bogus error', 275 'ahost', orig_error='bogus error') 276 # An exception object can be passed rather than a string 277 orig_error = ValueError('bad value') 278 self.assertSocketConnectionError( 279 'Failed to connect to ahost; %s' % (str(orig_error),), 280 host='ahost', orig_error=orig_error) 281 282 # And we can supply a custom failure message 283 self.assertSocketConnectionError( 284 'Unable to connect to ssh host ahost:444; my_error', 285 host='ahost', port=444, msg='Unable to connect to ssh host', 286 orig_error='my_error') 287 288 def test_target_not_branch(self): 289 """Test the formatting of TargetNotBranch.""" 290 error = errors.TargetNotBranch('foo') 291 self.assertEqual( 292 "Your branch does not have all of the revisions required in " 293 "order to merge this merge directive and the target " 294 "location specified in the merge directive is not a branch: " 295 "foo.", str(error)) 296 297 def test_unexpected_smart_server_response(self): 298 e = errors.UnexpectedSmartServerResponse(('not yes',)) 299 self.assertEqual( 300 "Could not understand response from smart server: ('not yes',)", 301 str(e)) 302 303 def test_unknown_container_format(self): 304 """Test the formatting of UnknownContainerFormatError.""" 305 e = errors.UnknownContainerFormatError('bad format string') 306 self.assertEqual( 307 "Unrecognised container format: 'bad format string'", 308 str(e)) 309 310 def test_unexpected_end_of_container(self): 311 """Test the formatting of UnexpectedEndOfContainerError.""" 312 e = errors.UnexpectedEndOfContainerError() 313 self.assertEqual( 314 "Unexpected end of container stream", str(e)) 315 316 def test_unknown_record_type(self): 317 """Test the formatting of UnknownRecordTypeError.""" 318 e = errors.UnknownRecordTypeError("X") 319 self.assertEqual( 320 "Unknown record type: 'X'", 321 str(e)) 322 323 def test_invalid_record(self): 324 """Test the formatting of InvalidRecordError.""" 325 e = errors.InvalidRecordError("xxx") 326 self.assertEqual( 327 "Invalid record: xxx", 328 str(e)) 329 330 def test_container_has_excess_data(self): 331 """Test the formatting of ContainerHasExcessDataError.""" 332 e = errors.ContainerHasExcessDataError("excess bytes") 333 self.assertEqual( 334 "Container has data after end marker: 'excess bytes'", 335 str(e)) 336 337 def test_duplicate_record_name_error(self): 338 """Test the formatting of DuplicateRecordNameError.""" 339 e = errors.DuplicateRecordNameError(b"n\xc3\xa5me") 340 self.assertEqual( 341 u"Container has multiple records with the same name: n\xe5me", 342 str(e)) 343 344 def test_check_error(self): 345 e = errors.BzrCheckError('example check failure') 346 self.assertEqual( 347 "Internal check failed: example check failure", 348 str(e)) 349 self.assertTrue(e.internal_error) 350 351 def test_repository_data_stream_error(self): 352 """Test the formatting of RepositoryDataStreamError.""" 353 e = errors.RepositoryDataStreamError(u"my reason") 354 self.assertEqual( 355 "Corrupt or incompatible data stream: my reason", str(e)) 356 357 def test_immortal_pending_deletion_message(self): 358 err = errors.ImmortalPendingDeletion('foo') 359 self.assertEqual( 360 "Unable to delete transform temporary directory foo. " 361 "Please examine foo to see if it contains any files " 362 "you wish to keep, and delete it when you are done.", 363 str(err)) 364 365 def test_invalid_url_join(self): 366 """Test the formatting of InvalidURLJoin.""" 367 e = urlutils.InvalidURLJoin('Reason', 'base path', ('args',)) 368 self.assertEqual( 369 "Invalid URL join request: Reason: 'base path' + ('args',)", 370 str(e)) 371 372 def test_unable_encode_path(self): 373 err = errors.UnableEncodePath('foo', 'executable') 374 self.assertEqual("Unable to encode executable path 'foo' in " 375 "user encoding " + osutils.get_user_encoding(), 376 str(err)) 377 378 def test_unknown_format(self): 379 err = errors.UnknownFormatError('bar', kind='foo') 380 self.assertEqual("Unknown foo format: 'bar'", str(err)) 381 382 def test_tip_change_rejected(self): 383 err = errors.TipChangeRejected(u'Unicode message\N{INTERROBANG}') 384 self.assertEqual( 385 u'Tip change rejected: Unicode message\N{INTERROBANG}', 386 str(err)) 387 388 def test_error_from_smart_server(self): 389 error_tuple = ('error', 'tuple') 390 err = errors.ErrorFromSmartServer(error_tuple) 391 self.assertEqual( 392 "Error received from smart server: ('error', 'tuple')", str(err)) 393 394 def test_untranslateable_error_from_smart_server(self): 395 error_tuple = ('error', 'tuple') 396 orig_err = errors.ErrorFromSmartServer(error_tuple) 397 err = errors.UnknownErrorFromSmartServer(orig_err) 398 self.assertEqual( 399 "Server sent an unexpected error: ('error', 'tuple')", str(err)) 400 401 def test_smart_message_handler_error(self): 402 # Make an exc_info tuple. 403 try: 404 raise Exception("example error") 405 except Exception: 406 err = errors.SmartMessageHandlerError(sys.exc_info()) 407 # GZ 2010-11-08: Should not store exc_info in exception instances. 408 try: 409 self.assertStartsWith( 410 str(err), "The message handler raised an exception:\n") 411 self.assertEndsWith(str(err), "Exception: example error\n") 412 finally: 413 del err 414 415 def test_unresumable_write_group(self): 416 repo = "dummy repo" 417 wg_tokens = ['token'] 418 reason = "a reason" 419 err = errors.UnresumableWriteGroup(repo, wg_tokens, reason) 420 self.assertEqual( 421 "Repository dummy repo cannot resume write group " 422 "['token']: a reason", str(err)) 423 424 def test_unsuspendable_write_group(self): 425 repo = "dummy repo" 426 err = errors.UnsuspendableWriteGroup(repo) 427 self.assertEqual( 428 'Repository dummy repo cannot suspend a write group.', str(err)) 429 430 def test_not_branch_no_args(self): 431 err = errors.NotBranchError('path') 432 self.assertEqual('Not a branch: "path".', str(err)) 433 434 def test_not_branch_bzrdir_with_recursive_not_branch_error(self): 435 class FakeBzrDir(object): 436 def open_repository(self): 437 # str() on the NotBranchError will trigger a call to this, 438 # which in turn will another, identical NotBranchError. 439 raise errors.NotBranchError('path', controldir=FakeBzrDir()) 440 err = errors.NotBranchError('path', controldir=FakeBzrDir()) 441 self.assertEqual('Not a branch: "path": NotBranchError.', str(err)) 442 443 def test_recursive_bind(self): 444 error = errors.RecursiveBind('foo_bar_branch') 445 msg = ('Branch "foo_bar_branch" appears to be bound to itself. ' 446 'Please use `brz unbind` to fix.') 447 self.assertEqualDiff(msg, str(error)) 448 449 def test_retry_with_new_packs(self): 450 fake_exc_info = ('{exc type}', '{exc value}', '{exc traceback}') 451 error = errors.RetryWithNewPacks( 452 '{context}', reload_occurred=False, exc_info=fake_exc_info) 453 self.assertEqual( 454 'Pack files have changed, reload and retry. context: ' 455 '{context} {exc value}', str(error)) 456 457 458class PassThroughError(errors.BzrError): 459 460 _fmt = """Pass through %(foo)s and %(bar)s""" 461 462 def __init__(self, foo, bar): 463 errors.BzrError.__init__(self, foo=foo, bar=bar) 464 465 466class ErrorWithBadFormat(errors.BzrError): 467 468 _fmt = """One format specifier: %(thing)s""" 469 470 471class ErrorWithNoFormat(errors.BzrError): 472 __doc__ = """This class has a docstring but no format string.""" 473 474 475class TestErrorFormatting(tests.TestCase): 476 477 def test_always_str(self): 478 e = PassThroughError(u'\xb5', 'bar') 479 self.assertIsInstance(e.__str__(), str) 480 # In Python 2 str(foo) *must* return a real byte string 481 # not a Unicode string. The following line would raise a 482 # Unicode error, because it tries to call str() on the string 483 # returned from e.__str__(), and it has non ascii characters 484 s = str(e) 485 self.assertEqual('Pass through \xb5 and bar', s) 486 487 def test_missing_format_string(self): 488 e = ErrorWithNoFormat(param='randomvalue') 489 self.assertStartsWith(str(e), 490 "Unprintable exception ErrorWithNoFormat") 491 492 def test_mismatched_format_args(self): 493 # Even though ErrorWithBadFormat's format string does not match the 494 # arguments we constructing it with, we can still stringify an instance 495 # of this exception. The resulting string will say its unprintable. 496 e = ErrorWithBadFormat(not_thing='x') 497 self.assertStartsWith( 498 str(e), 'Unprintable exception ErrorWithBadFormat') 499 500 def test_cannot_bind_address(self): 501 # see <https://bugs.launchpad.net/bzr/+bug/286871> 502 e = errors.CannotBindAddress('example.com', 22, 503 socket.error(13, 'Permission denied')) 504 self.assertContainsRe( 505 str(e), 506 r'Cannot bind address "example\.com:22":.*Permission denied') 507 508 509class TestErrorsUsingTransport(tests.TestCaseWithMemoryTransport): 510 """Tests for errors that need to use a branch or repo.""" 511 512 def test_no_public_branch(self): 513 b = self.make_branch('.') 514 error = errors.NoPublicBranch(b) 515 url = urlutils.unescape_for_display(b.base, 'ascii') 516 self.assertEqualDiff( 517 'There is no public branch set for "%s".' % url, str(error)) 518 519 def test_no_repo(self): 520 dir = controldir.ControlDir.create(self.get_url()) 521 error = errors.NoRepositoryPresent(dir) 522 self.assertNotEqual(-1, 523 str(error).find((dir.transport.clone('..').base))) 524 self.assertEqual(-1, str(error).find((dir.transport.base))) 525 526 def test_corrupt_repository(self): 527 repo = self.make_repository('.') 528 error = errors.CorruptRepository(repo) 529 self.assertEqualDiff("An error has been detected in the repository %s.\n" 530 "Please run brz reconcile on this repository." % 531 repo.controldir.root_transport.base, 532 str(error)) 533 534 def test_not_branch_bzrdir_with_repo(self): 535 controldir = self.make_repository('repo').controldir 536 err = errors.NotBranchError('path', controldir=controldir) 537 self.assertEqual( 538 'Not a branch: "path": location is a repository.', str(err)) 539 540 def test_not_branch_bzrdir_without_repo(self): 541 controldir = self.make_controldir('bzrdir') 542 err = errors.NotBranchError('path', controldir=controldir) 543 self.assertEqual('Not a branch: "path".', str(err)) 544 545 def test_not_branch_laziness(self): 546 real_bzrdir = self.make_controldir('path') 547 548 class FakeBzrDir(object): 549 def __init__(self): 550 self.calls = [] 551 552 def open_repository(self): 553 self.calls.append('open_repository') 554 raise errors.NoRepositoryPresent(real_bzrdir) 555 fake_bzrdir = FakeBzrDir() 556 err = errors.NotBranchError('path', controldir=fake_bzrdir) 557 self.assertEqual([], fake_bzrdir.calls) 558 str(err) 559 self.assertEqual(['open_repository'], fake_bzrdir.calls) 560 # Stringifying twice doesn't try to open a repository twice. 561 str(err) 562 self.assertEqual(['open_repository'], fake_bzrdir.calls) 563