1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Tests for L{twisted.conch.checkers}. 6""" 7 8 9try: 10 import crypt 11except ImportError: 12 cryptSkip = "cannot run without crypt module" 13else: 14 cryptSkip = "" 15 16import os 17from base64 import encodebytes 18from collections import namedtuple 19from io import BytesIO 20 21from zope.interface.verify import verifyObject 22 23from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse 24from twisted.cred.credentials import ( 25 ISSHPrivateKey, 26 IUsernamePassword, 27 SSHPrivateKey, 28 UsernamePassword, 29) 30from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials 31from twisted.python import util 32from twisted.python.failure import Failure 33from twisted.python.fakepwd import ShadowDatabase, UserDatabase 34from twisted.python.filepath import FilePath 35from twisted.python.reflect import requireModule 36from twisted.test.test_process import MockOS 37from twisted.trial.unittest import TestCase 38 39if requireModule("cryptography") and requireModule("pyasn1"): 40 dependencySkip = "" 41 from twisted.conch import checkers 42 from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey 43 from twisted.conch.ssh import keys 44 from twisted.conch.test import keydata 45else: 46 dependencySkip = "can't run without cryptography and PyASN1" 47 48if getattr(os, "geteuid", None) is None: 49 euidSkip = "Cannot run without effective UIDs (questionable)" 50else: 51 euidSkip = "" 52 53 54class HelperTests(TestCase): 55 """ 56 Tests for helper functions L{verifyCryptedPassword}, L{_pwdGetByName} and 57 L{_shadowGetByName}. 58 """ 59 60 skip = cryptSkip or dependencySkip 61 62 def setUp(self): 63 self.mockos = MockOS() 64 65 def test_verifyCryptedPassword(self): 66 """ 67 L{verifyCryptedPassword} returns C{True} if the plaintext password 68 passed to it matches the encrypted password passed to it. 69 """ 70 password = "secret string" 71 salt = "salty" 72 crypted = crypt.crypt(password, salt) 73 self.assertTrue( 74 checkers.verifyCryptedPassword(crypted, password), 75 "{!r} supposed to be valid encrypted password for {!r}".format( 76 crypted, password 77 ), 78 ) 79 80 def test_verifyCryptedPasswordMD5(self): 81 """ 82 L{verifyCryptedPassword} returns True if the provided cleartext password 83 matches the provided MD5 password hash. 84 """ 85 password = "password" 86 salt = "$1$salt" 87 crypted = crypt.crypt(password, salt) 88 self.assertTrue( 89 checkers.verifyCryptedPassword(crypted, password), 90 "{!r} supposed to be valid encrypted password for {}".format( 91 crypted, password 92 ), 93 ) 94 95 def test_refuteCryptedPassword(self): 96 """ 97 L{verifyCryptedPassword} returns C{False} if the plaintext password 98 passed to it does not match the encrypted password passed to it. 99 """ 100 password = "string secret" 101 wrong = "secret string" 102 crypted = crypt.crypt(password, password) 103 self.assertFalse( 104 checkers.verifyCryptedPassword(crypted, wrong), 105 "{!r} not supposed to be valid encrypted password for {}".format( 106 crypted, wrong 107 ), 108 ) 109 110 def test_pwdGetByName(self): 111 """ 112 L{_pwdGetByName} returns a tuple of items from the UNIX /etc/passwd 113 database if the L{pwd} module is present. 114 """ 115 userdb = UserDatabase() 116 userdb.addUser("alice", "secrit", 1, 2, "first last", "/foo", "/bin/sh") 117 self.patch(checkers, "pwd", userdb) 118 self.assertEqual(checkers._pwdGetByName("alice"), userdb.getpwnam("alice")) 119 120 def test_pwdGetByNameWithoutPwd(self): 121 """ 122 If the C{pwd} module isn't present, L{_pwdGetByName} returns L{None}. 123 """ 124 self.patch(checkers, "pwd", None) 125 self.assertIsNone(checkers._pwdGetByName("alice")) 126 127 def test_shadowGetByName(self): 128 """ 129 L{_shadowGetByName} returns a tuple of items from the UNIX /etc/shadow 130 database if the L{spwd} is present. 131 """ 132 userdb = ShadowDatabase() 133 userdb.addUser("bob", "passphrase", 1, 2, 3, 4, 5, 6, 7) 134 self.patch(checkers, "spwd", userdb) 135 136 self.mockos.euid = 2345 137 self.mockos.egid = 1234 138 self.patch(util, "os", self.mockos) 139 140 self.assertEqual(checkers._shadowGetByName("bob"), userdb.getspnam("bob")) 141 self.assertEqual(self.mockos.seteuidCalls, [0, 2345]) 142 self.assertEqual(self.mockos.setegidCalls, [0, 1234]) 143 144 def test_shadowGetByNameWithoutSpwd(self): 145 """ 146 L{_shadowGetByName} returns L{None} if C{spwd} is not present. 147 """ 148 self.patch(checkers, "spwd", None) 149 150 self.assertIsNone(checkers._shadowGetByName("bob")) 151 self.assertEqual(self.mockos.seteuidCalls, []) 152 self.assertEqual(self.mockos.setegidCalls, []) 153 154 155class SSHPublicKeyDatabaseTests(TestCase): 156 """ 157 Tests for L{SSHPublicKeyDatabase}. 158 """ 159 160 skip = euidSkip or dependencySkip 161 162 def setUp(self): 163 self.checker = checkers.SSHPublicKeyDatabase() 164 self.key1 = encodebytes(b"foobar") 165 self.key2 = encodebytes(b"eggspam") 166 self.content = b"t1 " + self.key1 + b" foo\nt2 " + self.key2 + b" egg\n" 167 168 self.mockos = MockOS() 169 self.mockos.path = FilePath(self.mktemp()) 170 self.mockos.path.makedirs() 171 self.patch(util, "os", self.mockos) 172 self.sshDir = self.mockos.path.child(".ssh") 173 self.sshDir.makedirs() 174 175 userdb = UserDatabase() 176 userdb.addUser( 177 b"user", 178 b"password", 179 1, 180 2, 181 b"first last", 182 self.mockos.path.path, 183 b"/bin/shell", 184 ) 185 self.checker._userdb = userdb 186 187 def test_deprecated(self): 188 """ 189 L{SSHPublicKeyDatabase} is deprecated as of version 15.0 190 """ 191 warningsShown = self.flushWarnings(offendingFunctions=[self.setUp]) 192 self.assertEqual(warningsShown[0]["category"], DeprecationWarning) 193 self.assertEqual( 194 warningsShown[0]["message"], 195 "twisted.conch.checkers.SSHPublicKeyDatabase " 196 "was deprecated in Twisted 15.0.0: Please use " 197 "twisted.conch.checkers.SSHPublicKeyChecker, " 198 "initialized with an instance of " 199 "twisted.conch.checkers.UNIXAuthorizedKeysFiles instead.", 200 ) 201 self.assertEqual(len(warningsShown), 1) 202 203 def _testCheckKey(self, filename): 204 self.sshDir.child(filename).setContent(self.content) 205 user = UsernamePassword(b"user", b"password") 206 user.blob = b"foobar" 207 self.assertTrue(self.checker.checkKey(user)) 208 user.blob = b"eggspam" 209 self.assertTrue(self.checker.checkKey(user)) 210 user.blob = b"notallowed" 211 self.assertFalse(self.checker.checkKey(user)) 212 213 def test_checkKey(self): 214 """ 215 L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the 216 authorized_keys file and check the keys against that file. 217 """ 218 self._testCheckKey("authorized_keys") 219 self.assertEqual(self.mockos.seteuidCalls, []) 220 self.assertEqual(self.mockos.setegidCalls, []) 221 222 def test_checkKey2(self): 223 """ 224 L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the 225 authorized_keys2 file and check the keys against that file. 226 """ 227 self._testCheckKey("authorized_keys2") 228 self.assertEqual(self.mockos.seteuidCalls, []) 229 self.assertEqual(self.mockos.setegidCalls, []) 230 231 def test_checkKeyAsRoot(self): 232 """ 233 If the key file is readable, L{SSHPublicKeyDatabase.checkKey} should 234 switch its uid/gid to the ones of the authenticated user. 235 """ 236 keyFile = self.sshDir.child("authorized_keys") 237 keyFile.setContent(self.content) 238 # Fake permission error by changing the mode 239 keyFile.chmod(0o000) 240 self.addCleanup(keyFile.chmod, 0o777) 241 # And restore the right mode when seteuid is called 242 savedSeteuid = self.mockos.seteuid 243 244 def seteuid(euid): 245 keyFile.chmod(0o777) 246 return savedSeteuid(euid) 247 248 self.mockos.euid = 2345 249 self.mockos.egid = 1234 250 self.patch(self.mockos, "seteuid", seteuid) 251 self.patch(util, "os", self.mockos) 252 user = UsernamePassword(b"user", b"password") 253 user.blob = b"foobar" 254 self.assertTrue(self.checker.checkKey(user)) 255 self.assertEqual(self.mockos.seteuidCalls, [0, 1, 0, 2345]) 256 self.assertEqual(self.mockos.setegidCalls, [2, 1234]) 257 258 def test_requestAvatarId(self): 259 """ 260 L{SSHPublicKeyDatabase.requestAvatarId} should return the avatar id 261 passed in if its C{_checkKey} method returns True. 262 """ 263 264 def _checkKey(ignored): 265 return True 266 267 self.patch(self.checker, "checkKey", _checkKey) 268 credentials = SSHPrivateKey( 269 b"test", 270 b"ssh-rsa", 271 keydata.publicRSA_openssh, 272 b"foo", 273 keys.Key.fromString(keydata.privateRSA_openssh).sign(b"foo"), 274 ) 275 d = self.checker.requestAvatarId(credentials) 276 277 def _verify(avatarId): 278 self.assertEqual(avatarId, b"test") 279 280 return d.addCallback(_verify) 281 282 def test_requestAvatarIdWithoutSignature(self): 283 """ 284 L{SSHPublicKeyDatabase.requestAvatarId} should raise L{ValidPublicKey} 285 if the credentials represent a valid key without a signature. This 286 tells the user that the key is valid for login, but does not actually 287 allow that user to do so without a signature. 288 """ 289 290 def _checkKey(ignored): 291 return True 292 293 self.patch(self.checker, "checkKey", _checkKey) 294 credentials = SSHPrivateKey( 295 b"test", b"ssh-rsa", keydata.publicRSA_openssh, None, None 296 ) 297 d = self.checker.requestAvatarId(credentials) 298 return self.assertFailure(d, ValidPublicKey) 299 300 def test_requestAvatarIdInvalidKey(self): 301 """ 302 If L{SSHPublicKeyDatabase.checkKey} returns False, 303 C{_cbRequestAvatarId} should raise L{UnauthorizedLogin}. 304 """ 305 306 def _checkKey(ignored): 307 return False 308 309 self.patch(self.checker, "checkKey", _checkKey) 310 d = self.checker.requestAvatarId(None) 311 return self.assertFailure(d, UnauthorizedLogin) 312 313 def test_requestAvatarIdInvalidSignature(self): 314 """ 315 Valid keys with invalid signatures should cause 316 L{SSHPublicKeyDatabase.requestAvatarId} to return a {UnauthorizedLogin} 317 failure 318 """ 319 320 def _checkKey(ignored): 321 return True 322 323 self.patch(self.checker, "checkKey", _checkKey) 324 credentials = SSHPrivateKey( 325 b"test", 326 b"ssh-rsa", 327 keydata.publicRSA_openssh, 328 b"foo", 329 keys.Key.fromString(keydata.privateDSA_openssh).sign(b"foo"), 330 ) 331 d = self.checker.requestAvatarId(credentials) 332 return self.assertFailure(d, UnauthorizedLogin) 333 334 def test_requestAvatarIdNormalizeException(self): 335 """ 336 Exceptions raised while verifying the key should be normalized into an 337 C{UnauthorizedLogin} failure. 338 """ 339 340 def _checkKey(ignored): 341 return True 342 343 self.patch(self.checker, "checkKey", _checkKey) 344 credentials = SSHPrivateKey(b"test", None, b"blob", b"sigData", b"sig") 345 d = self.checker.requestAvatarId(credentials) 346 347 def _verifyLoggedException(failure): 348 errors = self.flushLoggedErrors(keys.BadKeyError) 349 self.assertEqual(len(errors), 1) 350 return failure 351 352 d.addErrback(_verifyLoggedException) 353 return self.assertFailure(d, UnauthorizedLogin) 354 355 356class SSHProtocolCheckerTests(TestCase): 357 """ 358 Tests for L{SSHProtocolChecker}. 359 """ 360 361 skip = dependencySkip 362 363 def test_registerChecker(self): 364 """ 365 L{SSHProcotolChecker.registerChecker} should add the given checker to 366 the list of registered checkers. 367 """ 368 checker = checkers.SSHProtocolChecker() 369 self.assertEqual(checker.credentialInterfaces, []) 370 checker.registerChecker( 371 checkers.SSHPublicKeyDatabase(), 372 ) 373 self.assertEqual(checker.credentialInterfaces, [ISSHPrivateKey]) 374 self.assertIsInstance( 375 checker.checkers[ISSHPrivateKey], checkers.SSHPublicKeyDatabase 376 ) 377 378 def test_registerCheckerWithInterface(self): 379 """ 380 If a specific interface is passed into 381 L{SSHProtocolChecker.registerChecker}, that interface should be 382 registered instead of what the checker specifies in 383 credentialIntefaces. 384 """ 385 checker = checkers.SSHProtocolChecker() 386 self.assertEqual(checker.credentialInterfaces, []) 387 checker.registerChecker(checkers.SSHPublicKeyDatabase(), IUsernamePassword) 388 self.assertEqual(checker.credentialInterfaces, [IUsernamePassword]) 389 self.assertIsInstance( 390 checker.checkers[IUsernamePassword], checkers.SSHPublicKeyDatabase 391 ) 392 393 def test_requestAvatarId(self): 394 """ 395 L{SSHProtocolChecker.requestAvatarId} should defer to one if its 396 registered checkers to authenticate a user. 397 """ 398 checker = checkers.SSHProtocolChecker() 399 passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse() 400 passwordDatabase.addUser(b"test", b"test") 401 checker.registerChecker(passwordDatabase) 402 d = checker.requestAvatarId(UsernamePassword(b"test", b"test")) 403 404 def _callback(avatarId): 405 self.assertEqual(avatarId, b"test") 406 407 return d.addCallback(_callback) 408 409 def test_requestAvatarIdWithNotEnoughAuthentication(self): 410 """ 411 If the client indicates that it is never satisfied, by always returning 412 False from _areDone, then L{SSHProtocolChecker} should raise 413 L{NotEnoughAuthentication}. 414 """ 415 checker = checkers.SSHProtocolChecker() 416 417 def _areDone(avatarId): 418 return False 419 420 self.patch(checker, "areDone", _areDone) 421 422 passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse() 423 passwordDatabase.addUser(b"test", b"test") 424 checker.registerChecker(passwordDatabase) 425 d = checker.requestAvatarId(UsernamePassword(b"test", b"test")) 426 return self.assertFailure(d, NotEnoughAuthentication) 427 428 def test_requestAvatarIdInvalidCredential(self): 429 """ 430 If the passed credentials aren't handled by any registered checker, 431 L{SSHProtocolChecker} should raise L{UnhandledCredentials}. 432 """ 433 checker = checkers.SSHProtocolChecker() 434 d = checker.requestAvatarId(UsernamePassword(b"test", b"test")) 435 return self.assertFailure(d, UnhandledCredentials) 436 437 def test_areDone(self): 438 """ 439 The default L{SSHProcotolChecker.areDone} should simply return True. 440 """ 441 self.assertTrue(checkers.SSHProtocolChecker().areDone(None)) 442 443 444class UNIXPasswordDatabaseTests(TestCase): 445 """ 446 Tests for L{UNIXPasswordDatabase}. 447 """ 448 449 skip = cryptSkip or dependencySkip 450 451 def assertLoggedIn(self, d, username): 452 """ 453 Assert that the L{Deferred} passed in is called back with the value 454 'username'. This represents a valid login for this TestCase. 455 456 NOTE: To work, this method's return value must be returned from the 457 test method, or otherwise hooked up to the test machinery. 458 459 @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 460 @type d: L{Deferred} 461 @rtype: L{Deferred} 462 """ 463 result = [] 464 d.addBoth(result.append) 465 self.assertEqual(len(result), 1, "login incomplete") 466 if isinstance(result[0], Failure): 467 result[0].raiseException() 468 self.assertEqual(result[0], username) 469 470 def test_defaultCheckers(self): 471 """ 472 L{UNIXPasswordDatabase} with no arguments has checks the C{pwd} database 473 and then the C{spwd} database. 474 """ 475 checker = checkers.UNIXPasswordDatabase() 476 477 def crypted(username, password): 478 salt = crypt.crypt(password, username) 479 crypted = crypt.crypt(password, "$1$" + salt) 480 return crypted 481 482 pwd = UserDatabase() 483 pwd.addUser( 484 "alice", crypted("alice", "password"), 1, 2, "foo", "/foo", "/bin/sh" 485 ) 486 # x and * are convention for "look elsewhere for the password" 487 pwd.addUser("bob", "x", 1, 2, "bar", "/bar", "/bin/sh") 488 spwd = ShadowDatabase() 489 spwd.addUser("alice", "wrong", 1, 2, 3, 4, 5, 6, 7) 490 spwd.addUser("bob", crypted("bob", "password"), 8, 9, 10, 11, 12, 13, 14) 491 492 self.patch(checkers, "pwd", pwd) 493 self.patch(checkers, "spwd", spwd) 494 495 mockos = MockOS() 496 self.patch(util, "os", mockos) 497 498 mockos.euid = 2345 499 mockos.egid = 1234 500 501 cred = UsernamePassword(b"alice", b"password") 502 self.assertLoggedIn(checker.requestAvatarId(cred), b"alice") 503 self.assertEqual(mockos.seteuidCalls, []) 504 self.assertEqual(mockos.setegidCalls, []) 505 cred.username = b"bob" 506 self.assertLoggedIn(checker.requestAvatarId(cred), b"bob") 507 self.assertEqual(mockos.seteuidCalls, [0, 2345]) 508 self.assertEqual(mockos.setegidCalls, [0, 1234]) 509 510 def assertUnauthorizedLogin(self, d): 511 """ 512 Asserts that the L{Deferred} passed in is erred back with an 513 L{UnauthorizedLogin} L{Failure}. This reprsents an invalid login for 514 this TestCase. 515 516 NOTE: To work, this method's return value must be returned from the 517 test method, or otherwise hooked up to the test machinery. 518 519 @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 520 @type d: L{Deferred} 521 @rtype: L{None} 522 """ 523 self.assertRaises( 524 checkers.UnauthorizedLogin, self.assertLoggedIn, d, "bogus value" 525 ) 526 527 def test_passInCheckers(self): 528 """ 529 L{UNIXPasswordDatabase} takes a list of functions to check for UNIX 530 user information. 531 """ 532 password = crypt.crypt("secret", "secret") 533 userdb = UserDatabase() 534 userdb.addUser("anybody", password, 1, 2, "foo", "/bar", "/bin/sh") 535 checker = checkers.UNIXPasswordDatabase([userdb.getpwnam]) 536 self.assertLoggedIn( 537 checker.requestAvatarId(UsernamePassword(b"anybody", b"secret")), b"anybody" 538 ) 539 540 def test_verifyPassword(self): 541 """ 542 If the encrypted password provided by the getpwnam function is valid 543 (verified by the L{verifyCryptedPassword} function), we callback the 544 C{requestAvatarId} L{Deferred} with the username. 545 """ 546 547 def verifyCryptedPassword(crypted, pw): 548 return crypted == pw 549 550 def getpwnam(username): 551 return [username, username] 552 553 self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword) 554 checker = checkers.UNIXPasswordDatabase([getpwnam]) 555 credential = UsernamePassword(b"username", b"username") 556 self.assertLoggedIn(checker.requestAvatarId(credential), b"username") 557 558 def test_failOnKeyError(self): 559 """ 560 If the getpwnam function raises a KeyError, the login fails with an 561 L{UnauthorizedLogin} exception. 562 """ 563 564 def getpwnam(username): 565 raise KeyError(username) 566 567 checker = checkers.UNIXPasswordDatabase([getpwnam]) 568 credential = UsernamePassword(b"username", b"username") 569 self.assertUnauthorizedLogin(checker.requestAvatarId(credential)) 570 571 def test_failOnBadPassword(self): 572 """ 573 If the verifyCryptedPassword function doesn't verify the password, the 574 login fails with an L{UnauthorizedLogin} exception. 575 """ 576 577 def verifyCryptedPassword(crypted, pw): 578 return False 579 580 def getpwnam(username): 581 return [username, username] 582 583 self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword) 584 checker = checkers.UNIXPasswordDatabase([getpwnam]) 585 credential = UsernamePassword(b"username", b"username") 586 self.assertUnauthorizedLogin(checker.requestAvatarId(credential)) 587 588 def test_loopThroughFunctions(self): 589 """ 590 UNIXPasswordDatabase.requestAvatarId loops through each getpwnam 591 function associated with it and returns a L{Deferred} which fires with 592 the result of the first one which returns a value other than None. 593 ones do not verify the password. 594 """ 595 596 def verifyCryptedPassword(crypted, pw): 597 return crypted == pw 598 599 def getpwnam1(username): 600 return [username, "not the password"] 601 602 def getpwnam2(username): 603 return [username, username] 604 605 self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword) 606 checker = checkers.UNIXPasswordDatabase([getpwnam1, getpwnam2]) 607 credential = UsernamePassword(b"username", b"username") 608 self.assertLoggedIn(checker.requestAvatarId(credential), b"username") 609 610 def test_failOnSpecial(self): 611 """ 612 If the password returned by any function is C{""}, C{"x"}, or C{"*"} it 613 is not compared against the supplied password. Instead it is skipped. 614 """ 615 pwd = UserDatabase() 616 pwd.addUser("alice", "", 1, 2, "", "foo", "bar") 617 pwd.addUser("bob", "x", 1, 2, "", "foo", "bar") 618 pwd.addUser("carol", "*", 1, 2, "", "foo", "bar") 619 self.patch(checkers, "pwd", pwd) 620 621 checker = checkers.UNIXPasswordDatabase([checkers._pwdGetByName]) 622 cred = UsernamePassword(b"alice", b"") 623 self.assertUnauthorizedLogin(checker.requestAvatarId(cred)) 624 625 cred = UsernamePassword(b"bob", b"x") 626 self.assertUnauthorizedLogin(checker.requestAvatarId(cred)) 627 628 cred = UsernamePassword(b"carol", b"*") 629 self.assertUnauthorizedLogin(checker.requestAvatarId(cred)) 630 631 632class AuthorizedKeyFileReaderTests(TestCase): 633 """ 634 Tests for L{checkers.readAuthorizedKeyFile} 635 """ 636 637 skip = dependencySkip 638 639 def test_ignoresComments(self): 640 """ 641 L{checkers.readAuthorizedKeyFile} does not attempt to turn comments 642 into keys 643 """ 644 fileobj = BytesIO( 645 b"# this comment is ignored\n" 646 b"this is not\n" 647 b"# this is again\n" 648 b"and this is not" 649 ) 650 result = checkers.readAuthorizedKeyFile(fileobj, lambda x: x) 651 self.assertEqual([b"this is not", b"and this is not"], list(result)) 652 653 def test_ignoresLeadingWhitespaceAndEmptyLines(self): 654 """ 655 L{checkers.readAuthorizedKeyFile} ignores leading whitespace in 656 lines, as well as empty lines 657 """ 658 fileobj = BytesIO( 659 b""" 660 # ignore 661 not ignored 662 """ 663 ) 664 result = checkers.readAuthorizedKeyFile(fileobj, parseKey=lambda x: x) 665 self.assertEqual([b"not ignored"], list(result)) 666 667 def test_ignoresUnparsableKeys(self): 668 """ 669 L{checkers.readAuthorizedKeyFile} does not raise an exception 670 when a key fails to parse (raises a 671 L{twisted.conch.ssh.keys.BadKeyError}), but rather just keeps going 672 """ 673 674 def failOnSome(line): 675 if line.startswith(b"f"): 676 raise keys.BadKeyError("failed to parse") 677 return line 678 679 fileobj = BytesIO(b"failed key\ngood key") 680 result = checkers.readAuthorizedKeyFile(fileobj, parseKey=failOnSome) 681 self.assertEqual([b"good key"], list(result)) 682 683 684class InMemorySSHKeyDBTests(TestCase): 685 """ 686 Tests for L{checkers.InMemorySSHKeyDB} 687 """ 688 689 skip = dependencySkip 690 691 def test_implementsInterface(self): 692 """ 693 L{checkers.InMemorySSHKeyDB} implements 694 L{checkers.IAuthorizedKeysDB} 695 """ 696 keydb = checkers.InMemorySSHKeyDB({b"alice": [b"key"]}) 697 verifyObject(checkers.IAuthorizedKeysDB, keydb) 698 699 def test_noKeysForUnauthorizedUser(self): 700 """ 701 If the user is not in the mapping provided to 702 L{checkers.InMemorySSHKeyDB}, an empty iterator is returned 703 by L{checkers.InMemorySSHKeyDB.getAuthorizedKeys} 704 """ 705 keydb = checkers.InMemorySSHKeyDB({b"alice": [b"keys"]}) 706 self.assertEqual([], list(keydb.getAuthorizedKeys(b"bob"))) 707 708 def test_allKeysForAuthorizedUser(self): 709 """ 710 If the user is in the mapping provided to 711 L{checkers.InMemorySSHKeyDB}, an iterator with all the keys 712 is returned by L{checkers.InMemorySSHKeyDB.getAuthorizedKeys} 713 """ 714 keydb = checkers.InMemorySSHKeyDB({b"alice": [b"a", b"b"]}) 715 self.assertEqual([b"a", b"b"], list(keydb.getAuthorizedKeys(b"alice"))) 716 717 718class UNIXAuthorizedKeysFilesTests(TestCase): 719 """ 720 Tests for L{checkers.UNIXAuthorizedKeysFiles}. 721 """ 722 723 skip = dependencySkip 724 725 def setUp(self): 726 mockos = MockOS() 727 mockos.path = FilePath(self.mktemp()) 728 mockos.path.makedirs() 729 730 self.userdb = UserDatabase() 731 self.userdb.addUser( 732 b"alice", 733 b"password", 734 1, 735 2, 736 b"alice lastname", 737 mockos.path.path, 738 b"/bin/shell", 739 ) 740 741 self.sshDir = mockos.path.child(".ssh") 742 self.sshDir.makedirs() 743 authorizedKeys = self.sshDir.child("authorized_keys") 744 authorizedKeys.setContent(b"key 1\nkey 2") 745 746 self.expectedKeys = [b"key 1", b"key 2"] 747 748 def test_implementsInterface(self): 749 """ 750 L{checkers.UNIXAuthorizedKeysFiles} implements 751 L{checkers.IAuthorizedKeysDB}. 752 """ 753 keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb) 754 verifyObject(checkers.IAuthorizedKeysDB, keydb) 755 756 def test_noKeysForUnauthorizedUser(self): 757 """ 758 If the user is not in the user database provided to 759 L{checkers.UNIXAuthorizedKeysFiles}, an empty iterator is returned 760 by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}. 761 """ 762 keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x) 763 self.assertEqual([], list(keydb.getAuthorizedKeys("bob"))) 764 765 def test_allKeysInAllAuthorizedFilesForAuthorizedUser(self): 766 """ 767 If the user is in the user database provided to 768 L{checkers.UNIXAuthorizedKeysFiles}, an iterator with all the keys in 769 C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2} is returned 770 by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}. 771 """ 772 self.sshDir.child("authorized_keys2").setContent(b"key 3") 773 keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x) 774 self.assertEqual( 775 self.expectedKeys + [b"key 3"], list(keydb.getAuthorizedKeys(b"alice")) 776 ) 777 778 def test_ignoresNonexistantFile(self): 779 """ 780 L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only 781 the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2} 782 if they exist. 783 """ 784 keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x) 785 self.assertEqual(self.expectedKeys, list(keydb.getAuthorizedKeys(b"alice"))) 786 787 def test_ignoresUnreadableFile(self): 788 """ 789 L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only 790 the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2} 791 if they are readable. 792 """ 793 self.sshDir.child("authorized_keys2").makedirs() 794 keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x) 795 self.assertEqual(self.expectedKeys, list(keydb.getAuthorizedKeys(b"alice"))) 796 797 798_KeyDB = namedtuple("_KeyDB", ["getAuthorizedKeys"]) 799 800 801class _DummyException(Exception): 802 """ 803 Fake exception to be used for testing. 804 """ 805 806 pass 807 808 809class SSHPublicKeyCheckerTests(TestCase): 810 """ 811 Tests for L{checkers.SSHPublicKeyChecker}. 812 """ 813 814 skip = dependencySkip 815 816 def setUp(self): 817 self.credentials = SSHPrivateKey( 818 b"alice", 819 b"ssh-rsa", 820 keydata.publicRSA_openssh, 821 b"foo", 822 keys.Key.fromString(keydata.privateRSA_openssh).sign(b"foo"), 823 ) 824 self.keydb = _KeyDB(lambda _: [keys.Key.fromString(keydata.publicRSA_openssh)]) 825 self.checker = checkers.SSHPublicKeyChecker(self.keydb) 826 827 def test_credentialsWithoutSignature(self): 828 """ 829 Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with 830 credentials that do not have a signature fails with L{ValidPublicKey}. 831 """ 832 self.credentials.signature = None 833 self.failureResultOf( 834 self.checker.requestAvatarId(self.credentials), ValidPublicKey 835 ) 836 837 def test_credentialsWithBadKey(self): 838 """ 839 Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with 840 credentials that have a bad key fails with L{keys.BadKeyError}. 841 """ 842 self.credentials.blob = b"" 843 self.failureResultOf( 844 self.checker.requestAvatarId(self.credentials), keys.BadKeyError 845 ) 846 847 def test_credentialsNoMatchingKey(self): 848 """ 849 If L{checkers.IAuthorizedKeysDB.getAuthorizedKeys} returns no keys 850 that match the credentials, 851 L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with 852 L{UnauthorizedLogin}. 853 """ 854 self.credentials.blob = keydata.publicDSA_openssh 855 self.failureResultOf( 856 self.checker.requestAvatarId(self.credentials), UnauthorizedLogin 857 ) 858 859 def test_credentialsInvalidSignature(self): 860 """ 861 Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with 862 credentials that are incorrectly signed fails with 863 L{UnauthorizedLogin}. 864 """ 865 self.credentials.signature = keys.Key.fromString( 866 keydata.privateDSA_openssh 867 ).sign(b"foo") 868 self.failureResultOf( 869 self.checker.requestAvatarId(self.credentials), UnauthorizedLogin 870 ) 871 872 def test_failureVerifyingKey(self): 873 """ 874 If L{keys.Key.verify} raises an exception, 875 L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with 876 L{UnauthorizedLogin}. 877 """ 878 879 def fail(*args, **kwargs): 880 raise _DummyException() 881 882 self.patch(keys.Key, "verify", fail) 883 884 self.failureResultOf( 885 self.checker.requestAvatarId(self.credentials), UnauthorizedLogin 886 ) 887 self.flushLoggedErrors(_DummyException) 888 889 def test_usernameReturnedOnSuccess(self): 890 """ 891 L{checker.SSHPublicKeyChecker.requestAvatarId}, if successful, 892 callbacks with the username. 893 """ 894 d = self.checker.requestAvatarId(self.credentials) 895 self.assertEqual(b"alice", self.successResultOf(d)) 896