1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for L{twisted.conch.checkers}.
6"""
7
8
# The password tests hash with the C{crypt} module; when it cannot be
# imported, record a skip reason for the test cases that need it.
cryptSkip = ""
try:
    import crypt
except ImportError:
    cryptSkip = "cannot run without crypt module"
15
16import os
17from base64 import encodebytes
18from collections import namedtuple
19from io import BytesIO
20
21from zope.interface.verify import verifyObject
22
23from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
24from twisted.cred.credentials import (
25    ISSHPrivateKey,
26    IUsernamePassword,
27    SSHPrivateKey,
28    UsernamePassword,
29)
30from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
31from twisted.python import util
32from twisted.python.failure import Failure
33from twisted.python.fakepwd import ShadowDatabase, UserDatabase
34from twisted.python.filepath import FilePath
35from twisted.python.reflect import requireModule
36from twisted.test.test_process import MockOS
37from twisted.trial.unittest import TestCase
38
# The conch modules under test are only imported when both optional
# dependencies are available; otherwise a skip reason is recorded and used by
# every test case in this module.
if not (requireModule("cryptography") and requireModule("pyasn1")):
    dependencySkip = "can't run without cryptography and PyASN1"
else:
    dependencySkip = ""
    from twisted.conch import checkers
    from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey
    from twisted.conch.ssh import keys
    from twisted.conch.test import keydata
47
# Test cases which depend on effective UIDs are skipped when C{os.geteuid}
# is not available.
euidSkip = (
    "Cannot run without effective UIDs (questionable)"
    if getattr(os, "geteuid", None) is None
    else ""
)
52
53
class HelperTests(TestCase):
    """
    Tests for helper functions L{verifyCryptedPassword}, L{_pwdGetByName} and
    L{_shadowGetByName}.
    """

    skip = cryptSkip or dependencySkip

    def setUp(self):
        """
        Create a L{MockOS} whose recorded C{seteuid} and C{setegid} calls can
        be inspected by the tests.
        """
        self.mockos = MockOS()

    def test_verifyCryptedPassword(self):
        """
        L{verifyCryptedPassword} returns C{True} if the plaintext password
        passed to it matches the encrypted password passed to it.
        """
        password = "secret string"
        salt = "salty"
        crypted = crypt.crypt(password, salt)
        self.assertTrue(
            checkers.verifyCryptedPassword(crypted, password),
            "{!r} supposed to be valid encrypted password for {!r}".format(
                crypted, password
            ),
        )

    def test_verifyCryptedPasswordMD5(self):
        """
        L{verifyCryptedPassword} returns C{True} if the provided cleartext
        password matches the provided MD5 password hash.
        """
        password = "password"
        # The "$1$" prefix selects the MD5-based scheme.
        salt = "$1$salt"
        crypted = crypt.crypt(password, salt)
        self.assertTrue(
            checkers.verifyCryptedPassword(crypted, password),
            "{!r} supposed to be valid encrypted password for {}".format(
                crypted, password
            ),
        )

    def test_refuteCryptedPassword(self):
        """
        L{verifyCryptedPassword} returns C{False} if the plaintext password
        passed to it does not match the encrypted password passed to it.
        """
        password = "string secret"
        wrong = "secret string"
        crypted = crypt.crypt(password, password)
        self.assertFalse(
            checkers.verifyCryptedPassword(crypted, wrong),
            "{!r} not supposed to be valid encrypted password for {}".format(
                crypted, wrong
            ),
        )

    def test_pwdGetByName(self):
        """
        L{_pwdGetByName} returns a tuple of items from the UNIX /etc/passwd
        database if the L{pwd} module is present.
        """
        userdb = UserDatabase()
        userdb.addUser("alice", "secrit", 1, 2, "first last", "/foo", "/bin/sh")
        self.patch(checkers, "pwd", userdb)
        self.assertEqual(checkers._pwdGetByName("alice"), userdb.getpwnam("alice"))

    def test_pwdGetByNameWithoutPwd(self):
        """
        If the C{pwd} module isn't present, L{_pwdGetByName} returns L{None}.
        """
        self.patch(checkers, "pwd", None)
        self.assertIsNone(checkers._pwdGetByName("alice"))

    def test_shadowGetByName(self):
        """
        L{_shadowGetByName} returns a tuple of items from the UNIX /etc/shadow
        database if the L{spwd} module is present, switching the effective
        UID and GID to 0 for the lookup and restoring them afterwards.
        """
        userdb = ShadowDatabase()
        userdb.addUser("bob", "passphrase", 1, 2, 3, 4, 5, 6, 7)
        self.patch(checkers, "spwd", userdb)

        self.mockos.euid = 2345
        self.mockos.egid = 1234
        self.patch(util, "os", self.mockos)

        self.assertEqual(checkers._shadowGetByName("bob"), userdb.getspnam("bob"))
        self.assertEqual(self.mockos.seteuidCalls, [0, 2345])
        self.assertEqual(self.mockos.setegidCalls, [0, 1234])

    def test_shadowGetByNameWithoutSpwd(self):
        """
        L{_shadowGetByName} returns L{None} if C{spwd} is not present, and
        does not change the effective UID or GID.
        """
        self.patch(checkers, "spwd", None)
        # Install the mock so that any privilege switch would be recorded;
        # without this the two assertions below could never fail.
        self.patch(util, "os", self.mockos)

        self.assertIsNone(checkers._shadowGetByName("bob"))
        self.assertEqual(self.mockos.seteuidCalls, [])
        self.assertEqual(self.mockos.setegidCalls, [])
153
154
class SSHPublicKeyDatabaseTests(TestCase):
    """
    Tests for L{SSHPublicKeyDatabase}.
    """

    skip = euidSkip or dependencySkip

    def setUp(self):
        """
        Create the checker under test, a temporary home directory containing
        a C{.ssh} directory, and a user database with a single user whose
        home is that directory.  L{util.os} is replaced with a L{MockOS} so
        that UID/GID switches are recorded instead of performed.
        """
        self.checker = checkers.SSHPublicKeyDatabase()
        self.key1 = encodebytes(b"foobar")
        self.key2 = encodebytes(b"eggspam")
        self.content = b"t1 " + self.key1 + b" foo\nt2 " + self.key2 + b" egg\n"

        self.mockos = MockOS()
        self.mockos.path = FilePath(self.mktemp())
        self.mockos.path.makedirs()
        self.patch(util, "os", self.mockos)
        self.sshDir = self.mockos.path.child(".ssh")
        self.sshDir.makedirs()

        userdb = UserDatabase()
        userdb.addUser(
            b"user",
            b"password",
            1,
            2,
            b"first last",
            self.mockos.path.path,
            b"/bin/shell",
        )
        self.checker._userdb = userdb

    def test_deprecated(self):
        """
        L{SSHPublicKeyDatabase} is deprecated as of version 15.0
        """
        warningsShown = self.flushWarnings(offendingFunctions=[self.setUp])
        # Check the count first so that a missing warning is reported as an
        # assertion failure rather than an IndexError.
        self.assertEqual(len(warningsShown), 1)
        self.assertEqual(warningsShown[0]["category"], DeprecationWarning)
        self.assertEqual(
            warningsShown[0]["message"],
            "twisted.conch.checkers.SSHPublicKeyDatabase "
            "was deprecated in Twisted 15.0.0: Please use "
            "twisted.conch.checkers.SSHPublicKeyChecker, "
            "initialized with an instance of "
            "twisted.conch.checkers.UNIXAuthorizedKeysFiles instead.",
        )

    def _testCheckKey(self, filename):
        """
        Write the two-key fixture to C{filename} inside the C{.ssh}
        directory and assert that both listed keys are accepted while an
        unlisted one is rejected.

        @param filename: name of the authorized keys file to create.
        """
        self.sshDir.child(filename).setContent(self.content)
        user = UsernamePassword(b"user", b"password")
        user.blob = b"foobar"
        self.assertTrue(self.checker.checkKey(user))
        user.blob = b"eggspam"
        self.assertTrue(self.checker.checkKey(user))
        user.blob = b"notallowed"
        self.assertFalse(self.checker.checkKey(user))

    def test_checkKey(self):
        """
        L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
        authorized_keys file and check the keys against that file.
        """
        self._testCheckKey("authorized_keys")
        self.assertEqual(self.mockos.seteuidCalls, [])
        self.assertEqual(self.mockos.setegidCalls, [])

    def test_checkKey2(self):
        """
        L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
        authorized_keys2 file and check the keys against that file.
        """
        self._testCheckKey("authorized_keys2")
        self.assertEqual(self.mockos.seteuidCalls, [])
        self.assertEqual(self.mockos.setegidCalls, [])

    def test_checkKeyAsRoot(self):
        """
        If the key file is unreadable, L{SSHPublicKeyDatabase.checkKey} should
        switch its uid/gid to the ones of the authenticated user.
        """
        keyFile = self.sshDir.child("authorized_keys")
        keyFile.setContent(self.content)
        # Fake permission error by changing the mode
        keyFile.chmod(0o000)
        self.addCleanup(keyFile.chmod, 0o777)
        # And restore the right mode when seteuid is called
        savedSeteuid = self.mockos.seteuid

        def seteuid(euid):
            keyFile.chmod(0o777)
            return savedSeteuid(euid)

        self.mockos.euid = 2345
        self.mockos.egid = 1234
        self.patch(self.mockos, "seteuid", seteuid)
        self.patch(util, "os", self.mockos)
        user = UsernamePassword(b"user", b"password")
        user.blob = b"foobar"
        self.assertTrue(self.checker.checkKey(user))
        # 1 and 2 are the UID and GID registered for the user in setUp.
        self.assertEqual(self.mockos.seteuidCalls, [0, 1, 0, 2345])
        self.assertEqual(self.mockos.setegidCalls, [2, 1234])

    def test_requestAvatarId(self):
        """
        L{SSHPublicKeyDatabase.requestAvatarId} should return the avatar id
        passed in if its C{checkKey} method returns True.
        """

        def _checkKey(ignored):
            return True

        self.patch(self.checker, "checkKey", _checkKey)
        credentials = SSHPrivateKey(
            b"test",
            b"ssh-rsa",
            keydata.publicRSA_openssh,
            b"foo",
            keys.Key.fromString(keydata.privateRSA_openssh).sign(b"foo"),
        )
        d = self.checker.requestAvatarId(credentials)

        def _verify(avatarId):
            self.assertEqual(avatarId, b"test")

        return d.addCallback(_verify)

    def test_requestAvatarIdWithoutSignature(self):
        """
        L{SSHPublicKeyDatabase.requestAvatarId} should raise L{ValidPublicKey}
        if the credentials represent a valid key without a signature.  This
        tells the user that the key is valid for login, but does not actually
        allow that user to do so without a signature.
        """

        def _checkKey(ignored):
            return True

        self.patch(self.checker, "checkKey", _checkKey)
        credentials = SSHPrivateKey(
            b"test", b"ssh-rsa", keydata.publicRSA_openssh, None, None
        )
        d = self.checker.requestAvatarId(credentials)
        return self.assertFailure(d, ValidPublicKey)

    def test_requestAvatarIdInvalidKey(self):
        """
        If L{SSHPublicKeyDatabase.checkKey} returns False,
        C{_cbRequestAvatarId} should raise L{UnauthorizedLogin}.
        """

        def _checkKey(ignored):
            return False

        self.patch(self.checker, "checkKey", _checkKey)
        d = self.checker.requestAvatarId(None)
        return self.assertFailure(d, UnauthorizedLogin)

    def test_requestAvatarIdInvalidSignature(self):
        """
        Valid keys with invalid signatures should cause
        L{SSHPublicKeyDatabase.requestAvatarId} to return an
        L{UnauthorizedLogin} failure.
        """

        def _checkKey(ignored):
            return True

        self.patch(self.checker, "checkKey", _checkKey)
        # The blob is the RSA public key, but the signature is produced with
        # the DSA private key, so it cannot verify.
        credentials = SSHPrivateKey(
            b"test",
            b"ssh-rsa",
            keydata.publicRSA_openssh,
            b"foo",
            keys.Key.fromString(keydata.privateDSA_openssh).sign(b"foo"),
        )
        d = self.checker.requestAvatarId(credentials)
        return self.assertFailure(d, UnauthorizedLogin)

    def test_requestAvatarIdNormalizeException(self):
        """
        Exceptions raised while verifying the key should be normalized into an
        C{UnauthorizedLogin} failure.
        """

        def _checkKey(ignored):
            return True

        self.patch(self.checker, "checkKey", _checkKey)
        credentials = SSHPrivateKey(b"test", None, b"blob", b"sigData", b"sig")
        d = self.checker.requestAvatarId(credentials)

        def _verifyLoggedException(failure):
            errors = self.flushLoggedErrors(keys.BadKeyError)
            self.assertEqual(len(errors), 1)
            return failure

        d.addErrback(_verifyLoggedException)
        return self.assertFailure(d, UnauthorizedLogin)
354
355
class SSHProtocolCheckerTests(TestCase):
    """
    Tests for L{SSHProtocolChecker}.
    """

    skip = dependencySkip

    def test_registerChecker(self):
        """
        L{SSHProtocolChecker.registerChecker} adds the given checker to the
        registered checkers, keyed by the credential interface the checker
        itself declares.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        self.assertEqual(protocolChecker.credentialInterfaces, [])
        keyDatabase = checkers.SSHPublicKeyDatabase()
        protocolChecker.registerChecker(keyDatabase)
        self.assertEqual(protocolChecker.credentialInterfaces, [ISSHPrivateKey])
        registered = protocolChecker.checkers[ISSHPrivateKey]
        self.assertIsInstance(registered, checkers.SSHPublicKeyDatabase)

    def test_registerCheckerWithInterface(self):
        """
        When an explicit interface is given to
        L{SSHProtocolChecker.registerChecker}, the checker is registered for
        that interface rather than for the ones listed in its own
        C{credentialInterfaces}.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        self.assertEqual(protocolChecker.credentialInterfaces, [])
        keyDatabase = checkers.SSHPublicKeyDatabase()
        protocolChecker.registerChecker(keyDatabase, IUsernamePassword)
        self.assertEqual(protocolChecker.credentialInterfaces, [IUsernamePassword])
        registered = protocolChecker.checkers[IUsernamePassword]
        self.assertIsInstance(registered, checkers.SSHPublicKeyDatabase)

    def test_requestAvatarId(self):
        """
        L{SSHProtocolChecker.requestAvatarId} defers to one of its registered
        checkers to authenticate a user.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        users = InMemoryUsernamePasswordDatabaseDontUse()
        users.addUser(b"test", b"test")
        protocolChecker.registerChecker(users)
        credentials = UsernamePassword(b"test", b"test")
        deferred = protocolChecker.requestAvatarId(credentials)

        def checkAvatarId(avatarId):
            self.assertEqual(avatarId, b"test")

        deferred.addCallback(checkAvatarId)
        return deferred

    def test_requestAvatarIdWithNotEnoughAuthentication(self):
        """
        If C{areDone} always returns C{False}, indicating that authentication
        is never complete, L{SSHProtocolChecker.requestAvatarId} fails with
        L{NotEnoughAuthentication}.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        self.patch(protocolChecker, "areDone", lambda avatarId: False)

        users = InMemoryUsernamePasswordDatabaseDontUse()
        users.addUser(b"test", b"test")
        protocolChecker.registerChecker(users)
        credentials = UsernamePassword(b"test", b"test")
        deferred = protocolChecker.requestAvatarId(credentials)
        return self.assertFailure(deferred, NotEnoughAuthentication)

    def test_requestAvatarIdInvalidCredential(self):
        """
        Credentials which no registered checker handles make
        L{SSHProtocolChecker.requestAvatarId} fail with
        L{UnhandledCredentials}.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        credentials = UsernamePassword(b"test", b"test")
        deferred = protocolChecker.requestAvatarId(credentials)
        return self.assertFailure(deferred, UnhandledCredentials)

    def test_areDone(self):
        """
        The default L{SSHProtocolChecker.areDone} simply returns C{True}.
        """
        protocolChecker = checkers.SSHProtocolChecker()
        self.assertTrue(protocolChecker.areDone(None))
442
443
class UNIXPasswordDatabaseTests(TestCase):
    """
    Tests for L{UNIXPasswordDatabase}.
    """

    skip = cryptSkip or dependencySkip

    def assertLoggedIn(self, d, username):
        """
        Assert that the L{Deferred} passed in has already been called back
        with the value C{username}.  This represents a valid login for this
        TestCase.

        The check is synchronous: the L{Deferred} must have fired by the time
        this method is called.  If it fired with a L{Failure}, the wrapped
        exception is re-raised.

        @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method.
        @type d: L{Deferred}
        @param username: the avatar id the login is expected to produce.
        @rtype: L{None}
        """
        result = []
        d.addBoth(result.append)
        self.assertEqual(len(result), 1, "login incomplete")
        if isinstance(result[0], Failure):
            result[0].raiseException()
        self.assertEqual(result[0], username)

    def test_defaultCheckers(self):
        """
        L{UNIXPasswordDatabase} with no arguments checks the C{pwd} database
        and then the C{spwd} database.
        """
        checker = checkers.UNIXPasswordDatabase()

        def crypted(username, password):
            salt = crypt.crypt(password, username)
            return crypt.crypt(password, "$1$" + salt)

        pwd = UserDatabase()
        pwd.addUser(
            "alice", crypted("alice", "password"), 1, 2, "foo", "/foo", "/bin/sh"
        )
        # x and * are convention for "look elsewhere for the password"
        pwd.addUser("bob", "x", 1, 2, "bar", "/bar", "/bin/sh")
        spwd = ShadowDatabase()
        spwd.addUser("alice", "wrong", 1, 2, 3, 4, 5, 6, 7)
        spwd.addUser("bob", crypted("bob", "password"), 8, 9, 10, 11, 12, 13, 14)

        self.patch(checkers, "pwd", pwd)
        self.patch(checkers, "spwd", spwd)

        mockos = MockOS()
        self.patch(util, "os", mockos)

        mockos.euid = 2345
        mockos.egid = 1234

        # alice is found in the pwd database, so no privilege switch happens.
        cred = UsernamePassword(b"alice", b"password")
        self.assertLoggedIn(checker.requestAvatarId(cred), b"alice")
        self.assertEqual(mockos.seteuidCalls, [])
        self.assertEqual(mockos.setegidCalls, [])
        # bob's real password is only in the shadow database, which is read
        # with the effective UID/GID switched to 0 and then restored.
        cred.username = b"bob"
        self.assertLoggedIn(checker.requestAvatarId(cred), b"bob")
        self.assertEqual(mockos.seteuidCalls, [0, 2345])
        self.assertEqual(mockos.setegidCalls, [0, 1234])

    def assertUnauthorizedLogin(self, d):
        """
        Assert that the L{Deferred} passed in has already erred back with an
        L{UnauthorizedLogin} L{Failure}.  This represents an invalid login for
        this TestCase.

        Like L{assertLoggedIn}, this check is synchronous.

        @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method.
        @type d: L{Deferred}
        @rtype: L{None}
        """
        self.assertRaises(UnauthorizedLogin, self.assertLoggedIn, d, "bogus value")

    def test_passInCheckers(self):
        """
        L{UNIXPasswordDatabase} takes a list of functions to check for UNIX
        user information.
        """
        password = crypt.crypt("secret", "secret")
        userdb = UserDatabase()
        userdb.addUser("anybody", password, 1, 2, "foo", "/bar", "/bin/sh")
        checker = checkers.UNIXPasswordDatabase([userdb.getpwnam])
        self.assertLoggedIn(
            checker.requestAvatarId(UsernamePassword(b"anybody", b"secret")), b"anybody"
        )

    def test_verifyPassword(self):
        """
        If the encrypted password provided by the getpwnam function is valid
        (verified by the L{verifyCryptedPassword} function), we callback the
        C{requestAvatarId} L{Deferred} with the username.
        """

        def verifyCryptedPassword(crypted, pw):
            return crypted == pw

        def getpwnam(username):
            return [username, username]

        self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword)
        checker = checkers.UNIXPasswordDatabase([getpwnam])
        credential = UsernamePassword(b"username", b"username")
        self.assertLoggedIn(checker.requestAvatarId(credential), b"username")

    def test_failOnKeyError(self):
        """
        If the getpwnam function raises a KeyError, the login fails with an
        L{UnauthorizedLogin} exception.
        """

        def getpwnam(username):
            raise KeyError(username)

        checker = checkers.UNIXPasswordDatabase([getpwnam])
        credential = UsernamePassword(b"username", b"username")
        self.assertUnauthorizedLogin(checker.requestAvatarId(credential))

    def test_failOnBadPassword(self):
        """
        If the verifyCryptedPassword function doesn't verify the password, the
        login fails with an L{UnauthorizedLogin} exception.
        """

        def verifyCryptedPassword(crypted, pw):
            return False

        def getpwnam(username):
            return [username, username]

        self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword)
        checker = checkers.UNIXPasswordDatabase([getpwnam])
        credential = UsernamePassword(b"username", b"username")
        self.assertUnauthorizedLogin(checker.requestAvatarId(credential))

    def test_loopThroughFunctions(self):
        """
        L{UNIXPasswordDatabase.requestAvatarId} loops through each getpwnam
        function associated with it and still succeeds when a later function
        supplies a password that verifies, even though an earlier one
        supplied a password that does not.
        """

        def verifyCryptedPassword(crypted, pw):
            return crypted == pw

        def getpwnam1(username):
            return [username, "not the password"]

        def getpwnam2(username):
            return [username, username]

        self.patch(checkers, "verifyCryptedPassword", verifyCryptedPassword)
        checker = checkers.UNIXPasswordDatabase([getpwnam1, getpwnam2])
        credential = UsernamePassword(b"username", b"username")
        self.assertLoggedIn(checker.requestAvatarId(credential), b"username")

    def test_failOnSpecial(self):
        """
        If the password returned by any function is C{""}, C{"x"}, or C{"*"} it
        is not compared against the supplied password.  Instead it is skipped.
        """
        pwd = UserDatabase()
        pwd.addUser("alice", "", 1, 2, "", "foo", "bar")
        pwd.addUser("bob", "x", 1, 2, "", "foo", "bar")
        pwd.addUser("carol", "*", 1, 2, "", "foo", "bar")
        self.patch(checkers, "pwd", pwd)

        checker = checkers.UNIXPasswordDatabase([checkers._pwdGetByName])
        cred = UsernamePassword(b"alice", b"")
        self.assertUnauthorizedLogin(checker.requestAvatarId(cred))

        cred = UsernamePassword(b"bob", b"x")
        self.assertUnauthorizedLogin(checker.requestAvatarId(cred))

        cred = UsernamePassword(b"carol", b"*")
        self.assertUnauthorizedLogin(checker.requestAvatarId(cred))
630
631
class AuthorizedKeyFileReaderTests(TestCase):
    """
    Tests for L{checkers.readAuthorizedKeyFile}.
    """

    skip = dependencySkip

    def test_ignoresComments(self):
        """
        Comment lines are never handed to the key parser by
        L{checkers.readAuthorizedKeyFile}.
        """

        def identity(line):
            return line

        keyFile = BytesIO(
            b"# this comment is ignored\n"
            b"this is not\n"
            b"# this is again\n"
            b"and this is not"
        )
        parsed = list(checkers.readAuthorizedKeyFile(keyFile, identity))
        self.assertEqual([b"this is not", b"and this is not"], parsed)

    def test_ignoresLeadingWhitespaceAndEmptyLines(self):
        """
        L{checkers.readAuthorizedKeyFile} skips empty lines and disregards
        whitespace at the start of a line.
        """

        def identity(line):
            return line

        keyFile = BytesIO(
            b"""
                           # ignore
                           not ignored
                           """
        )
        parsed = list(checkers.readAuthorizedKeyFile(keyFile, parseKey=identity))
        self.assertEqual([b"not ignored"], parsed)

    def test_ignoresUnparsableKeys(self):
        """
        A key whose parsing raises L{twisted.conch.ssh.keys.BadKeyError} is
        dropped by L{checkers.readAuthorizedKeyFile}, which carries on with
        the remaining lines instead of propagating the exception.
        """

        def rejectLeadingF(line):
            if not line.startswith(b"f"):
                return line
            raise keys.BadKeyError("failed to parse")

        keyFile = BytesIO(b"failed key\ngood key")
        parsed = list(checkers.readAuthorizedKeyFile(keyFile, parseKey=rejectLeadingF))
        self.assertEqual([b"good key"], parsed)
682
683
class InMemorySSHKeyDBTests(TestCase):
    """
    Tests for L{checkers.InMemorySSHKeyDB}.
    """

    skip = dependencySkip

    def test_implementsInterface(self):
        """
        Instances of L{checkers.InMemorySSHKeyDB} provide
        L{checkers.IAuthorizedKeysDB}.
        """
        mapping = {b"alice": [b"key"]}
        database = checkers.InMemorySSHKeyDB(mapping)
        verifyObject(checkers.IAuthorizedKeysDB, database)

    def test_noKeysForUnauthorizedUser(self):
        """
        L{checkers.InMemorySSHKeyDB.getAuthorizedKeys} yields nothing for a
        user who is absent from the mapping the database was built with.
        """
        mapping = {b"alice": [b"keys"]}
        database = checkers.InMemorySSHKeyDB(mapping)
        found = list(database.getAuthorizedKeys(b"bob"))
        self.assertEqual([], found)

    def test_allKeysForAuthorizedUser(self):
        """
        L{checkers.InMemorySSHKeyDB.getAuthorizedKeys} yields every key
        listed for a user who is present in the mapping the database was
        built with.
        """
        mapping = {b"alice": [b"a", b"b"]}
        database = checkers.InMemorySSHKeyDB(mapping)
        found = list(database.getAuthorizedKeys(b"alice"))
        self.assertEqual([b"a", b"b"], found)
716
717
class UNIXAuthorizedKeysFilesTests(TestCase):
    """
    Tests for L{checkers.UNIXAuthorizedKeysFiles}.
    """

    skip = dependencySkip

    def setUp(self):
        """
        Create a user database containing only C{alice}, whose home directory
        is a temporary directory with a C{.ssh/authorized_keys} file listing
        two keys.
        """
        mockos = MockOS()
        mockos.path = FilePath(self.mktemp())
        mockos.path.makedirs()

        self.userdb = UserDatabase()
        self.userdb.addUser(
            b"alice",
            b"password",
            1,
            2,
            b"alice lastname",
            mockos.path.path,
            b"/bin/shell",
        )

        self.sshDir = mockos.path.child(".ssh")
        self.sshDir.makedirs()
        authorizedKeys = self.sshDir.child("authorized_keys")
        authorizedKeys.setContent(b"key 1\nkey 2")

        self.expectedKeys = [b"key 1", b"key 2"]

    def test_implementsInterface(self):
        """
        L{checkers.UNIXAuthorizedKeysFiles} implements
        L{checkers.IAuthorizedKeysDB}.
        """
        keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb)
        verifyObject(checkers.IAuthorizedKeysDB, keydb)

    def test_noKeysForUnauthorizedUser(self):
        """
        If the user is not in the user database provided to
        L{checkers.UNIXAuthorizedKeysFiles}, an empty iterator is returned
        by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}.
        """
        keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x)
        # Use a bytes username, like the database entries and the other
        # lookups in this test case, so that the lookup misses because the
        # user is unknown rather than because the type differs.
        self.assertEqual([], list(keydb.getAuthorizedKeys(b"bob")))

    def test_allKeysInAllAuthorizedFilesForAuthorizedUser(self):
        """
        If the user is in the user database provided to
        L{checkers.UNIXAuthorizedKeysFiles}, an iterator with all the keys in
        C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2} is returned
        by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}.
        """
        self.sshDir.child("authorized_keys2").setContent(b"key 3")
        keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x)
        self.assertEqual(
            self.expectedKeys + [b"key 3"], list(keydb.getAuthorizedKeys(b"alice"))
        )

    def test_ignoresNonexistantFile(self):
        """
        L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only
        the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2}
        if they exist.
        """
        # Only authorized_keys was created in setUp; authorized_keys2 is
        # absent.
        keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x)
        self.assertEqual(self.expectedKeys, list(keydb.getAuthorizedKeys(b"alice")))

    def test_ignoresUnreadableFile(self):
        """
        L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only
        the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2}
        if they are readable.
        """
        # Creating authorized_keys2 as a directory means it cannot be read
        # as a key file.
        self.sshDir.child("authorized_keys2").makedirs()
        keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb, parseKey=lambda x: x)
        self.assertEqual(self.expectedKeys, list(keydb.getAuthorizedKeys(b"alice")))
796
797
# Minimal stand-in for a key database: a record whose only field is the
# C{getAuthorizedKeys} callable.
_KeyDB = namedtuple("_KeyDB", "getAuthorizedKeys")
799
800
class _DummyException(Exception):
    """
    Placeholder exception type raised deliberately by tests.
    """
807
808
class SSHPublicKeyCheckerTests(TestCase):
    """
    Tests for L{checkers.SSHPublicKeyChecker}.
    """

    skip = dependencySkip

    def setUp(self):
        """
        Create correctly signed RSA credentials for C{alice} and a checker
        backed by a key database which returns the matching RSA public key
        for every user.
        """
        self.credentials = SSHPrivateKey(
            b"alice",
            b"ssh-rsa",
            keydata.publicRSA_openssh,
            b"foo",
            keys.Key.fromString(keydata.privateRSA_openssh).sign(b"foo"),
        )
        self.keydb = _KeyDB(lambda _: [keys.Key.fromString(keydata.publicRSA_openssh)])
        self.checker = checkers.SSHPublicKeyChecker(self.keydb)

    def test_credentialsWithoutSignature(self):
        """
        Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
        credentials that do not have a signature fails with L{ValidPublicKey}.
        """
        self.credentials.signature = None
        self.failureResultOf(
            self.checker.requestAvatarId(self.credentials), ValidPublicKey
        )

    def test_credentialsWithBadKey(self):
        """
        Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
        credentials that have a bad key fails with L{keys.BadKeyError}.
        """
        self.credentials.blob = b""
        self.failureResultOf(
            self.checker.requestAvatarId(self.credentials), keys.BadKeyError
        )

    def test_credentialsNoMatchingKey(self):
        """
        If L{checkers.IAuthorizedKeysDB.getAuthorizedKeys} returns no keys
        that match the credentials,
        L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with
        L{UnauthorizedLogin}.
        """
        # The key database only offers the RSA key, so a DSA blob cannot
        # match.
        self.credentials.blob = keydata.publicDSA_openssh
        self.failureResultOf(
            self.checker.requestAvatarId(self.credentials), UnauthorizedLogin
        )

    def test_credentialsInvalidSignature(self):
        """
        Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
        credentials that are incorrectly signed fails with
        L{UnauthorizedLogin}.
        """
        self.credentials.signature = keys.Key.fromString(
            keydata.privateDSA_openssh
        ).sign(b"foo")
        self.failureResultOf(
            self.checker.requestAvatarId(self.credentials), UnauthorizedLogin
        )

    def test_failureVerifyingKey(self):
        """
        If L{keys.Key.verify} raises an exception,
        L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with
        L{UnauthorizedLogin} and the original exception is logged.
        """

        def fail(*args, **kwargs):
            raise _DummyException()

        self.patch(keys.Key, "verify", fail)

        self.failureResultOf(
            self.checker.requestAvatarId(self.credentials), UnauthorizedLogin
        )
        # Flushing keeps the logged error from failing the test; checking the
        # count confirms the underlying exception was actually reported.
        errors = self.flushLoggedErrors(_DummyException)
        self.assertEqual(len(errors), 1)

    def test_usernameReturnedOnSuccess(self):
        """
        L{checkers.SSHPublicKeyChecker.requestAvatarId}, if successful,
        callbacks with the username.
        """
        d = self.checker.requestAvatarId(self.credentials)
        self.assertEqual(b"alice", self.successResultOf(d))
896