1# Copyright (c) 2016-2020 by Ron Frederick <ronf@timeheart.net> and others.
2#
3# This program and the accompanying materials are made available under
4# the terms of the Eclipse Public License v2.0 which accompanies this
5# distribution and is available at:
6#
7#     http://www.eclipse.org/legal/epl-2.0/
8#
9# This program may also be made available under the following secondary
10# licenses when the conditions for such availability set forth in the
11# Eclipse Public License v2.0 are satisfied:
12#
13#    GNU General Public License, Version 2.0, or any later versions of
14#    that license
15#
16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
17#
18# Contributors:
19#     Ron Frederick - initial implementation, API, and documentation
20
21"""Unit tests for AsyncSSH connection authentication"""
22
23import asyncio
24import os
25import unittest
26
27from unittest.mock import patch
28
29import asyncssh
30from asyncssh.misc import async_context_manager, write_file
31from asyncssh.packet import String
32from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST
33
34from .keysign_stub import create_subprocess_exec_stub
35from .server import Server, ServerTestCase
36from .util import asynctest, gss_available, patch_getnameinfo, patch_gss
37from .util import make_certificate, x509_available
38
39
40class _FailValidateHostSSHServerConnection(asyncssh.SSHServerConnection):
41    """Test error in validating host key signature"""
42
43    async def validate_host_based_auth(self, username, key_data, client_host,
44                                       client_username, msg, signature):
45        """Validate host based authentication for the specified host and user"""
46
47        return await super().validate_host_based_auth(username, key_data,
48                                                      client_host,
49                                                      client_username,
50                                                      msg + b'\xff', signature)
51
52
53class _AsyncGSSServer(asyncssh.SSHServer):
54    """Server for testing async GSS authentication"""
55
56    # pylint: disable=useless-super-delegation
57
58    async def validate_gss_principal(self, username, user_principal,
59                                     host_principal):
60        """Return whether password is valid for this user"""
61
62        return super().validate_gss_principal(username, user_principal,
63                                              host_principal)
64
65
66class _NullServer(Server):
67    """Server for testing disabled auth"""
68
69    async def begin_auth(self, username):
70        """Handle client authentication request"""
71
72        return False
73
74
75class _HostBasedServer(Server):
76    """Server for testing host-based authentication"""
77
78    def __init__(self, host_key=None, ca_key=None):
79        super().__init__()
80
81        self._host_key = \
82            asyncssh.read_public_key(host_key) if host_key else None
83        self._ca_key = \
84            asyncssh.read_public_key(ca_key) if ca_key else None
85
86    def host_based_auth_supported(self):
87        """Return whether or not host based authentication is supported"""
88
89        return True
90
91    def validate_host_public_key(self, client_host, client_addr,
92                                 client_port, key):
93        """Return whether key is an authorized key for this host"""
94
95        # pylint: disable=unused-argument
96
97        return key == self._host_key
98
99    def validate_host_ca_key(self, client_host, client_addr, client_port, key):
100        """Return whether key is an authorized CA key for this host"""
101
102        # pylint: disable=unused-argument
103
104        return key == self._ca_key
105
106    def validate_host_based_user(self, username, client_host, client_username):
107        """Return whether remote host and user is authorized for this user"""
108
109        # pylint: disable=unused-argument
110
111        return client_username == 'user'
112
113
114class _AsyncHostBasedServer(Server):
115    """Server for testing async host-based authentication"""
116
117    # pylint: disable=useless-super-delegation
118
119    async def validate_host_based_user(self, username, client_host,
120                                       client_username):
121        """Return whether remote host and user is authorized for this user"""
122
123        return super().validate_host_based_user(username, client_host,
124                                                client_username)
125
126
127class _InvalidUsernameClientConnection(asyncssh.connection.SSHClientConnection):
128    """Test sending a client username with invalid Unicode to the server"""
129
130    async def host_based_auth_requested(self):
131        """Return a host key pair, host, and user to authenticate with"""
132
133        keypair, host, _ = await super().host_based_auth_requested()
134
135        return keypair, host, b'\xff'
136
137
138class _PublicKeyClient(asyncssh.SSHClient):
139    """Test client public key authentication"""
140
141    def __init__(self, keylist, delay=0):
142        self._keylist = keylist
143        self._delay = delay
144
145    async def public_key_auth_requested(self):
146        """Return a public key to authenticate with"""
147
148        if self._delay:
149            await asyncio.sleep(self._delay)
150
151        return self._keylist.pop(0) if self._keylist else None
152
153
154class _AsyncPublicKeyClient(_PublicKeyClient):
155    """Test async client public key authentication"""
156
157    # pylint: disable=useless-super-delegation
158
159    async def public_key_auth_requested(self):
160        """Return a public key to authenticate with"""
161
162        return await super().public_key_auth_requested()
163
164
165class _PublicKeyServer(Server):
166    """Server for testing public key authentication"""
167
168    def __init__(self, client_keys=(), authorized_keys=None, delay=0):
169        super().__init__()
170        self._client_keys = client_keys
171        self._authorized_keys = authorized_keys
172        self._delay = delay
173
174    def connection_made(self, conn):
175        """Called when a connection is made"""
176
177        super().connection_made(conn)
178        conn.send_auth_banner('auth banner')
179
180    async def begin_auth(self, username):
181        """Handle client authentication request"""
182
183        if self._authorized_keys:
184            self._conn.set_authorized_keys(self._authorized_keys)
185        else:
186            self._client_keys = asyncssh.load_public_keys(self._client_keys)
187
188        if self._delay:
189            await asyncio.sleep(self._delay)
190
191        return True
192
193    def public_key_auth_supported(self):
194        """Return whether or not public key authentication is supported"""
195
196        return True
197
198    def validate_public_key(self, username, key):
199        """Return whether key is an authorized client key for this user"""
200
201        return key in self._client_keys
202
203    def validate_ca_key(self, username, key):
204        """Return whether key is an authorized CA key for this user"""
205
206        return key in self._client_keys
207
208
209class _AsyncPublicKeyServer(_PublicKeyServer):
210    """Server for testing async public key authentication"""
211
212    # pylint: disable=useless-super-delegation
213
214    async def begin_auth(self, username):
215        """Handle client authentication request"""
216
217        return await super().begin_auth(username)
218
219    async def validate_public_key(self, username, key):
220        """Return whether key is an authorized client key for this user"""
221
222        return super().validate_public_key(username, key)
223
224    async def validate_ca_key(self, username, key):
225        """Return whether key is an authorized CA key for this user"""
226
227        return super().validate_ca_key(username, key)
228
229
230class _PasswordClient(asyncssh.SSHClient):
231    """Test client password authentication"""
232
233    def __init__(self, password, old_password, new_password):
234        self._password = password
235        self._old_password = old_password
236        self._new_password = new_password
237
238    def password_auth_requested(self):
239        """Return a password to authenticate with"""
240
241        if self._password:
242            result = self._password
243            self._password = None
244            return result
245        else:
246            return None
247
248    def password_change_requested(self, prompt, lang):
249        """Change the client's password"""
250
251        return self._old_password, self._new_password
252
253
254class _AsyncPasswordClient(_PasswordClient):
255    """Test async client password authentication"""
256
257    # pylint: disable=useless-super-delegation
258
259    async def password_auth_requested(self):
260        """Return a password to authenticate with"""
261
262        return super().password_auth_requested()
263
264    async def password_change_requested(self, prompt, lang):
265        """Change the client's password"""
266
267        return super().password_change_requested(prompt, lang)
268
269
270class _PasswordServer(Server):
271    """Server for testing password authentication"""
272
273    def password_auth_supported(self):
274        """Enable password authentication"""
275
276        return True
277
278    def validate_password(self, username, password):
279        """Accept password of pw, trigger password change on oldpw"""
280
281        if password == 'oldpw':
282            raise asyncssh.PasswordChangeRequired('Password change required')
283        else:
284            return password == 'pw'
285
286    def change_password(self, username, old_password, new_password):
287        """Only allow password change from password oldpw"""
288
289        return old_password == 'oldpw'
290
291
292class _AsyncPasswordServer(_PasswordServer):
293    """Server for testing async password authentication"""
294
295    # pylint: disable=useless-super-delegation
296
297    async def validate_password(self, username, password):
298        """Return whether password is valid for this user"""
299
300        return super().validate_password(username, password)
301
302    async def change_password(self, username, old_password, new_password):
303        """Handle a request to change a user's password"""
304
305        return super().change_password(username, old_password, new_password)
306
307
308class _KbdintClient(asyncssh.SSHClient):
309    """Test keyboard-interactive client auth"""
310
311    def __init__(self, responses):
312        self._responses = responses
313
314    def kbdint_auth_requested(self):
315        """Return the list of supported keyboard-interactive auth methods"""
316
317        return '' if self._responses else None
318
319    def kbdint_challenge_received(self, name, instructions, lang, prompts):
320        """Return responses to a keyboard-interactive auth challenge"""
321
322        # pylint: disable=unused-argument
323
324        if not prompts:
325            return []
326        elif self._responses:
327            result = self._responses
328            self._responses = None
329            return result
330        else:
331            return None
332
333
334class _AsyncKbdintClient(_KbdintClient):
335    """Test keyboard-interactive client auth"""
336
337    # pylint: disable=useless-super-delegation
338
339    async def kbdint_auth_requested(self):
340        """Return the list of supported keyboard-interactive auth methods"""
341
342        return super().kbdint_auth_requested()
343
344    async def kbdint_challenge_received(self, name, instructions,
345                                        lang, prompts):
346        """Return responses to a keyboard-interactive auth challenge"""
347
348        return super().kbdint_challenge_received(name, instructions,
349                                                 lang, prompts)
350
351
352class _KbdintServer(Server):
353    """Server for testing keyboard-interactive authentication"""
354
355    def __init__(self):
356        super().__init__()
357        self._kbdint_round = 0
358
359    def kbdint_auth_supported(self):
360        """Enable keyboard-interactive authentication"""
361
362        return True
363
364    def get_kbdint_challenge(self, username, lang, submethods):
365        """Return an initial challenge with only instructions"""
366
367        return '', 'instructions', '', []
368
369    def validate_kbdint_response(self, username, responses):
370        """Return a password challenge after the instructions"""
371
372        if self._kbdint_round == 0:
373            if username == 'none':
374                result = ('', '', '', [])
375            elif username == 'pw':
376                result = ('', '', '', [('Password:', False)])
377            elif username == 'pc':
378                result = ('', '', '', [('Passcode:', False)])
379            elif username == 'multi':
380                result = ('', '', '', [('Prompt1:', True), ('Prompt2', True)])
381            else:
382                result = ('', '', '', [('Other Challenge:', False)])
383        else:
384            if responses in ([], ['kbdint'], ['1', '2']):
385                result = True
386            else:
387                result = ('', '', '', [('Second Challenge:', True)])
388
389        self._kbdint_round += 1
390        return result
391
392
393class _AsyncKbdintServer(_KbdintServer):
394    """Server for testing async keyboard-interactive authentication"""
395
396    # pylint: disable=useless-super-delegation
397
398    async def get_kbdint_challenge(self, username, lang, submethods):
399        """Return a keyboard-interactive auth challenge"""
400
401        return super().get_kbdint_challenge(username, lang, submethods)
402
403    async def validate_kbdint_response(self, username, responses):
404        """Return whether the keyboard-interactive response is valid
405           for this user"""
406
407        return super().validate_kbdint_response(username, responses)
408
409
410class _UnknownAuthClientConnection(asyncssh.connection.SSHClientConnection):
411    """Test getting back an unknown auth method from the SSH server"""
412
413    def try_next_auth(self):
414        """Attempt client authentication using an unknown method"""
415
416        self._auth_methods = [b'unknown'] + self._auth_methods
417        super().try_next_auth()
418
419
420class _TestNullAuth(ServerTestCase):
421    """Unit tests for testing disabled authentication"""
422
423    @classmethod
424    async def start_server(cls):
425        """Start an SSH server which supports disabled authentication"""
426
427        return await cls.create_server(_NullServer)
428
429    @asynctest
430    async def test_disabled_auth(self):
431        """Test disabled authentication"""
432
433        async with self.connect(username='user'):
434            pass
435
436    @asynctest
437    async def test_disabled_trivial_auth(self):
438        """Test disabling trivial auth with no authentication"""
439
440        with self.assertRaises(asyncssh.PermissionDenied):
441            await self.connect(username='user', disable_trivial_auth=True)
442
443
444@unittest.skipUnless(gss_available, 'GSS not available')
445@patch_gss
446class _TestGSSAuth(ServerTestCase):
447    """Unit tests for GSS authentication"""
448
449    @classmethod
450    async def start_server(cls):
451        """Start an SSH server which supports GSS authentication"""
452
453        return await cls.create_server(_AsyncGSSServer, gss_host='1')
454
455    @asynctest
456    async def test_gss_kex_auth(self):
457        """Test GSS key exchange authentication"""
458
459        async with self.connect(kex_algs=['gss-gex-sha256'],
460                                username='user', gss_host='1'):
461            pass
462
463    @asynctest
464    async def test_gss_mic_auth(self):
465        """Test GSS MIC authentication"""
466
467        async with self.connect(kex_algs=['ecdh-sha2-nistp256'],
468                                username='user', gss_host='1'):
469            pass
470
471    @asynctest
472    async def test_gss_delegate(self):
473        """Test GSS credential delegation"""
474
475        async with self.connect(username='user', gss_host='1',
476                                gss_delegate_creds=True):
477            pass
478
479    @asynctest
480    async def test_gss_kex_disabled(self):
481        """Test GSS key exchange being disabled"""
482
483        with self.assertRaises(asyncssh.PermissionDenied):
484            await self.connect(username='user', gss_host=(), gss_kex=False,
485                               preferred_auth='gssapi-keyex')
486
487    @asynctest
488    async def test_gss_auth_disabled(self):
489        """Test GSS authentication being disabled"""
490
491        with self.assertRaises(asyncssh.PermissionDenied):
492            await self.connect(username='user', gss_host=(), gss_auth=False)
493
494    @asynctest
495    async def test_gss_auth_unavailable(self):
496        """Test GSS authentication being unavailable"""
497
498        with self.assertRaises(asyncssh.PermissionDenied):
499            await self.connect(username='user1', gss_host=())
500
501    @asynctest
502    async def test_gss_client_error(self):
503        """Test GSS client error"""
504
505        with self.assertRaises(asyncssh.PermissionDenied):
506            await self.connect(gss_host='1,init_error', username='user')
507
508    @asynctest
509    async def test_disabled_trivial_gss_kex_auth(self):
510        """Test disabling trivial auth with GSS key exchange authentication"""
511
512        async with self.connect(kex_algs=['gss-gex-sha256'],
513                                username='user', gss_host='1',
514                                disable_trivial_auth=True):
515            pass
516
517    @asynctest
518    async def test_disabled_trivial_gss_mic_auth(self):
519        """Test disabling trivial auth with GSS MIC authentication"""
520
521        async with self.connect(kex_algs=['ecdh-sha2-nistp256'],
522                                username='user', gss_host='1',
523                                disable_trivial_auth=True):
524            pass
525
526
527@unittest.skipUnless(gss_available, 'GSS not available')
528@patch_gss
529class _TestGSSServerAuthDisabled(ServerTestCase):
530    """Unit tests for with GSS key exchange and auth disabled on server"""
531
532    @classmethod
533    async def start_server(cls):
534        """Start an SSH server with GSS key exchange and auth disabled"""
535
536        return await cls.create_server(gss_host='1', gss_kex=False,
537                                       gss_auth=False)
538
539    @asynctest
540    async def test_gss_kex_unavailable(self):
541        """Test GSS key exchange being unavailable"""
542
543        with self.assertRaises(asyncssh.PermissionDenied):
544            await self.connect(username='user', gss_host=(),
545                               preferred_auth='gssapi-keyex')
546
547    @asynctest
548    async def test_gss_auth_unavailable(self):
549        """Test GSS authentication being unavailable"""
550
551        with self.assertRaises(asyncssh.PermissionDenied):
552            await self.connect(username='user', gss_host=(),
553                               preferred_auth='gssapi-with-mic')
554
555
556
557@unittest.skipUnless(gss_available, 'GSS not available')
558@patch_gss
559class _TestGSSServerError(ServerTestCase):
560    """Unit tests for GSS server error"""
561
562    @classmethod
563    async def start_server(cls):
564        """Start an SSH server which raises an error on GSS authentication"""
565
566        return await cls.create_server(gss_host='1,init_error')
567
568    @asynctest
569    async def test_gss_server_error(self):
570        """Test GSS error on server"""
571
572        with self.assertRaises(asyncssh.PermissionDenied):
573            await self.connect(username='user')
574
575
576@unittest.skipUnless(gss_available, 'GSS not available')
577@patch_gss
578class _TestGSSFQDN(ServerTestCase):
579    """Unit tests for GSS server error"""
580
581    @classmethod
582    async def start_server(cls):
583        """Start an SSH server which raises an error on GSS authentication"""
584
585        def mock_gethostname():
586            """Return a non-fully-qualified hostname"""
587
588            return 'host'
589
590        def mock_getfqdn():
591            """Confirm getfqdn is called on relative hostnames"""
592
593            return '1'
594
595        with patch('socket.gethostname', mock_gethostname):
596            with patch('socket.getfqdn', mock_getfqdn):
597                return await cls.create_server(gss_host=())
598
599    @asynctest
600    async def test_gss_fqdn_lookup(self):
601        """Test GSS FQDN lookup"""
602
603        async with self.connect(username='user', gss_host=()):
604            pass
605
606
607@patch_getnameinfo
608class _TestHostBasedAuth(ServerTestCase):
609    """Unit tests for host-based authentication"""
610
611    @classmethod
612    async def start_server(cls):
613        """Start an SSH server which supports host-based authentication"""
614
615        return await cls.create_server(
616            _HostBasedServer, known_client_hosts='known_hosts')
617
618    @asynctest
619    async def test_client_host_auth(self):
620        """Test connecting with host-based authentication"""
621
622        async with self.connect(username='user', client_host_keys='skey',
623                                client_username='user'):
624            pass
625
626    @asynctest
627    async def test_client_host_auth_disabled(self):
628        """Test connecting with host-based authentication disabled"""
629
630        with self.assertRaises(asyncssh.PermissionDenied):
631            await self.connect(username='user', client_host_keys='skey',
632                               client_username='user', host_based_auth=False)
633
634    @asynctest
635    async def test_client_host_key_bytes(self):
636        """Test client host key passed in as bytes"""
637
638        with open('skey', 'rb') as f:
639            skey = f.read()
640
641        async with self.connect(username='user', client_host_keys=[skey],
642                                client_username='user'):
643            pass
644
645    @asynctest
646    async def test_client_host_key_sshkey(self):
647        """Test client host key passed in as an SSHKey"""
648
649        skey = asyncssh.read_private_key('skey')
650
651        async with self.connect(username='user', client_host_keys=[skey],
652                                client_username='user'):
653            pass
654
655    @asynctest
656    async def test_client_host_key_keypairs(self):
657        """Test client host keys passed in as a list of SSHKeyPairs"""
658
659        keys = asyncssh.load_keypairs('skey')
660
661        async with self.connect(username='user', client_host_keys=keys,
662                                client_username='user'):
663            pass
664
665    @asynctest
666    async def test_client_host_signature_algs(self):
667        """Test host based authentication with specific signature algorithms"""
668
669        for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'):
670            async with self.connect(username='user', client_host_keys='skey',
671                                    client_username='user',
672                                    signature_algs=[alg]):
673                pass
674
675    @asynctest
676    async def test_no_server_signature_algs(self):
677        """Test a server which doesn't advertise signature algorithms"""
678
679        def skip_ext_info(self):
680            """Don't send extension information"""
681
682            # pylint: disable=unused-argument
683
684            return []
685
686        with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg',
687                   skip_ext_info):
688            async with self.connect(username='user', client_host_keys='skey',
689                                    client_username='user'):
690                pass
691
692    @asynctest
693    async def test_untrusted_client_host_key(self):
694        """Test untrusted client host key"""
695
696        with self.assertRaises(asyncssh.PermissionDenied):
697            await self.connect(username='user', client_host_keys='ckey',
698                               client_username='user')
699
700    @asynctest
701    async def test_missing_cert(self):
702        """Test missing client host certificate"""
703
704        with self.assertRaises(OSError):
705            await self.connect(username='user',
706                               client_host_keys=[('skey', 'xxx')],
707                               client_username='user')
708
709    @asynctest
710    async def test_invalid_client_host_signature(self):
711        """Test invalid client host signature"""
712
713        with patch('asyncssh.connection.SSHServerConnection',
714                   _FailValidateHostSSHServerConnection):
715            with self.assertRaises(asyncssh.PermissionDenied):
716                await self.connect(username='user', client_host_keys='skey',
717                                   client_username='user')
718
719    @asynctest
720    async def test_client_host_trailing_dot(self):
721        """Test stripping of trailing dot from client host"""
722
723        async with self.connect(username='user', client_host_keys='skey',
724                                client_host='localhost.',
725                                client_username='user'):
726            pass
727
728    @asynctest
729    async def test_mismatched_client_host(self):
730        """Test ignoring of mismatched client host due to canonicalization"""
731
732        async with self.connect(username='user', client_host_keys='skey',
733                                client_host='xxx', client_username='user'):
734            pass
735
736    @asynctest
737    async def test_mismatched_client_username(self):
738        """Test mismatched client username"""
739
740        with self.assertRaises(asyncssh.PermissionDenied):
741            await self.connect(username='user', client_host_keys='skey',
742                               client_username='xxx')
743
744    @asynctest
745    async def test_invalid_client_username(self):
746        """Test invalid client username"""
747
748        with patch('asyncssh.connection.SSHClientConnection',
749                   _InvalidUsernameClientConnection):
750            with self.assertRaises(asyncssh.ProtocolError):
751                await self.connect(username='user', client_host_keys='skey')
752
753    @asynctest
754    async def test_expired_cert(self):
755        """Test expired certificate"""
756
757        ckey = asyncssh.read_private_key('ckey')
758        skey = asyncssh.read_private_key('skey')
759
760        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
761                                CERT_TYPE_HOST, ckey, skey, ['localhost'],
762                                valid_before=1)
763
764        with self.assertRaises(asyncssh.PermissionDenied):
765            await self.connect(username='user', client_host_keys=[(ckey, cert)],
766                               client_username='user')
767
768    @asynctest
769    async def test_untrusted_ca(self):
770        """Test untrusted CA"""
771
772        ckey = asyncssh.read_private_key('ckey')
773
774        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
775                                CERT_TYPE_HOST, ckey, ckey, ['localhost'])
776
777        with self.assertRaises(asyncssh.PermissionDenied):
778            await self.connect(username='user', client_host_keys=[(ckey, cert)],
779                               client_username='user')
780
781    @asynctest
782    async def test_disabled_trivial_client_host_auth(self):
783        """Test disabling trivial auth with host-based authentication"""
784
785        with self.assertRaises(asyncssh.PermissionDenied):
786            await self.connect(username='user', client_host_keys='skey',
787                               client_username='user',
788                               disable_trivial_auth=True)
789
790
791@patch_getnameinfo
792class _TestCallbackHostBasedAuth(ServerTestCase):
793    """Unit tests for host-based authentication using callback"""
794
795    @classmethod
796    async def start_server(cls):
797        """Start an SSH server which supports host-based authentication"""
798
799        def server_factory():
800            """Return an SSHServer which can validate the client host key"""
801
802            return _HostBasedServer(host_key='skey.pub', ca_key='skey.pub')
803
804        return await cls.create_server(server_factory)
805
806    @asynctest
807    async def test_validate_client_host_callback(self):
808        """Test using callback to validate client host key"""
809
810        async with self.connect(username='user',
811                                client_host_keys=[('skey', None)],
812                                client_username='user'):
813            pass
814
815    @asynctest
816    async def test_validate_client_host_ca_callback(self):
817        """Test using callback to validate client host CA key"""
818
819        async with self.connect(username='user', client_host_keys='skey',
820                                client_username='user'):
821            pass
822
823    @asynctest
824    async def test_untrusted_client_host_callback(self):
825        """Test callback to validate client host key returning failure"""
826
827        with self.assertRaises(asyncssh.PermissionDenied):
828            await self.connect(username='user',
829                               client_host_keys=[('ckey', None)],
830                               client_username='user')
831
832    @asynctest
833    async def test_untrusted_client_host_ca_callback(self):
834        """Test callback to validate client host CA key returning failure"""
835
836        with self.assertRaises(asyncssh.PermissionDenied):
837            await self.connect(username='user', client_host_keys='ckey',
838                               client_username='user')
839
840
841@patch_getnameinfo
842class _TestKeysignHostBasedAuth(ServerTestCase):
843    """Unit tests for host-based authentication using ssh-keysign"""
844
845    @classmethod
846    async def start_server(cls):
847        """Start an SSH server which supports host-based authentication"""
848
849        return await cls.create_server(_HostBasedServer,
850                                       known_client_hosts='known_hosts')
851
852    @async_context_manager
853    async def _connect_keysign(self, client_host_keysign=True,
854                               client_host_keys=None, keysign_dirs=('.',)):
855        """Open a connection to test host-based auth using ssh-keysign"""
856
857        with patch('asyncio.create_subprocess_exec',
858                   create_subprocess_exec_stub):
859            with patch('asyncssh.keysign._DEFAULT_KEYSIGN_DIRS', keysign_dirs):
860                with patch('asyncssh.public_key._DEFAULT_HOST_KEY_DIRS', ['.']):
861                    with patch('asyncssh.public_key._DEFAULT_HOST_KEY_FILES',
862                               ['skey', 'xxx']):
863                        return await self.connect(
864                            username='user',
865                            client_host_keysign=client_host_keysign,
866                            client_host_keys=client_host_keys,
867                            client_username='user')
868
869    @asynctest
870    async def test_keysign(self):
871        """Test host-based authentication using ssh-keysign"""
872
873        async with self._connect_keysign():
874            pass
875
876    @asynctest
877    async def test_explciit_keysign(self):
878        """Test ssh-keysign with an explicit path"""
879
880        async with self._connect_keysign(client_host_keysign='.'):
881            pass
882
883    @asynctest
884    async def test_keysign_explicit_host_keys(self):
885        """Test ssh-keysign with explicit host public keys"""
886
887        async with self._connect_keysign(client_host_keys='skey.pub'):
888            pass
889
890    @asynctest
891    async def test_invalid_keysign_response(self):
892        """Test invalid ssh-keysign response"""
893
894        with patch('asyncssh.keysign.KEYSIGN_VERSION', 0):
895            with self.assertRaises(asyncssh.PermissionDenied):
896                await self._connect_keysign()
897
898    @asynctest
899    async def test_keysign_error(self):
900        """Test ssh-keysign error response"""
901
902        with patch('asyncssh.keysign.KEYSIGN_VERSION', 1):
903            with self.assertRaises(asyncssh.PermissionDenied):
904                await self._connect_keysign()
905
906    @asynctest
907    async def test_invalid_keysign_version(self):
908        """Test invalid version in ssh-keysign request"""
909
910        with patch('asyncssh.keysign.KEYSIGN_VERSION', 99):
911            with self.assertRaises(asyncssh.PermissionDenied):
912                await self._connect_keysign()
913
914    @asynctest
915    async def test_keysign_not_found(self):
916        """Test ssh-keysign executable not being found"""
917
918        with self.assertRaises(ValueError):
919            await self._connect_keysign(keysign_dirs=())
920
921    @asynctest
922    async def test_explicit_keysign_not_found(self):
923        """Test explicit ssh-keysign executable not being found"""
924
925        with self.assertRaises(ValueError):
926            await self._connect_keysign(client_host_keysign='xxx')
927
928    @asynctest
929    async def test_keysign_dir_not_present(self):
930        """Test ssh-keysign executable not in a keysign dir"""
931
932        with self.assertRaises(ValueError):
933            await self._connect_keysign(keysign_dirs=('xxx',))
934
935
936@patch_getnameinfo
937class _TestHostBasedAsyncServerAuth(_TestHostBasedAuth):
938    """Unit tests for host-based authentication with async server callbacks"""
939
940    @classmethod
941    async def start_server(cls):
942        """Start an SSH server which supports async host-based auth"""
943
944        return await cls.create_server(_AsyncHostBasedServer,
945                                       known_client_hosts='known_hosts',
946                                       trust_client_host=True)
947
948    @asynctest
949    async def test_mismatched_client_host(self):
950        """Test mismatch of trusted client host"""
951
952        with self.assertRaises(asyncssh.PermissionDenied):
953            await self.connect(username='user', client_host_keys='skey',
954                               client_host='xxx', client_username='user')
955
956
957@patch_getnameinfo
958class _TestLimitedHostBasedSignatureAlgs(ServerTestCase):
959    """Unit tests for limited host key signature algorithms"""
960
961    @classmethod
962    async def start_server(cls):
963        """Start an SSH server which supports host-based authentication"""
964
965        return await cls.create_server(
966            _HostBasedServer, known_client_hosts='known_hosts',
967            signature_algs=['ssh-rsa', 'rsa-sha2-512'])
968
969    @asynctest
970    async def test_mismatched_host_signature_algs(self):
971        """Test mismatched host key signature algorithms"""
972
973        with self.assertRaises(asyncssh.PermissionDenied):
974            await self.connect(username='ckey', client_host_keys='skey',
975                               client_username='user',
976                               signature_algs=['rsa-sha2-256'])
977
978    @asynctest
979    async def test_host_signature_alg_fallback(self):
980        """Test fall back to default host key signature algorithm"""
981
982        async with self.connect(username='ckey', client_host_keys='skey',
983                                client_username='user',
984                                signature_algs=['rsa-sha2-256', 'ssh-rsa']):
985            pass
986
987
988class _TestPublicKeyAuth(ServerTestCase):
989    """Unit tests for public key authentication"""
990
991    @classmethod
992    async def start_server(cls):
993        """Start an SSH server which supports public key authentication"""
994
995        return await cls.create_server(
996            _PublicKeyServer, authorized_client_keys='authorized_keys')
997
998    @async_context_manager
999    async def _connect_publickey(self, keylist, test_async=False):
1000        """Open a connection to test public key auth"""
1001
1002        def client_factory():
1003            """Return an SSHClient to use to do public key auth"""
1004
1005            cls = _AsyncPublicKeyClient if test_async else _PublicKeyClient
1006            return cls(keylist)
1007
1008        conn, _ = await self.create_connection(client_factory, username='ckey',
1009                                               client_keys=None)
1010
1011        return conn
1012
1013    @asynctest
1014    async def test_encrypted_client_key(self):
1015        """Test public key auth with encrypted client key"""
1016
1017        async with self.connect(username='ckey', client_keys='ckey_encrypted',
1018                                passphrase='passphrase'):
1019            pass
1020
1021    @asynctest
1022    async def test_encrypted_client_key_bad_passphrase(self):
1023        """Test wrong passphrase for encrypted client key"""
1024
1025        with self.assertRaises(asyncssh.KeyEncryptionError):
1026            await self.connect(username='ckey', client_keys='ckey_encrypted',
1027                               passphrase='xxx')
1028
1029    @asynctest
1030    async def test_encrypted_client_key_missing_passphrase(self):
1031        """Test missing passphrase for encrypted client key"""
1032
1033        with self.assertRaises(asyncssh.KeyImportError):
1034            await self.connect(username='ckey', client_keys='ckey_encrypted')
1035
1036    @asynctest
1037    async def test_client_certs(self):
1038        """Test trusted client certificate via client_certs"""
1039
1040        async with self.connect(username='ckey', client_keys='ckey',
1041                                client_certs='ckey-cert.pub'):
1042            pass
1043
1044    @asynctest
1045    async def test_agent_auth(self):
1046        """Test connecting with ssh-agent authentication"""
1047
1048        if not self.agent_available(): # pragma: no cover
1049            self.skipTest('ssh-agent not available')
1050
1051        async with self.connect(username='ckey'):
1052            pass
1053
1054    @asynctest
1055    async def test_agent_identities(self):
1056        """Test connecting with ssh-agent auth with specific identities"""
1057
1058        if not self.agent_available(): # pragma: no cover
1059            self.skipTest('ssh-agent not available')
1060
1061        ckey = asyncssh.read_private_key('ckey')
1062        ckey.write_private_key('ckey.pem', 'pkcs8-pem')
1063
1064        ckey_cert = asyncssh.read_certificate('ckey-cert.pub')
1065        ckey_ecdsa = asyncssh.read_public_key('ckey_ecdsa.pub')
1066
1067        for pubkey in ('ckey-cert.pub', 'ckey_ecdsa.pub', 'ckey.pem',
1068                       ckey_cert, ckey_ecdsa, ckey_ecdsa.public_data):
1069            async with self.connect(username='ckey', agent_identities=pubkey):
1070                pass
1071
1072    @asynctest
1073    async def test_agent_identities_config(self):
1074        """Test connecting with ssh-agent auth and IdentitiesOnly config"""
1075
1076        if not self.agent_available(): # pragma: no cover
1077            self.skipTest('ssh-agent not available')
1078
1079        write_file('ckey_err', b'')
1080
1081        write_file('config', 'IdentitiesOnly True\n'
1082                   'IdentityFile ckey-cert.pub\n'
1083                   'IdentityFile ckey_ecdsa.pub\n'
1084                   'IdentityFile ckey_err\n', 'w')
1085
1086        async with self.connect(username='ckey', config='config'):
1087            pass
1088
1089    @asynctest
1090    async def test_agent_identities_config_default_keys(self):
1091        """Test connecting with ssh-agent auth and default IdentitiesOnly"""
1092
1093        if not self.agent_available(): # pragma: no cover
1094            self.skipTest('ssh-agent not available')
1095
1096        write_file('config', 'IdentitiesOnly True\n', 'w')
1097
1098        async with self.connect(username='ckey', config='config'):
1099            pass
1100
1101    @asynctest
1102    async def test_agent_signature_algs(self):
1103        """Test ssh-agent keys with specific signature algorithms"""
1104
1105        if not self.agent_available(): # pragma: no cover
1106            self.skipTest('ssh-agent not available')
1107
1108        for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'):
1109            async with self.connect(username='ckey', signature_algs=[alg]):
1110                pass
1111
1112    @asynctest
1113    async def test_agent_auth_failure(self):
1114        """Test failure connecting with ssh-agent authentication"""
1115
1116        if not self.agent_available(): # pragma: no cover
1117            self.skipTest('ssh-agent not available')
1118
1119        with patch.dict(os.environ, HOME='xxx'):
1120            with self.assertRaises(asyncssh.PermissionDenied):
1121                await self.connect(username='ckey', agent_path='xxx',
1122                                   known_hosts='.ssh/known_hosts')
1123
1124    @asynctest
1125    async def test_agent_auth_unset(self):
1126        """Test connecting with no local keys and no ssh-agent configured"""
1127
1128        with patch.dict(os.environ, HOME='xxx', SSH_AUTH_SOCK=''):
1129            with self.assertRaises(asyncssh.PermissionDenied):
1130                await self.connect(username='ckey',
1131                                   known_hosts='.ssh/known_hosts')
1132
1133    @asynctest
1134    async def test_public_key_auth(self):
1135        """Test connecting with public key authentication"""
1136
1137        async with self.connect(username='ckey', client_keys='ckey'):
1138            pass
1139
1140    @asynctest
1141    async def test_public_key_auth_disabled(self):
1142        """Test connecting with public key authentication disabled"""
1143
1144        with self.assertRaises(asyncssh.PermissionDenied):
1145            await self.connect(username='ckey', client_keys='ckey',
1146                               public_key_auth=False)
1147
1148    @asynctest
1149    async def test_public_key_auth_not_preferred(self):
1150        """Test public key authentication not being in preferred auth list"""
1151
1152        with self.assertRaises(asyncssh.PermissionDenied):
1153            await self.connect(username='ckey', client_keys='ckey',
1154                               preferred_auth='password')
1155
1156    @asynctest
1157    async def test_public_key_signature_algs(self):
1158        """Test public key authentication with specific signature algorithms"""
1159
1160        for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'):
1161            async with self.connect(username='ckey', client_keys='ckey',
1162                                    signature_algs=[alg]):
1163                pass
1164
1165    @asynctest
1166    async def test_no_server_signature_algs(self):
1167        """Test a server which doesn't advertise signature algorithms"""
1168
1169        def skip_ext_info(self):
1170            """Don't send extension information"""
1171
1172            # pylint: disable=unused-argument
1173
1174            return []
1175
1176        with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg',
1177                   skip_ext_info):
1178            async with self.connect(username='ckey', client_keys='ckey',
1179                                    agent_path=None):
1180                pass
1181
1182    @asynctest
1183    async def test_default_public_key_auth(self):
1184        """Test connecting with default public key authentication"""
1185
1186        async with self.connect(username='ckey', agent_path=None):
1187            pass
1188
1189    @asynctest
1190    async def test_invalid_default_key(self):
1191        """Test connecting with invalid default client key"""
1192
1193        key_path = os.path.join('.ssh', 'id_dsa')
1194        with open(key_path, 'w') as f:
1195            f.write('-----XXX-----')
1196
1197        with self.assertRaises(asyncssh.KeyImportError):
1198            await self.connect(username='ckey', agent_path=None)
1199
1200        os.remove(key_path)
1201
1202    @asynctest
1203    async def test_client_key_bytes(self):
1204        """Test client key passed in as bytes"""
1205
1206        with open('ckey', 'rb') as f:
1207            ckey = f.read()
1208
1209        async with self.connect(username='ckey', client_keys=[ckey]):
1210            pass
1211
1212    @asynctest
1213    async def test_client_key_sshkey(self):
1214        """Test client key passed in as an SSHKey"""
1215
1216        ckey = asyncssh.read_private_key('ckey')
1217
1218        async with self.connect(username='ckey', client_keys=[ckey]):
1219            pass
1220
1221    @asynctest
1222    async def test_client_key_keypairs(self):
1223        """Test client keys passed in as a list of SSHKeyPairs"""
1224
1225        keys = asyncssh.load_keypairs('ckey')
1226
1227        async with self.connect(username='ckey', client_keys=keys):
1228            pass
1229
1230    @asynctest
1231    async def test_client_key_agent_keypairs(self):
1232        """Test client keys passed in as a list of SSHAgentKeyPairs"""
1233
1234        if not self.agent_available(): # pragma: no cover
1235            self.skipTest('ssh-agent not available')
1236
1237        async with asyncssh.connect_agent() as agent:
1238            for key in await agent.get_keys():
1239                async with self.connect(username='ckey', client_keys=[key]):
1240                    pass
1241
1242    @asynctest
1243    async def test_keypair_with_replaced_cert(self):
1244        """Test connecting with a keypair with replaced cert"""
1245
1246        ckey = asyncssh.load_keypairs(['ckey'])[0]
1247
1248        async with self.connect(username='ckey',
1249                                client_keys=[(ckey, 'ckey-cert.pub')]):
1250            pass
1251
1252    @asynctest
1253    async def test_agent_keypair_with_replaced_cert(self):
1254        """Test connecting with sn agent key with replaced cert"""
1255
1256        if not self.agent_available(): # pragma: no cover
1257            self.skipTest('ssh-agent not available')
1258
1259        async with asyncssh.connect_agent() as agent:
1260            ckey = (await agent.get_keys())[2]
1261
1262            async with self.connect(username='ckey',
1263                                    client_keys=[(ckey, 'ckey-cert.pub')]):
1264                pass
1265
1266    @asynctest
1267    async def test_untrusted_client_key(self):
1268        """Test untrusted client key"""
1269
1270        with self.assertRaises(asyncssh.PermissionDenied):
1271            await self.connect(username='ckey', client_keys='skey',
1272                               agent_path=None)
1273
1274    @asynctest
1275    async def test_missing_cert(self):
1276        """Test missing client certificate"""
1277
1278        with self.assertRaises(OSError):
1279            await self.connect(username='ckey', client_keys=[('ckey', 'xxx')])
1280
1281    @asynctest
1282    async def test_expired_cert(self):
1283        """Test expired certificate"""
1284
1285        ckey = asyncssh.read_private_key('ckey')
1286        skey = asyncssh.read_private_key('skey')
1287
1288        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1289                                CERT_TYPE_USER, skey, ckey, ['ckey'],
1290                                valid_before=1)
1291
1292        with self.assertRaises(asyncssh.PermissionDenied):
1293            await self.connect(username='ckey', client_keys=[(skey, cert)],
1294                               agent_path=None)
1295
1296    @asynctest
1297    async def test_allowed_address(self):
1298        """Test allowed address in certificate"""
1299
1300        ckey = asyncssh.read_private_key('ckey')
1301        skey = asyncssh.read_private_key('skey')
1302
1303        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1304                                CERT_TYPE_USER, skey, ckey, ['ckey'],
1305                                options={'source-address':
1306                                         String('0.0.0.0/0,::/0')})
1307
1308        async with self.connect(username='ckey', client_keys=[(skey, cert)]):
1309            pass
1310
1311    @asynctest
1312    async def test_disallowed_address(self):
1313        """Test disallowed address in certificate"""
1314
1315        ckey = asyncssh.read_private_key('ckey')
1316        skey = asyncssh.read_private_key('skey')
1317
1318        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1319                                CERT_TYPE_USER, skey, ckey, ['ckey'],
1320                                options={'source-address': String('0.0.0.0')})
1321
1322        with self.assertRaises(asyncssh.PermissionDenied):
1323            await self.connect(username='ckey', client_keys=[(skey, cert)],
1324                               agent_path=None)
1325
1326    @asynctest
1327    async def test_untrusted_ca(self):
1328        """Test untrusted CA"""
1329
1330        skey = asyncssh.read_private_key('skey')
1331
1332        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1333                                CERT_TYPE_USER, skey, skey, ['skey'])
1334
1335        with self.assertRaises(asyncssh.PermissionDenied):
1336            await self.connect(username='ckey', client_keys=[(skey, cert)],
1337                               agent_path=None)
1338
1339    @asynctest
1340    async def test_mismatched_ca(self):
1341        """Test mismatched CA"""
1342
1343        ckey = asyncssh.read_private_key('ckey')
1344        skey = asyncssh.read_private_key('skey')
1345
1346        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1347                                CERT_TYPE_USER, skey, skey, ['skey'])
1348
1349        with self.assertRaises(ValueError):
1350            await self.connect(username='ckey', client_keys=[(ckey, cert)])
1351
1352    @asynctest
1353    async def test_callback(self):
1354        """Test connecting with public key authentication using callback"""
1355
1356        async with self._connect_publickey(['ckey'], test_async=True):
1357            pass
1358
1359    @asynctest
1360    async def test_callback_sshkeypair(self):
1361        """Test client key passed in as an SSHKeyPair by callback"""
1362
1363        if not self.agent_available(): # pragma: no cover
1364            self.skipTest('ssh-agent not available')
1365
1366        async with asyncssh.connect_agent() as agent:
1367            keylist = await agent.get_keys()
1368
1369            async with self._connect_publickey(keylist):
1370                pass
1371
1372    @asynctest
1373    async def test_callback_untrusted_client_key(self):
1374        """Test failure connecting with public key authentication callback"""
1375
1376        with self.assertRaises(asyncssh.PermissionDenied):
1377            await self._connect_publickey(['skey'])
1378
1379    @asynctest
1380    async def test_unknown_auth(self):
1381        """Test server returning an unknown auth method before public key"""
1382
1383        with patch('asyncssh.connection.SSHClientConnection',
1384                   _UnknownAuthClientConnection):
1385            async with self.connect(username='ckey', client_keys='ckey',
1386                                    agent_path=None):
1387                pass
1388
1389    @asynctest
1390    async def test_disabled_trivial_public_key_auth(self):
1391        """Test disabling trivial auth with public key authentication"""
1392
1393        async with self.connect(username='ckey', agent_path=None,
1394                                disable_trivial_auth=True):
1395            pass
1396
1397
1398class _TestPublicKeyAsyncServerAuth(_TestPublicKeyAuth):
1399    """Unit tests for public key authentication with async server callbacks"""
1400
1401    @classmethod
1402    async def start_server(cls):
1403        """Start an SSH server which supports async public key auth"""
1404
1405        def server_factory():
1406            """Return an SSH server which trusts specific client keys"""
1407
1408            return _AsyncPublicKeyServer(client_keys=['ckey.pub',
1409                                                      'ckey_ecdsa.pub'])
1410
1411        return await cls.create_server(server_factory)
1412
1413
1414class _TestLimitedPublicKeySignatureAlgs(ServerTestCase):
1415    """Unit tests for limited public key signature algorithms"""
1416
1417    @classmethod
1418    async def start_server(cls):
1419        """Start an SSH server which supports public key authentication"""
1420
1421        return await cls.create_server(
1422            _PublicKeyServer, authorized_client_keys='authorized_keys',
1423            signature_algs=['ssh-rsa', 'rsa-sha2-512'])
1424
1425    @asynctest
1426    async def test_mismatched_client_signature_algs(self):
1427        """Test mismatched client key signature algorithms"""
1428
1429        with self.assertRaises(asyncssh.PermissionDenied):
1430            await self.connect(username='ckey', client_keys='ckey',
1431                               signature_algs=['rsa-sha2-256'])
1432
1433    @asynctest
1434    async def test_client_signature_alg_fallback(self):
1435        """Test fall back to default client key signature algorithm"""
1436
1437        async with self.connect(username='ckey', client_keys='ckey',
1438                                signature_algs=['rsa-sha2-256', 'ssh-rsa']):
1439            pass
1440
1441
1442class _TestSetAuthorizedKeys(ServerTestCase):
1443    """Unit tests for public key authentication with set_authorized_keys"""
1444
1445    @classmethod
1446    async def start_server(cls):
1447        """Start an SSH server which supports public key authentication"""
1448
1449        def server_factory():
1450            """Return an SSH server which calls set_authorized_keys"""
1451
1452            return _PublicKeyServer(authorized_keys='authorized_keys')
1453
1454        return await cls.create_server(server_factory)
1455
1456    @asynctest
1457    async def test_set_authorized_keys(self):
1458        """Test set_authorized_keys method on server"""
1459
1460        async with self.connect(username='ckey', client_keys='ckey'):
1461            pass
1462
1463    @asynctest
1464    async def test_cert_principals(self):
1465        """Test certificate principals check"""
1466
1467        ckey = asyncssh.read_private_key('ckey')
1468
1469        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1470                                CERT_TYPE_USER, ckey, ckey, ['ckey'])
1471
1472        async with self.connect(username='ckey', client_keys=[(ckey, cert)]):
1473            pass
1474
1475
1476class _TestPreloadedAuthorizedKeys(ServerTestCase):
1477    """Unit tests for authentication with pre-loaded authorized keys"""
1478
1479    @classmethod
1480    async def start_server(cls):
1481        """Start an SSH server which supports public key authentication"""
1482
1483        def server_factory():
1484            """Return an SSH server which calls set_authorized_keys"""
1485
1486            authorized_keys = asyncssh.read_authorized_keys('authorized_keys')
1487            return _PublicKeyServer(authorized_keys=authorized_keys)
1488
1489        return await cls.create_server(server_factory)
1490
1491    @asynctest
1492    async def test_pre_loaded_authorized_keys(self):
1493        """Test pre-loaded authorized keys file"""
1494
1495        async with self.connect(username='ckey', client_keys='ckey'):
1496            pass
1497
1498
1499class _TestPreloadedAuthorizedKeysFileList(ServerTestCase):
1500    """Unit tests with pre-loaded authorized keys file list"""
1501
1502    @classmethod
1503    async def start_server(cls):
1504        """Start an SSH server which supports public key authentication"""
1505
1506        def server_factory():
1507            """Return an SSH server which calls set_authorized_keys"""
1508
1509            authorized_keys = asyncssh.read_authorized_keys(['authorized_keys'])
1510            return _PublicKeyServer(authorized_keys=authorized_keys)
1511
1512        return await cls.create_server(server_factory)
1513
1514    @asynctest
1515    async def test_pre_loaded_authorized_keys(self):
1516        """Test pre-loaded authorized keys file list"""
1517
1518        async with self.connect(username='ckey', client_keys='ckey'):
1519            pass
1520
1521
1522@unittest.skipUnless(x509_available, 'X.509 not available')
1523class _TestX509Auth(ServerTestCase):
1524    """Unit tests for X.509 certificate authentication"""
1525
1526    @classmethod
1527    async def start_server(cls):
1528        """Start an SSH server which supports public key authentication"""
1529
1530        return await cls.create_server(
1531            _PublicKeyServer, authorized_client_keys='authorized_keys_x509')
1532
1533    @asynctest
1534    async def test_x509_self(self):
1535        """Test connecting with X.509 self-signed certificate"""
1536
1537        async with self.connect(username='ckey',
1538                                client_keys=['ckey_x509_self']):
1539            pass
1540
1541    @asynctest
1542    async def test_x509_chain(self):
1543        """Test connecting with X.509 certificate chain"""
1544
1545        async with self.connect(username='ckey',
1546                                client_keys=['ckey_x509_chain']):
1547            pass
1548
1549    @asynctest
1550    async def test_keypair_with_x509_cert(self):
1551        """Test connecting with a keypair with replaced X.509 cert"""
1552
1553        ckey = asyncssh.load_keypairs(['ckey'])[0]
1554
1555        async with self.connect(username='ckey',
1556                                client_keys=[(ckey, 'ckey_x509_chain')]):
1557            pass
1558
1559    @asynctest
1560    async def test_agent_keypair_with_x509_cert(self):
1561        """Test connecting with sn agent key with replaced X.509 cert"""
1562
1563        if not self.agent_available(): # pragma: no cover
1564            self.skipTest('ssh-agent not available')
1565
1566        async with asyncssh.connect_agent() as agent:
1567            ckey = (await agent.get_keys())[2]
1568
1569            async with self.connect(username='ckey',
1570                                    client_keys=[(ckey, 'ckey_x509_chain')]):
1571                pass
1572
1573    @asynctest
1574    async def test_x509_incomplete_chain(self):
1575        """Test connecting with incomplete X.509 certificate chain"""
1576
1577        with self.assertRaises(asyncssh.PermissionDenied):
1578            await self.connect(username='ckey',
1579                               client_keys=[('ckey_x509_chain',
1580                                             'ckey_x509_partial.pem')])
1581
1582    @asynctest
1583    async def test_x509_untrusted_cert(self):
1584        """Test connecting with untrusted X.509 certificate chain"""
1585
1586        with self.assertRaises(asyncssh.PermissionDenied):
1587            await self.connect(username='ckey', client_keys=['skey_x509_chain'])
1588
1589    @asynctest
1590    async def test_disabled_trivial_x509_auth(self):
1591        """Test disabling trivial auth with X.509 certificate authentication"""
1592
1593        async with self.connect(username='ckey',
1594                                client_keys=['ckey_x509_self'],
1595                                disable_trivial_auth=True):
1596            pass
1597
1598
1599@unittest.skipUnless(x509_available, 'X.509 not available')
1600class _TestX509AuthDisabled(ServerTestCase):
1601    """Unit tests for disabled X.509 certificate authentication"""
1602
1603    @classmethod
1604    async def start_server(cls):
1605        """Start an SSH server which doesn't support X.509 authentication"""
1606
1607        return await cls.create_server(
1608            _PublicKeyServer, x509_trusted_certs=None,
1609            authorized_client_keys='authorized_keys')
1610
1611    @asynctest
1612    async def test_failed_x509_auth(self):
1613        """Test connect failure with X.509 certificate"""
1614
1615        with self.assertRaises(asyncssh.PermissionDenied):
1616            await self.connect(username='ckey', client_keys=['ckey_x509_self'],
1617                               signature_algs=['x509v3-ssh-rsa'])
1618
1619    @asynctest
1620    async def test_non_x509(self):
1621        """Test connecting without an X.509 certificate"""
1622
1623        async with self.connect(username='ckey', client_keys=['ckey']):
1624            pass
1625
1626
1627@unittest.skipUnless(x509_available, 'X.509 not available')
1628class _TestX509Subject(ServerTestCase):
1629    """Unit tests for X.509 certificate authentication by subject name"""
1630
1631    @classmethod
1632    async def start_server(cls):
1633        """Start an SSH server which supports public key authentication"""
1634
1635        authorized_keys = asyncssh.import_authorized_keys(
1636            'x509v3-ssh-rsa subject=OU=name\n')
1637
1638        return await cls.create_server(
1639            _PublicKeyServer, authorized_client_keys=authorized_keys,
1640            x509_trusted_certs=['ckey_x509_self.pub'])
1641
1642    @asynctest
1643    async def test_x509_subject(self):
1644        """Test authenticating X.509 certificate by subject name"""
1645
1646        async with self.connect(username='ckey',
1647                                client_keys=['ckey_x509_self']):
1648            pass
1649
1650
1651@unittest.skipUnless(x509_available, 'X.509 not available')
1652class _TestX509Untrusted(ServerTestCase):
1653    """Unit tests for X.509 authentication with no trusted certificates"""
1654
1655    @classmethod
1656    async def start_server(cls):
1657        """Start an SSH server which supports public key authentication"""
1658
1659        return await cls.create_server(_PublicKeyServer,
1660                                       authorized_client_keys=None)
1661
1662    @asynctest
1663    async def test_x509_untrusted(self):
1664        """Test untrusted X.509 self-signed certificate"""
1665
1666        with self.assertRaises(asyncssh.PermissionDenied):
1667            await self.connect(username='ckey', client_keys=['ckey_x509_self'])
1668
1669
1670@unittest.skipUnless(x509_available, 'X.509 not available')
1671class _TestX509Disabled(ServerTestCase):
1672    """Unit tests for X.509 authentication with server support disabled"""
1673
1674    @classmethod
1675    async def start_server(cls):
1676        """Start an SSH server with X.509 authentication disabled"""
1677
1678        return await cls.create_server(_PublicKeyServer, x509_purposes=None)
1679
1680    @asynctest
1681    async def test_x509_disabled(self):
1682        """Test X.509 client certificate with server support disabled"""
1683
1684        with self.assertRaises(asyncssh.PermissionDenied):
1685            await self.connect(username='ckey', client_keys='skey_x509_self')
1686
1687
1688class _TestPasswordAuth(ServerTestCase):
1689    """Unit tests for password authentication"""
1690
1691    @classmethod
1692    async def start_server(cls):
1693        """Start an SSH server which supports password authentication"""
1694
1695        return await cls.create_server(_PasswordServer)
1696
1697    @async_context_manager
1698    async def _connect_password(self, username, password, old_password='',
1699                                new_password='', disable_trivial_auth=False,
1700                                test_async=False):
1701        """Open a connection to test password authentication"""
1702
1703        def client_factory():
1704            """Return an SSHClient to use to do password change"""
1705
1706            cls = _AsyncPasswordClient if test_async else _PasswordClient
1707            return cls(password, old_password, new_password)
1708
1709        conn, _ = await self.create_connection(
1710            client_factory, username=username, client_keys=None,
1711            disable_trivial_auth=disable_trivial_auth)
1712
1713        return conn
1714
1715    @asynctest
1716    async def test_password_auth(self):
1717        """Test connecting with password authentication"""
1718
1719        async with self.connect(username='pw', password='pw', client_keys=None):
1720            pass
1721
1722    @asynctest
1723    async def test_password_auth_disabled(self):
1724        """Test connecting with password authentication disabled"""
1725
1726        with self.assertRaises(asyncssh.PermissionDenied):
1727            await self.connect(username='pw', password='kbdint',
1728                               password_auth=False, preferred_auth='password')
1729
1730    @asynctest
1731    async def test_password_auth_failure(self):
1732        """Test _failure connecting with password authentication"""
1733
1734        with self.assertRaises(asyncssh.PermissionDenied):
1735            await self.connect(username='pw', password='badpw',
1736                               client_keys=None)
1737
1738    @asynctest
1739    async def test_password_auth_callback(self):
1740        """Test connecting with password authentication callback"""
1741
1742        async with self._connect_password('pw', 'pw', test_async=True):
1743            pass
1744
1745    @asynctest
1746    async def test_password_auth_callback_failure(self):
1747        """Test failure connecting with password authentication callback"""
1748
1749        with self.assertRaises(asyncssh.PermissionDenied):
1750            await self._connect_password('pw', 'badpw')
1751
1752    @asynctest
1753    async def test_password_change(self):
1754        """Test password change"""
1755
1756        async with self._connect_password('pw', 'oldpw', 'oldpw', 'pw',
1757                                          test_async=True):
1758            pass
1759
1760    @asynctest
1761    async def test_password_change_failure(self):
1762        """Test failure of password change"""
1763
1764        with self.assertRaises(asyncssh.PermissionDenied):
1765            await self._connect_password('pw', 'oldpw', 'badpw', 'pw')
1766
1767    @asynctest
1768    async def test_disabled_trivial_password_auth(self):
1769        """Test disabling trivial auth with password authentication"""
1770
1771        async with self.connect(username='pw', password='pw',
1772                                client_keys=None, disable_trivial_auth=True):
1773            pass
1774
1775    @asynctest
1776    async def test_disabled_trivial_password_change(self):
1777        """Test disabling trivial aith with password change"""
1778
1779        async with self._connect_password('pw', 'oldpw', 'oldpw', 'pw',
1780                                          disable_trivial_auth=True):
1781            pass
1782
1783
1784class _TestPasswordAsyncServerAuth(_TestPasswordAuth):
1785    """Unit tests for password authentication with async server callbacks"""
1786
1787    @classmethod
1788    async def start_server(cls):
1789        """Start an SSH server which supports async password authentication"""
1790
1791        return await cls.create_server(_AsyncPasswordServer)
1792
1793
1794class _TestKbdintAuth(ServerTestCase):
1795    """Unit tests for keyboard-interactive authentication"""
1796
1797    @classmethod
1798    async def start_server(cls):
1799        """Start an SSH server which supports keyboard-interactive auth"""
1800
1801        return await cls.create_server(_KbdintServer)
1802
1803    @async_context_manager
1804    async def _connect_kbdint(self, username, responses, test_async=False):
1805        """Open a connection to test keyboard-interactive auth"""
1806
1807        def client_factory():
1808            """Return an SSHClient to use to do keyboard-interactive auth"""
1809
1810            cls = _AsyncKbdintClient if test_async else _KbdintClient
1811            return cls(responses)
1812
1813        conn, _ = await self.create_connection(client_factory,
1814                                               username=username,
1815                                               client_keys=None)
1816
1817        return conn
1818
1819    @asynctest
1820    async def test_kbdint_auth_no_prompts(self):
1821        """Test keyboard-interactive authentication with no prompts"""
1822
1823        async with self.connect(username='none', password='kbdint',
1824                                client_keys=None):
1825            pass
1826
1827    @asynctest
1828    async def test_kbdint_auth_password(self):
1829        """Test keyboard-interactive authentication via password"""
1830
1831        async with self.connect(username='pw', password='kbdint',
1832                                client_keys=None):
1833            pass
1834
1835    @asynctest
1836    async def test_kbdint_auth_passcode(self):
1837        """Test keyboard-interactive authentication via passcode"""
1838
1839        async with self.connect(username='pc', password='kbdint',
1840                                client_keys=None):
1841            pass
1842
1843    @asynctest
1844    async def test_kbdint_auth_not_password(self):
1845        """Test keyboard-interactive authentication other than password"""
1846
1847        with self.assertRaises(asyncssh.PermissionDenied):
1848            await self.connect(username='kbdint', password='kbdint',
1849                               client_keys=None)
1850
1851    @asynctest
1852    async def test_kbdint_auth_multi_not_password(self):
1853        """Test keyboard-interactive authentication with multiple prompts"""
1854
1855        with self.assertRaises(asyncssh.PermissionDenied):
1856            await self.connect(username='multi', password='kbdint',
1857                               client_keys=None)
1858
1859    @asynctest
1860    async def test_kbdint_auth_disabled(self):
1861        """Test connecting with keyboard-interactive authentication disabled"""
1862
1863        with self.assertRaises(asyncssh.PermissionDenied):
1864            await self.connect(username='pw', password='kbdint',
1865                               kbdint_auth=False)
1866
1867    @asynctest
1868    async def test_kbdint_auth_failure(self):
1869        """Test failure connecting with keyboard-interactive authentication"""
1870
1871        with self.assertRaises(asyncssh.PermissionDenied):
1872            await self.connect(username='kbdint', password='badpw',
1873                               client_keys=None)
1874
1875    @asynctest
1876    async def test_kbdint_auth_callback(self):
1877        """Test keyboard-interactive auth callback"""
1878
1879        async with self._connect_kbdint('kbdint', ['kbdint'], test_async=True):
1880            pass
1881
1882    @asynctest
1883    async def test_kbdint_auth_callback_multi(self):
1884        """Test keyboard-interactive auth callback with multiple challenges"""
1885
1886        async with self._connect_kbdint('multi', ['1', '2'], test_async=True):
1887            pass
1888
1889    @asynctest
1890    async def test_kbdint_auth_callback_faliure(self):
1891        """Test failure connecting with keyboard-interactive auth callback"""
1892
1893        with self.assertRaises(asyncssh.PermissionDenied):
1894            await self._connect_kbdint('kbdint', ['badpw'])
1895
1896    @asynctest
1897    async def test_disabled_trivial_kbdint_auth(self):
1898        """Test disabled trivial auth with keyboard-interactive auth"""
1899
1900        async with self.connect(username='pw', password='kbdint',
1901                                client_keys=None, disable_trivial_auth=True):
1902            pass
1903
1904    @asynctest
1905    async def test_disabled_trivial_kbdint_no_prompts(self):
1906        """Test disabled trivial with with no keyboard-interactive prompts"""
1907
1908        with self.assertRaises(asyncssh.PermissionDenied):
1909            await self.connect(username='none', password='kbdint',
1910                               client_keys=None, disable_trivial_auth=True)
1911
1912
1913class _TestKbdintAsyncServerAuth(_TestKbdintAuth):
1914    """Unit tests for keyboard-interactive auth with async server callbacks"""
1915
1916    @classmethod
1917    async def start_server(cls):
1918        """Start an SSH server which supports async kbd-int auth"""
1919
1920        return await cls.create_server(_AsyncKbdintServer)
1921
1922
1923class _TestKbdintPasswordServerAuth(ServerTestCase):
1924    """Unit tests for keyboard-interactive auth with server password auth"""
1925
1926    @classmethod
1927    async def start_server(cls):
1928        """Start an SSH server which supports server password auth"""
1929
1930        return await cls.create_server(_PasswordServer)
1931
1932    @async_context_manager
1933    async def _connect_kbdint(self, username, responses):
1934        """Open a connection to test keyboard-interactive auth"""
1935
1936        def client_factory():
1937            """Return an SSHClient to use to do keyboard-interactive auth"""
1938
1939            return _KbdintClient(responses)
1940
1941        conn, _ = await self.create_connection(client_factory,
1942                                               username=username,
1943                                               client_keys=None)
1944
1945        return conn
1946
1947    @asynctest
1948    async def test_kbdint_password_auth(self):
1949        """Test keyboard-interactive server password authentication"""
1950
1951        async with self._connect_kbdint('pw', ['pw']):
1952            pass
1953
1954    @asynctest
1955    async def test_kbdint_password_auth_multiple_responses(self):
1956        """Test multiple responses to server password authentication"""
1957
1958        with self.assertRaises(asyncssh.PermissionDenied):
1959            await self._connect_kbdint('pw', ['xxx', 'yyy'])
1960
1961    @asynctest
1962    async def test_kbdint_password_change(self):
1963        """Test keyboard-interactive server password change"""
1964
1965        with self.assertRaises(asyncssh.PermissionDenied):
1966            await self._connect_kbdint('pw', ['oldpw'])
1967
1968
1969class _TestClientLoginTimeout(ServerTestCase):
1970    """Unit test for client login timeout"""
1971
1972    @classmethod
1973    async def start_server(cls):
1974        """Start an SSH server which supports public key authentication"""
1975
1976        def server_factory():
1977            """Return an SSHServer that delays before starting auth"""
1978
1979            return _PublicKeyServer(delay=2)
1980
1981        return await cls.create_server(
1982            server_factory, authorized_client_keys='authorized_keys')
1983
1984    @asynctest
1985    async def test_client_login_timeout_exceeded(self):
1986        """Test client login timeout exceeded"""
1987
1988        with self.assertRaises(asyncssh.ConnectionLost):
1989            await self.connect(username='ckey', client_keys='ckey',
1990                               login_timeout=1)
1991
1992    @asynctest
1993    async def test_client_login_timeout_exceeded_string(self):
1994        """Test client login timeout exceeded with string value"""
1995
1996        with self.assertRaises(asyncssh.ConnectionLost):
1997            await self.connect(username='ckey', client_keys='ckey',
1998                               login_timeout='0m1s')
1999
2000    @asynctest
2001    async def test_invalid_client_login_timeout(self):
2002        """Test invalid client login timeout"""
2003
2004        with self.assertRaises(ValueError):
2005            await self.connect(login_timeout=-1)
2006
2007
2008class _TestServerLoginTimeoutExceeded(ServerTestCase):
2009    """Unit test for server login timeout"""
2010
2011    @classmethod
2012    async def start_server(cls):
2013        """Start an SSH server with a 1 second login timeout"""
2014
2015        return await cls.create_server(
2016            _PublicKeyServer, authorized_client_keys='authorized_keys',
2017            login_timeout=1)
2018
2019    @asynctest
2020    async def test_server_login_timeout_exceeded(self):
2021        """Test server_login timeout exceeded"""
2022
2023        def client_factory():
2024            """Return an SSHClient that delays before providing a key"""
2025
2026            return _PublicKeyClient(['ckey'], 2)
2027
2028        with self.assertRaises(asyncssh.ConnectionLost):
2029            await self.create_connection(client_factory, username='ckey',
2030                                         client_keys=None)
2031
2032
2033class _TestServerLoginTimeoutDisabled(ServerTestCase):
2034    """Unit test for disabled server login timeout"""
2035
2036    @classmethod
2037    async def start_server(cls):
2038        """Start an SSH server with no login timeout"""
2039
2040        return await cls.create_server(
2041            _PublicKeyServer, authorized_client_keys='authorized_keys',
2042            login_timeout=None)
2043
2044    @asynctest
2045    async def test_server_login_timeout_disabled(self):
2046        """Test with login timeout disabled"""
2047
2048        async with self.connect(username='ckey', client_keys='ckey'):
2049            pass
2050