1# Copyright (c) 2016-2020 by Ron Frederick <ronf@timeheart.net> and others. 2# 3# This program and the accompanying materials are made available under 4# the terms of the Eclipse Public License v2.0 which accompanies this 5# distribution and is available at: 6# 7# http://www.eclipse.org/legal/epl-2.0/ 8# 9# This program may also be made available under the following secondary 10# licenses when the conditions for such availability set forth in the 11# Eclipse Public License v2.0 are satisfied: 12# 13# GNU General Public License, Version 2.0, or any later versions of 14# that license 15# 16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later 17# 18# Contributors: 19# Ron Frederick - initial implementation, API, and documentation 20 21"""Unit tests for AsyncSSH connection authentication""" 22 23import asyncio 24import os 25import unittest 26 27from unittest.mock import patch 28 29import asyncssh 30from asyncssh.misc import async_context_manager, write_file 31from asyncssh.packet import String 32from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST 33 34from .keysign_stub import create_subprocess_exec_stub 35from .server import Server, ServerTestCase 36from .util import asynctest, gss_available, patch_getnameinfo, patch_gss 37from .util import make_certificate, x509_available 38 39 40class _FailValidateHostSSHServerConnection(asyncssh.SSHServerConnection): 41 """Test error in validating host key signature""" 42 43 async def validate_host_based_auth(self, username, key_data, client_host, 44 client_username, msg, signature): 45 """Validate host based authentication for the specified host and user""" 46 47 return await super().validate_host_based_auth(username, key_data, 48 client_host, 49 client_username, 50 msg + b'\xff', signature) 51 52 53class _AsyncGSSServer(asyncssh.SSHServer): 54 """Server for testing async GSS authentication""" 55 56 # pylint: disable=useless-super-delegation 57 58 async def validate_gss_principal(self, username, user_principal, 59 host_principal): 60 """Return whether password is valid for this user""" 61 62 return super().validate_gss_principal(username, user_principal, 63 host_principal) 64 65 66class _NullServer(Server): 67 """Server for testing disabled auth""" 68 69 async def begin_auth(self, username): 70 """Handle client authentication request""" 71 72 return False 73 74 75class _HostBasedServer(Server): 76 """Server for testing host-based authentication""" 77 78 def __init__(self, host_key=None, ca_key=None): 79 super().__init__() 80 81 self._host_key = \ 82 asyncssh.read_public_key(host_key) if host_key else None 83 self._ca_key = \ 84 asyncssh.read_public_key(ca_key) if ca_key else None 85 86 def host_based_auth_supported(self): 87 """Return whether or not host based authentication is supported""" 88 89 return True 90 91 def validate_host_public_key(self, client_host, client_addr, 92 client_port, key): 93 """Return whether key is an authorized key for this host""" 94 95 # pylint: disable=unused-argument 96 97 return key == self._host_key 98 99 def validate_host_ca_key(self, client_host, client_addr, client_port, key): 100 """Return whether key is an authorized CA key for this host""" 101 102 # pylint: disable=unused-argument 103 104 return key == self._ca_key 105 106 def validate_host_based_user(self, username, client_host, client_username): 107 """Return whether remote host and user is authorized for this user""" 108 109 # pylint: disable=unused-argument 110 111 return client_username == 'user' 112 113 114class _AsyncHostBasedServer(Server): 115 """Server for testing async host-based authentication""" 116 117 # pylint: disable=useless-super-delegation 118 119 async def validate_host_based_user(self, username, client_host, 120 client_username): 121 """Return whether remote host and user is authorized for this user""" 122 123 return super().validate_host_based_user(username, client_host, 124 client_username) 125 126 127class _InvalidUsernameClientConnection(asyncssh.connection.SSHClientConnection): 128 """Test sending a client username with invalid Unicode to the server""" 129 130 async def host_based_auth_requested(self): 131 """Return a host key pair, host, and user to authenticate with""" 132 133 keypair, host, _ = await super().host_based_auth_requested() 134 135 return keypair, host, b'\xff' 136 137 138class _PublicKeyClient(asyncssh.SSHClient): 139 """Test client public key authentication""" 140 141 def __init__(self, keylist, delay=0): 142 self._keylist = keylist 143 self._delay = delay 144 145 async def public_key_auth_requested(self): 146 """Return a public key to authenticate with""" 147 148 if self._delay: 149 await asyncio.sleep(self._delay) 150 151 return self._keylist.pop(0) if self._keylist else None 152 153 154class _AsyncPublicKeyClient(_PublicKeyClient): 155 """Test async client public key authentication""" 156 157 # pylint: disable=useless-super-delegation 158 159 async def public_key_auth_requested(self): 160 """Return a public key to authenticate with""" 161 162 return await super().public_key_auth_requested() 163 164 165class _PublicKeyServer(Server): 166 """Server for testing public key authentication""" 167 168 def __init__(self, client_keys=(), authorized_keys=None, delay=0): 169 super().__init__() 170 self._client_keys = client_keys 171 self._authorized_keys = authorized_keys 172 self._delay = delay 173 174 def connection_made(self, conn): 175 """Called when a connection is made""" 176 177 super().connection_made(conn) 178 conn.send_auth_banner('auth banner') 179 180 async def begin_auth(self, username): 181 """Handle client authentication request""" 182 183 if self._authorized_keys: 184 self._conn.set_authorized_keys(self._authorized_keys) 185 else: 186 self._client_keys = asyncssh.load_public_keys(self._client_keys) 187 188 if self._delay: 189 await asyncio.sleep(self._delay) 190 191 return True 192 193 def public_key_auth_supported(self): 194 """Return whether or not public key authentication is supported""" 195 196 return True 197 198 def validate_public_key(self, username, key): 199 """Return whether key is an authorized client key for this user""" 200 201 return key in self._client_keys 202 203 def validate_ca_key(self, username, key): 204 """Return whether key is an authorized CA key for this user""" 205 206 return key in self._client_keys 207 208 209class _AsyncPublicKeyServer(_PublicKeyServer): 210 """Server for testing async public key authentication""" 211 212 # pylint: disable=useless-super-delegation 213 214 async def begin_auth(self, username): 215 """Handle client authentication request""" 216 217 return await super().begin_auth(username) 218 219 async def validate_public_key(self, username, key): 220 """Return whether key is an authorized client key for this user""" 221 222 return super().validate_public_key(username, key) 223 224 async def validate_ca_key(self, username, key): 225 """Return whether key is an authorized CA key for this user""" 226 227 return super().validate_ca_key(username, key) 228 229 230class _PasswordClient(asyncssh.SSHClient): 231 """Test client password authentication""" 232 233 def __init__(self, password, old_password, new_password): 234 self._password = password 235 self._old_password = old_password 236 self._new_password = new_password 237 238 def password_auth_requested(self): 239 """Return a password to authenticate with""" 240 241 if self._password: 242 result = self._password 243 self._password = None 244 return result 245 else: 246 return None 247 248 def password_change_requested(self, prompt, lang): 249 """Change the client's password""" 250 251 return self._old_password, self._new_password 252 253 254class _AsyncPasswordClient(_PasswordClient): 255 """Test async client password authentication""" 256 257 # pylint: disable=useless-super-delegation 258 259 async def password_auth_requested(self): 260 """Return a password to authenticate with""" 261 262 return super().password_auth_requested() 263 264 async def password_change_requested(self, prompt, lang): 265 """Change the client's password""" 266 267 return super().password_change_requested(prompt, lang) 268 269 270class _PasswordServer(Server): 271 """Server for testing password authentication""" 272 273 def password_auth_supported(self): 274 """Enable password authentication""" 275 276 return True 277 278 def validate_password(self, username, password): 279 """Accept password of pw, trigger password change on oldpw""" 280 281 if password == 'oldpw': 282 raise asyncssh.PasswordChangeRequired('Password change required') 283 else: 284 return password == 'pw' 285 286 def change_password(self, username, old_password, new_password): 287 """Only allow password change from password oldpw""" 288 289 return old_password == 'oldpw' 290 291 292class _AsyncPasswordServer(_PasswordServer): 293 """Server for testing async password authentication""" 294 295 # pylint: disable=useless-super-delegation 296 297 async def validate_password(self, username, password): 298 """Return whether password is valid for this user""" 299 300 return super().validate_password(username, password) 301 302 async def change_password(self, username, old_password, new_password): 303 """Handle a request to change a user's password""" 304 305 return super().change_password(username, old_password, new_password) 306 307 308class _KbdintClient(asyncssh.SSHClient): 309 """Test keyboard-interactive client auth""" 310 311 def __init__(self, responses): 312 self._responses = responses 313 314 def kbdint_auth_requested(self): 315 """Return the list of supported keyboard-interactive auth methods""" 316 317 return '' if self._responses else None 318 319 def kbdint_challenge_received(self, name, instructions, lang, prompts): 320 """Return responses to a keyboard-interactive auth challenge""" 321 322 # pylint: disable=unused-argument 323 324 if not prompts: 325 return [] 326 elif self._responses: 327 result = self._responses 328 self._responses = None 329 return result 330 else: 331 return None 332 333 334class _AsyncKbdintClient(_KbdintClient): 335 """Test keyboard-interactive client auth""" 336 337 # pylint: disable=useless-super-delegation 338 339 async def kbdint_auth_requested(self): 340 """Return the list of supported keyboard-interactive auth methods""" 341 342 return super().kbdint_auth_requested() 343 344 async def kbdint_challenge_received(self, name, instructions, 345 lang, prompts): 346 """Return responses to a keyboard-interactive auth challenge""" 347 348 return super().kbdint_challenge_received(name, instructions, 349 lang, prompts) 350 351 352class _KbdintServer(Server): 353 """Server for testing keyboard-interactive authentication""" 354 355 def __init__(self): 356 super().__init__() 357 self._kbdint_round = 0 358 359 def kbdint_auth_supported(self): 360 """Enable keyboard-interactive authentication""" 361 362 return True 363 364 def get_kbdint_challenge(self, username, lang, submethods): 365 """Return an initial challenge with only instructions""" 366 367 return '', 'instructions', '', [] 368 369 def validate_kbdint_response(self, username, responses): 370 """Return a password challenge after the instructions""" 371 372 if self._kbdint_round == 0: 373 if username == 'none': 374 result = ('', '', '', []) 375 elif username == 'pw': 376 result = ('', '', '', [('Password:', False)]) 377 elif username == 'pc': 378 result = ('', '', '', [('Passcode:', False)]) 379 elif username == 'multi': 380 result = ('', '', '', [('Prompt1:', True), ('Prompt2', True)]) 381 else: 382 result = ('', '', '', [('Other Challenge:', False)]) 383 else: 384 if responses in ([], ['kbdint'], ['1', '2']): 385 result = True 386 else: 387 result = ('', '', '', [('Second Challenge:', True)]) 388 389 self._kbdint_round += 1 390 return result 391 392 393class _AsyncKbdintServer(_KbdintServer): 394 """Server for testing async keyboard-interactive authentication""" 395 396 # pylint: disable=useless-super-delegation 397 398 async def get_kbdint_challenge(self, username, lang, submethods): 399 """Return a keyboard-interactive auth challenge""" 400 401 return super().get_kbdint_challenge(username, lang, submethods) 402 403 async def validate_kbdint_response(self, username, responses): 404 """Return whether the keyboard-interactive response is valid 405 for this user""" 406 407 return super().validate_kbdint_response(username, responses) 408 409 410class _UnknownAuthClientConnection(asyncssh.connection.SSHClientConnection): 411 """Test getting back an unknown auth method from the SSH server""" 412 413 def try_next_auth(self): 414 """Attempt client authentication using an unknown method""" 415 416 self._auth_methods = [b'unknown'] + self._auth_methods 417 super().try_next_auth() 418 419 420class _TestNullAuth(ServerTestCase): 421 """Unit tests for testing disabled authentication""" 422 423 @classmethod 424 async def start_server(cls): 425 """Start an SSH server which supports disabled authentication""" 426 427 return await cls.create_server(_NullServer) 428 429 @asynctest 430 async def test_disabled_auth(self): 431 """Test disabled authentication""" 432 433 async with self.connect(username='user'): 434 pass 435 436 @asynctest 437 async def test_disabled_trivial_auth(self): 438 """Test disabling trivial auth with no authentication""" 439 440 with self.assertRaises(asyncssh.PermissionDenied): 441 await self.connect(username='user', disable_trivial_auth=True) 442 443 444@unittest.skipUnless(gss_available, 'GSS not available') 445@patch_gss 446class _TestGSSAuth(ServerTestCase): 447 """Unit tests for GSS authentication""" 448 449 @classmethod 450 async def start_server(cls): 451 """Start an SSH server which supports GSS authentication""" 452 453 return await cls.create_server(_AsyncGSSServer, gss_host='1') 454 455 @asynctest 456 async def test_gss_kex_auth(self): 457 """Test GSS key exchange authentication""" 458 459 async with self.connect(kex_algs=['gss-gex-sha256'], 460 username='user', gss_host='1'): 461 pass 462 463 @asynctest 464 async def test_gss_mic_auth(self): 465 """Test GSS MIC authentication""" 466 467 async with self.connect(kex_algs=['ecdh-sha2-nistp256'], 468 username='user', gss_host='1'): 469 pass 470 471 @asynctest 472 async def test_gss_delegate(self): 473 """Test GSS credential delegation""" 474 475 async with self.connect(username='user', gss_host='1', 476 gss_delegate_creds=True): 477 pass 478 479 @asynctest 480 async def test_gss_kex_disabled(self): 481 """Test GSS key exchange being disabled""" 482 483 with self.assertRaises(asyncssh.PermissionDenied): 484 await self.connect(username='user', gss_host=(), gss_kex=False, 485 preferred_auth='gssapi-keyex') 486 487 @asynctest 488 async def test_gss_auth_disabled(self): 489 """Test GSS authentication being disabled""" 490 491 with self.assertRaises(asyncssh.PermissionDenied): 492 await self.connect(username='user', gss_host=(), gss_auth=False) 493 494 @asynctest 495 async def test_gss_auth_unavailable(self): 496 """Test GSS authentication being unavailable""" 497 498 with self.assertRaises(asyncssh.PermissionDenied): 499 await self.connect(username='user1', gss_host=()) 500 501 @asynctest 502 async def test_gss_client_error(self): 503 """Test GSS client error""" 504 505 with self.assertRaises(asyncssh.PermissionDenied): 506 await self.connect(gss_host='1,init_error', username='user') 507 508 @asynctest 509 async def test_disabled_trivial_gss_kex_auth(self): 510 """Test disabling trivial auth with GSS key exchange authentication""" 511 512 async with self.connect(kex_algs=['gss-gex-sha256'], 513 username='user', gss_host='1', 514 disable_trivial_auth=True): 515 pass 516 517 @asynctest 518 async def test_disabled_trivial_gss_mic_auth(self): 519 """Test disabling trivial auth with GSS MIC authentication""" 520 521 async with self.connect(kex_algs=['ecdh-sha2-nistp256'], 522 username='user', gss_host='1', 523 disable_trivial_auth=True): 524 pass 525 526 527@unittest.skipUnless(gss_available, 'GSS not available') 528@patch_gss 529class _TestGSSServerAuthDisabled(ServerTestCase): 530 """Unit tests for with GSS key exchange and auth disabled on server""" 531 532 @classmethod 533 async def start_server(cls): 534 """Start an SSH server with GSS key exchange and auth disabled""" 535 536 return await cls.create_server(gss_host='1', gss_kex=False, 537 gss_auth=False) 538 539 @asynctest 540 async def test_gss_kex_unavailable(self): 541 """Test GSS key exchange being unavailable""" 542 543 with self.assertRaises(asyncssh.PermissionDenied): 544 await self.connect(username='user', gss_host=(), 545 preferred_auth='gssapi-keyex') 546 547 @asynctest 548 async def test_gss_auth_unavailable(self): 549 """Test GSS authentication being unavailable""" 550 551 with self.assertRaises(asyncssh.PermissionDenied): 552 await self.connect(username='user', gss_host=(), 553 preferred_auth='gssapi-with-mic') 554 555 556 557@unittest.skipUnless(gss_available, 'GSS not available') 558@patch_gss 559class _TestGSSServerError(ServerTestCase): 560 """Unit tests for GSS server error""" 561 562 @classmethod 563 async def start_server(cls): 564 """Start an SSH server which raises an error on GSS authentication""" 565 566 return await cls.create_server(gss_host='1,init_error') 567 568 @asynctest 569 async def test_gss_server_error(self): 570 """Test GSS error on server""" 571 572 with self.assertRaises(asyncssh.PermissionDenied): 573 await self.connect(username='user') 574 575 576@unittest.skipUnless(gss_available, 'GSS not available') 577@patch_gss 578class _TestGSSFQDN(ServerTestCase): 579 """Unit tests for GSS server error""" 580 581 @classmethod 582 async def start_server(cls): 583 """Start an SSH server which raises an error on GSS authentication""" 584 585 def mock_gethostname(): 586 """Return a non-fully-qualified hostname""" 587 588 return 'host' 589 590 def mock_getfqdn(): 591 """Confirm getfqdn is called on relative hostnames""" 592 593 return '1' 594 595 with patch('socket.gethostname', mock_gethostname): 596 with patch('socket.getfqdn', mock_getfqdn): 597 return await cls.create_server(gss_host=()) 598 599 @asynctest 600 async def test_gss_fqdn_lookup(self): 601 """Test GSS FQDN lookup""" 602 603 async with self.connect(username='user', gss_host=()): 604 pass 605 606 607@patch_getnameinfo 608class _TestHostBasedAuth(ServerTestCase): 609 """Unit tests for host-based authentication""" 610 611 @classmethod 612 async def start_server(cls): 613 """Start an SSH server which supports host-based authentication""" 614 615 return await cls.create_server( 616 _HostBasedServer, known_client_hosts='known_hosts') 617 618 @asynctest 619 async def test_client_host_auth(self): 620 """Test connecting with host-based authentication""" 621 622 async with self.connect(username='user', client_host_keys='skey', 623 client_username='user'): 624 pass 625 626 @asynctest 627 async def test_client_host_auth_disabled(self): 628 """Test connecting with host-based authentication disabled""" 629 630 with self.assertRaises(asyncssh.PermissionDenied): 631 await self.connect(username='user', client_host_keys='skey', 632 client_username='user', host_based_auth=False) 633 634 @asynctest 635 async def test_client_host_key_bytes(self): 636 """Test client host key passed in as bytes""" 637 638 with open('skey', 'rb') as f: 639 skey = f.read() 640 641 async with self.connect(username='user', client_host_keys=[skey], 642 client_username='user'): 643 pass 644 645 @asynctest 646 async def test_client_host_key_sshkey(self): 647 """Test client host key passed in as an SSHKey""" 648 649 skey = asyncssh.read_private_key('skey') 650 651 async with self.connect(username='user', client_host_keys=[skey], 652 client_username='user'): 653 pass 654 655 @asynctest 656 async def test_client_host_key_keypairs(self): 657 """Test client host keys passed in as a list of SSHKeyPairs""" 658 659 keys = asyncssh.load_keypairs('skey') 660 661 async with self.connect(username='user', client_host_keys=keys, 662 client_username='user'): 663 pass 664 665 @asynctest 666 async def test_client_host_signature_algs(self): 667 """Test host based authentication with specific signature algorithms""" 668 669 for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'): 670 async with self.connect(username='user', client_host_keys='skey', 671 client_username='user', 672 signature_algs=[alg]): 673 pass 674 675 @asynctest 676 async def test_no_server_signature_algs(self): 677 """Test a server which doesn't advertise signature algorithms""" 678 679 def skip_ext_info(self): 680 """Don't send extension information""" 681 682 # pylint: disable=unused-argument 683 684 return [] 685 686 with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg', 687 skip_ext_info): 688 async with self.connect(username='user', client_host_keys='skey', 689 client_username='user'): 690 pass 691 692 @asynctest 693 async def test_untrusted_client_host_key(self): 694 """Test untrusted client host key""" 695 696 with self.assertRaises(asyncssh.PermissionDenied): 697 await self.connect(username='user', client_host_keys='ckey', 698 client_username='user') 699 700 @asynctest 701 async def test_missing_cert(self): 702 """Test missing client host certificate""" 703 704 with self.assertRaises(OSError): 705 await self.connect(username='user', 706 client_host_keys=[('skey', 'xxx')], 707 client_username='user') 708 709 @asynctest 710 async def test_invalid_client_host_signature(self): 711 """Test invalid client host signature""" 712 713 with patch('asyncssh.connection.SSHServerConnection', 714 _FailValidateHostSSHServerConnection): 715 with self.assertRaises(asyncssh.PermissionDenied): 716 await self.connect(username='user', client_host_keys='skey', 717 client_username='user') 718 719 @asynctest 720 async def test_client_host_trailing_dot(self): 721 """Test stripping of trailing dot from client host""" 722 723 async with self.connect(username='user', client_host_keys='skey', 724 client_host='localhost.', 725 client_username='user'): 726 pass 727 728 @asynctest 729 async def test_mismatched_client_host(self): 730 """Test ignoring of mismatched client host due to canonicalization""" 731 732 async with self.connect(username='user', client_host_keys='skey', 733 client_host='xxx', client_username='user'): 734 pass 735 736 @asynctest 737 async def test_mismatched_client_username(self): 738 """Test mismatched client username""" 739 740 with self.assertRaises(asyncssh.PermissionDenied): 741 await self.connect(username='user', client_host_keys='skey', 742 client_username='xxx') 743 744 @asynctest 745 async def test_invalid_client_username(self): 746 """Test invalid client username""" 747 748 with patch('asyncssh.connection.SSHClientConnection', 749 _InvalidUsernameClientConnection): 750 with self.assertRaises(asyncssh.ProtocolError): 751 await self.connect(username='user', client_host_keys='skey') 752 753 @asynctest 754 async def test_expired_cert(self): 755 """Test expired certificate""" 756 757 ckey = asyncssh.read_private_key('ckey') 758 skey = asyncssh.read_private_key('skey') 759 760 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 761 CERT_TYPE_HOST, ckey, skey, ['localhost'], 762 valid_before=1) 763 764 with self.assertRaises(asyncssh.PermissionDenied): 765 await self.connect(username='user', client_host_keys=[(ckey, cert)], 766 client_username='user') 767 768 @asynctest 769 async def test_untrusted_ca(self): 770 """Test untrusted CA""" 771 772 ckey = asyncssh.read_private_key('ckey') 773 774 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 775 CERT_TYPE_HOST, ckey, ckey, ['localhost']) 776 777 with self.assertRaises(asyncssh.PermissionDenied): 778 await self.connect(username='user', client_host_keys=[(ckey, cert)], 779 client_username='user') 780 781 @asynctest 782 async def test_disabled_trivial_client_host_auth(self): 783 """Test disabling trivial auth with host-based authentication""" 784 785 with self.assertRaises(asyncssh.PermissionDenied): 786 await self.connect(username='user', client_host_keys='skey', 787 client_username='user', 788 disable_trivial_auth=True) 789 790 791@patch_getnameinfo 792class _TestCallbackHostBasedAuth(ServerTestCase): 793 """Unit tests for host-based authentication using callback""" 794 795 @classmethod 796 async def start_server(cls): 797 """Start an SSH server which supports host-based authentication""" 798 799 def server_factory(): 800 """Return an SSHServer which can validate the client host key""" 801 802 return _HostBasedServer(host_key='skey.pub', ca_key='skey.pub') 803 804 return await cls.create_server(server_factory) 805 806 @asynctest 807 async def test_validate_client_host_callback(self): 808 """Test using callback to validate client host key""" 809 810 async with self.connect(username='user', 811 client_host_keys=[('skey', None)], 812 client_username='user'): 813 pass 814 815 @asynctest 816 async def test_validate_client_host_ca_callback(self): 817 """Test using callback to validate client host CA key""" 818 819 async with self.connect(username='user', client_host_keys='skey', 820 client_username='user'): 821 pass 822 823 @asynctest 824 async def test_untrusted_client_host_callback(self): 825 """Test callback to validate client host key returning failure""" 826 827 with self.assertRaises(asyncssh.PermissionDenied): 828 await self.connect(username='user', 829 client_host_keys=[('ckey', None)], 830 client_username='user') 831 832 @asynctest 833 async def test_untrusted_client_host_ca_callback(self): 834 """Test callback to validate client host CA key returning failure""" 835 836 with self.assertRaises(asyncssh.PermissionDenied): 837 await self.connect(username='user', client_host_keys='ckey', 838 client_username='user') 839 840 841@patch_getnameinfo 842class _TestKeysignHostBasedAuth(ServerTestCase): 843 """Unit tests for host-based authentication using ssh-keysign""" 844 845 @classmethod 846 async def start_server(cls): 847 """Start an SSH server which supports host-based authentication""" 848 849 return await cls.create_server(_HostBasedServer, 850 known_client_hosts='known_hosts') 851 852 @async_context_manager 853 async def _connect_keysign(self, client_host_keysign=True, 854 client_host_keys=None, keysign_dirs=('.',)): 855 """Open a connection to test host-based auth using ssh-keysign""" 856 857 with patch('asyncio.create_subprocess_exec', 858 create_subprocess_exec_stub): 859 with patch('asyncssh.keysign._DEFAULT_KEYSIGN_DIRS', keysign_dirs): 860 with patch('asyncssh.public_key._DEFAULT_HOST_KEY_DIRS', ['.']): 861 with patch('asyncssh.public_key._DEFAULT_HOST_KEY_FILES', 862 ['skey', 'xxx']): 863 return await self.connect( 864 username='user', 865 client_host_keysign=client_host_keysign, 866 client_host_keys=client_host_keys, 867 client_username='user') 868 869 @asynctest 870 async def test_keysign(self): 871 """Test host-based authentication using ssh-keysign""" 872 873 async with self._connect_keysign(): 874 pass 875 876 @asynctest 877 async def test_explciit_keysign(self): 878 """Test ssh-keysign with an explicit path""" 879 880 async with self._connect_keysign(client_host_keysign='.'): 881 pass 882 883 @asynctest 884 async def test_keysign_explicit_host_keys(self): 885 """Test ssh-keysign with explicit host public keys""" 886 887 async with self._connect_keysign(client_host_keys='skey.pub'): 888 pass 889 890 @asynctest 891 async def test_invalid_keysign_response(self): 892 """Test invalid ssh-keysign response""" 893 894 with patch('asyncssh.keysign.KEYSIGN_VERSION', 0): 895 with self.assertRaises(asyncssh.PermissionDenied): 896 await self._connect_keysign() 897 898 @asynctest 899 async def test_keysign_error(self): 900 """Test ssh-keysign error response""" 901 902 with patch('asyncssh.keysign.KEYSIGN_VERSION', 1): 903 with self.assertRaises(asyncssh.PermissionDenied): 904 await self._connect_keysign() 905 906 @asynctest 907 async def test_invalid_keysign_version(self): 908 """Test invalid version in ssh-keysign request""" 909 910 with patch('asyncssh.keysign.KEYSIGN_VERSION', 99): 911 with self.assertRaises(asyncssh.PermissionDenied): 912 await self._connect_keysign() 913 914 @asynctest 915 async def test_keysign_not_found(self): 916 """Test ssh-keysign executable not being found""" 917 918 with self.assertRaises(ValueError): 919 await self._connect_keysign(keysign_dirs=()) 920 921 @asynctest 922 async def test_explicit_keysign_not_found(self): 923 """Test explicit ssh-keysign executable not being found""" 924 925 with self.assertRaises(ValueError): 926 await self._connect_keysign(client_host_keysign='xxx') 927 928 @asynctest 929 async def test_keysign_dir_not_present(self): 930 """Test ssh-keysign executable not in a keysign dir""" 931 932 with self.assertRaises(ValueError): 933 await self._connect_keysign(keysign_dirs=('xxx',)) 934 935 936@patch_getnameinfo 937class _TestHostBasedAsyncServerAuth(_TestHostBasedAuth): 938 """Unit tests for host-based authentication with async server callbacks""" 939 940 @classmethod 941 async def start_server(cls): 942 """Start an SSH server which supports async host-based auth""" 943 944 return await cls.create_server(_AsyncHostBasedServer, 945 known_client_hosts='known_hosts', 946 trust_client_host=True) 947 948 @asynctest 949 async def test_mismatched_client_host(self): 950 """Test mismatch of trusted client host""" 951 952 with self.assertRaises(asyncssh.PermissionDenied): 953 await self.connect(username='user', client_host_keys='skey', 954 client_host='xxx', client_username='user') 955 956 957@patch_getnameinfo 958class _TestLimitedHostBasedSignatureAlgs(ServerTestCase): 959 """Unit tests for limited host key signature algorithms""" 960 961 @classmethod 962 async def start_server(cls): 963 """Start an SSH server which supports host-based authentication""" 964 965 return await cls.create_server( 966 _HostBasedServer, known_client_hosts='known_hosts', 967 signature_algs=['ssh-rsa', 'rsa-sha2-512']) 968 969 @asynctest 970 async def test_mismatched_host_signature_algs(self): 971 """Test mismatched host key signature algorithms""" 972 973 with self.assertRaises(asyncssh.PermissionDenied): 974 await self.connect(username='ckey', client_host_keys='skey', 975 client_username='user', 976 signature_algs=['rsa-sha2-256']) 977 978 @asynctest 979 async def test_host_signature_alg_fallback(self): 980 """Test fall back to default host key signature algorithm""" 981 982 async with self.connect(username='ckey', client_host_keys='skey', 983 client_username='user', 984 signature_algs=['rsa-sha2-256', 'ssh-rsa']): 985 pass 986 987 988class _TestPublicKeyAuth(ServerTestCase): 989 """Unit tests for public key authentication""" 990 991 @classmethod 992 async def start_server(cls): 993 """Start an SSH server which supports public key authentication""" 994 995 return await cls.create_server( 996 _PublicKeyServer, authorized_client_keys='authorized_keys') 997 998 @async_context_manager 999 async def _connect_publickey(self, keylist, test_async=False): 1000 """Open a connection to test public key auth""" 1001 1002 def client_factory(): 1003 """Return an SSHClient to use to do public key auth""" 1004 1005 cls = _AsyncPublicKeyClient if test_async else _PublicKeyClient 1006 return cls(keylist) 1007 1008 conn, _ = await self.create_connection(client_factory, username='ckey', 1009 client_keys=None) 1010 1011 return conn 1012 1013 @asynctest 1014 async def test_encrypted_client_key(self): 1015 """Test public key auth with encrypted client key""" 1016 1017 async with self.connect(username='ckey', client_keys='ckey_encrypted', 1018 passphrase='passphrase'): 1019 pass 1020 1021 @asynctest 1022 async def test_encrypted_client_key_bad_passphrase(self): 1023 """Test wrong passphrase for encrypted client key""" 1024 1025 with self.assertRaises(asyncssh.KeyEncryptionError): 1026 await self.connect(username='ckey', client_keys='ckey_encrypted', 1027 passphrase='xxx') 1028 1029 @asynctest 1030 async def test_encrypted_client_key_missing_passphrase(self): 1031 """Test missing passphrase for encrypted client key""" 1032 1033 with self.assertRaises(asyncssh.KeyImportError): 1034 await self.connect(username='ckey', client_keys='ckey_encrypted') 1035 1036 @asynctest 1037 async def test_client_certs(self): 1038 """Test trusted client certificate via client_certs""" 1039 1040 async with self.connect(username='ckey', client_keys='ckey', 1041 client_certs='ckey-cert.pub'): 1042 pass 1043 1044 @asynctest 1045 async def test_agent_auth(self): 1046 """Test connecting with ssh-agent authentication""" 1047 1048 if not self.agent_available(): # pragma: no cover 1049 self.skipTest('ssh-agent not available') 1050 1051 async with self.connect(username='ckey'): 1052 pass 1053 1054 @asynctest 1055 async def test_agent_identities(self): 1056 """Test connecting with ssh-agent auth with specific identities""" 1057 1058 if not self.agent_available(): # pragma: no cover 1059 self.skipTest('ssh-agent not available') 1060 1061 ckey = asyncssh.read_private_key('ckey') 1062 ckey.write_private_key('ckey.pem', 'pkcs8-pem') 1063 1064 ckey_cert = asyncssh.read_certificate('ckey-cert.pub') 1065 ckey_ecdsa = asyncssh.read_public_key('ckey_ecdsa.pub') 1066 1067 for pubkey in ('ckey-cert.pub', 'ckey_ecdsa.pub', 'ckey.pem', 1068 ckey_cert, ckey_ecdsa, ckey_ecdsa.public_data): 1069 async with self.connect(username='ckey', agent_identities=pubkey): 1070 pass 1071 1072 @asynctest 1073 async def test_agent_identities_config(self): 1074 """Test connecting with ssh-agent auth and IdentitiesOnly config""" 1075 1076 if not self.agent_available(): # pragma: no cover 1077 self.skipTest('ssh-agent not available') 1078 1079 write_file('ckey_err', b'') 1080 1081 write_file('config', 'IdentitiesOnly True\n' 1082 'IdentityFile ckey-cert.pub\n' 1083 'IdentityFile ckey_ecdsa.pub\n' 1084 'IdentityFile ckey_err\n', 'w') 1085 1086 async with self.connect(username='ckey', config='config'): 1087 pass 1088 1089 @asynctest 1090 async def test_agent_identities_config_default_keys(self): 1091 """Test connecting with ssh-agent auth and default IdentitiesOnly""" 1092 1093 if not self.agent_available(): # pragma: no cover 1094 self.skipTest('ssh-agent not available') 1095 1096 write_file('config', 'IdentitiesOnly True\n', 'w') 1097 1098 async with self.connect(username='ckey', config='config'): 1099 pass 1100 1101 @asynctest 1102 async def test_agent_signature_algs(self): 1103 """Test ssh-agent keys with specific signature algorithms""" 1104 1105 if not self.agent_available(): # pragma: no cover 1106 self.skipTest('ssh-agent not available') 1107 1108 for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'): 1109 async with self.connect(username='ckey', signature_algs=[alg]): 1110 pass 1111 1112 @asynctest 1113 async def test_agent_auth_failure(self): 1114 """Test failure connecting with ssh-agent authentication""" 1115 1116 if not self.agent_available(): # pragma: no cover 1117 self.skipTest('ssh-agent not available') 1118 1119 with patch.dict(os.environ, HOME='xxx'): 1120 with self.assertRaises(asyncssh.PermissionDenied): 1121 await self.connect(username='ckey', agent_path='xxx', 1122 known_hosts='.ssh/known_hosts') 1123 1124 @asynctest 1125 async def test_agent_auth_unset(self): 1126 """Test connecting with no local keys and no ssh-agent configured""" 1127 1128 with patch.dict(os.environ, HOME='xxx', SSH_AUTH_SOCK=''): 1129 with self.assertRaises(asyncssh.PermissionDenied): 1130 await self.connect(username='ckey', 1131 known_hosts='.ssh/known_hosts') 1132 1133 @asynctest 1134 async def test_public_key_auth(self): 1135 """Test connecting with public key authentication""" 1136 1137 async with self.connect(username='ckey', client_keys='ckey'): 1138 pass 1139 1140 @asynctest 1141 async def test_public_key_auth_disabled(self): 1142 """Test connecting with public key authentication disabled""" 1143 1144 with self.assertRaises(asyncssh.PermissionDenied): 1145 await self.connect(username='ckey', client_keys='ckey', 1146 public_key_auth=False) 1147 1148 @asynctest 1149 async def test_public_key_auth_not_preferred(self): 1150 """Test public key authentication not being in preferred auth list""" 1151 1152 with self.assertRaises(asyncssh.PermissionDenied): 1153 await self.connect(username='ckey', client_keys='ckey', 1154 preferred_auth='password') 1155 1156 @asynctest 1157 async def test_public_key_signature_algs(self): 1158 """Test public key authentication with specific signature algorithms""" 1159 1160 for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'): 1161 async with self.connect(username='ckey', client_keys='ckey', 1162 signature_algs=[alg]): 1163 pass 1164 1165 @asynctest 1166 async def test_no_server_signature_algs(self): 1167 """Test a server which doesn't advertise signature algorithms""" 1168 1169 def skip_ext_info(self): 1170 """Don't send extension information""" 1171 1172 # pylint: disable=unused-argument 1173 1174 return [] 1175 1176 with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg', 1177 skip_ext_info): 1178 async with self.connect(username='ckey', client_keys='ckey', 1179 agent_path=None): 1180 pass 1181 1182 @asynctest 1183 async def test_default_public_key_auth(self): 1184 """Test connecting with default public key authentication""" 1185 1186 async with self.connect(username='ckey', agent_path=None): 1187 pass 1188 1189 @asynctest 1190 async def test_invalid_default_key(self): 1191 """Test connecting with invalid default client key""" 1192 1193 key_path = os.path.join('.ssh', 'id_dsa') 1194 with open(key_path, 'w') as f: 1195 f.write('-----XXX-----') 1196 1197 with self.assertRaises(asyncssh.KeyImportError): 1198 await self.connect(username='ckey', agent_path=None) 1199 1200 os.remove(key_path) 1201 1202 @asynctest 1203 async def test_client_key_bytes(self): 1204 """Test client key passed in as bytes""" 1205 1206 with open('ckey', 'rb') as f: 1207 ckey = f.read() 1208 1209 async with self.connect(username='ckey', client_keys=[ckey]): 1210 pass 1211 1212 @asynctest 1213 async def test_client_key_sshkey(self): 1214 """Test client key passed in as an SSHKey""" 1215 1216 ckey = asyncssh.read_private_key('ckey') 1217 1218 async with self.connect(username='ckey', client_keys=[ckey]): 1219 pass 1220 1221 @asynctest 1222 async def test_client_key_keypairs(self): 1223 """Test client keys passed in as a list of SSHKeyPairs""" 1224 1225 keys = asyncssh.load_keypairs('ckey') 1226 1227 async with self.connect(username='ckey', client_keys=keys): 1228 pass 1229 1230 @asynctest 1231 async def test_client_key_agent_keypairs(self): 1232 """Test client keys passed in as a list of SSHAgentKeyPairs""" 1233 1234 if not self.agent_available(): # pragma: no cover 1235 self.skipTest('ssh-agent not available') 1236 1237 async with asyncssh.connect_agent() as agent: 1238 for key in await agent.get_keys(): 1239 async with self.connect(username='ckey', client_keys=[key]): 1240 pass 1241 1242 @asynctest 1243 async def test_keypair_with_replaced_cert(self): 1244 """Test connecting with a keypair with replaced cert""" 1245 1246 ckey = asyncssh.load_keypairs(['ckey'])[0] 1247 1248 async with self.connect(username='ckey', 1249 client_keys=[(ckey, 'ckey-cert.pub')]): 1250 pass 1251 1252 @asynctest 1253 async def test_agent_keypair_with_replaced_cert(self): 1254 """Test connecting with sn agent key with replaced cert""" 1255 1256 if not self.agent_available(): # pragma: no cover 1257 self.skipTest('ssh-agent not available') 1258 1259 async with asyncssh.connect_agent() as agent: 1260 ckey = (await agent.get_keys())[2] 1261 1262 async with self.connect(username='ckey', 1263 client_keys=[(ckey, 'ckey-cert.pub')]): 1264 pass 1265 1266 @asynctest 1267 async def test_untrusted_client_key(self): 1268 """Test untrusted client key""" 1269 1270 with self.assertRaises(asyncssh.PermissionDenied): 1271 await self.connect(username='ckey', client_keys='skey', 1272 agent_path=None) 1273 1274 @asynctest 1275 async def test_missing_cert(self): 1276 """Test missing client certificate""" 1277 1278 with self.assertRaises(OSError): 1279 await self.connect(username='ckey', client_keys=[('ckey', 'xxx')]) 1280 1281 @asynctest 1282 async def test_expired_cert(self): 1283 """Test expired certificate""" 1284 1285 ckey = asyncssh.read_private_key('ckey') 1286 skey = asyncssh.read_private_key('skey') 1287 1288 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1289 CERT_TYPE_USER, skey, ckey, ['ckey'], 1290 valid_before=1) 1291 1292 with self.assertRaises(asyncssh.PermissionDenied): 1293 await self.connect(username='ckey', client_keys=[(skey, cert)], 1294 agent_path=None) 1295 1296 @asynctest 1297 async def test_allowed_address(self): 1298 """Test allowed address in certificate""" 1299 1300 ckey = asyncssh.read_private_key('ckey') 1301 skey = asyncssh.read_private_key('skey') 1302 1303 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1304 CERT_TYPE_USER, skey, ckey, ['ckey'], 1305 options={'source-address': 1306 String('0.0.0.0/0,::/0')}) 1307 1308 async with self.connect(username='ckey', client_keys=[(skey, cert)]): 1309 pass 1310 1311 @asynctest 1312 async def test_disallowed_address(self): 1313 """Test disallowed address in certificate""" 1314 1315 ckey = asyncssh.read_private_key('ckey') 1316 skey = asyncssh.read_private_key('skey') 1317 1318 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1319 CERT_TYPE_USER, skey, ckey, ['ckey'], 1320 options={'source-address': String('0.0.0.0')}) 1321 1322 with self.assertRaises(asyncssh.PermissionDenied): 1323 await self.connect(username='ckey', client_keys=[(skey, cert)], 1324 agent_path=None) 1325 1326 @asynctest 1327 async def test_untrusted_ca(self): 1328 """Test untrusted CA""" 1329 1330 skey = asyncssh.read_private_key('skey') 1331 1332 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1333 CERT_TYPE_USER, skey, skey, ['skey']) 1334 1335 with self.assertRaises(asyncssh.PermissionDenied): 1336 await self.connect(username='ckey', client_keys=[(skey, cert)], 1337 agent_path=None) 1338 1339 @asynctest 1340 async def test_mismatched_ca(self): 1341 """Test mismatched CA""" 1342 1343 ckey = asyncssh.read_private_key('ckey') 1344 skey = asyncssh.read_private_key('skey') 1345 1346 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1347 CERT_TYPE_USER, skey, skey, ['skey']) 1348 1349 with self.assertRaises(ValueError): 1350 await self.connect(username='ckey', client_keys=[(ckey, cert)]) 1351 1352 @asynctest 1353 async def test_callback(self): 1354 """Test connecting with public key authentication using callback""" 1355 1356 async with self._connect_publickey(['ckey'], test_async=True): 1357 pass 1358 1359 @asynctest 1360 async def test_callback_sshkeypair(self): 1361 """Test client key passed in as an SSHKeyPair by callback""" 1362 1363 if not self.agent_available(): # pragma: no cover 1364 self.skipTest('ssh-agent not available') 1365 1366 async with asyncssh.connect_agent() as agent: 1367 keylist = await agent.get_keys() 1368 1369 async with self._connect_publickey(keylist): 1370 pass 1371 1372 @asynctest 1373 async def test_callback_untrusted_client_key(self): 1374 """Test failure connecting with public key authentication callback""" 1375 1376 with self.assertRaises(asyncssh.PermissionDenied): 1377 await self._connect_publickey(['skey']) 1378 1379 @asynctest 1380 async def test_unknown_auth(self): 1381 """Test server returning an unknown auth method before public key""" 1382 1383 with patch('asyncssh.connection.SSHClientConnection', 1384 _UnknownAuthClientConnection): 1385 async with self.connect(username='ckey', client_keys='ckey', 1386 agent_path=None): 1387 pass 1388 1389 @asynctest 1390 async def test_disabled_trivial_public_key_auth(self): 1391 """Test disabling trivial auth with public key authentication""" 1392 1393 async with self.connect(username='ckey', agent_path=None, 1394 disable_trivial_auth=True): 1395 pass 1396 1397 1398class _TestPublicKeyAsyncServerAuth(_TestPublicKeyAuth): 1399 """Unit tests for public key authentication with async server callbacks""" 1400 1401 @classmethod 1402 async def start_server(cls): 1403 """Start an SSH server which supports async public key auth""" 1404 1405 def server_factory(): 1406 """Return an SSH server which trusts specific client keys""" 1407 1408 return _AsyncPublicKeyServer(client_keys=['ckey.pub', 1409 'ckey_ecdsa.pub']) 1410 1411 return await cls.create_server(server_factory) 1412 1413 1414class _TestLimitedPublicKeySignatureAlgs(ServerTestCase): 1415 """Unit tests for limited public key signature algorithms""" 1416 1417 @classmethod 1418 async def start_server(cls): 1419 """Start an SSH server which supports public key authentication""" 1420 1421 return await cls.create_server( 1422 _PublicKeyServer, authorized_client_keys='authorized_keys', 1423 signature_algs=['ssh-rsa', 'rsa-sha2-512']) 1424 1425 @asynctest 1426 async def test_mismatched_client_signature_algs(self): 1427 """Test mismatched client key signature algorithms""" 1428 1429 with self.assertRaises(asyncssh.PermissionDenied): 1430 await self.connect(username='ckey', client_keys='ckey', 1431 signature_algs=['rsa-sha2-256']) 1432 1433 @asynctest 1434 async def test_client_signature_alg_fallback(self): 1435 """Test fall back to default client key signature algorithm""" 1436 1437 async with self.connect(username='ckey', client_keys='ckey', 1438 signature_algs=['rsa-sha2-256', 'ssh-rsa']): 1439 pass 1440 1441 1442class _TestSetAuthorizedKeys(ServerTestCase): 1443 """Unit tests for public key authentication with set_authorized_keys""" 1444 1445 @classmethod 1446 async def start_server(cls): 1447 """Start an SSH server which supports public key authentication""" 1448 1449 def server_factory(): 1450 """Return an SSH server which calls set_authorized_keys""" 1451 1452 return _PublicKeyServer(authorized_keys='authorized_keys') 1453 1454 return await cls.create_server(server_factory) 1455 1456 @asynctest 1457 async def test_set_authorized_keys(self): 1458 """Test set_authorized_keys method on server""" 1459 1460 async with self.connect(username='ckey', client_keys='ckey'): 1461 pass 1462 1463 @asynctest 1464 async def test_cert_principals(self): 1465 """Test certificate principals check""" 1466 1467 ckey = asyncssh.read_private_key('ckey') 1468 1469 cert = make_certificate('ssh-rsa-cert-v01@openssh.com', 1470 CERT_TYPE_USER, ckey, ckey, ['ckey']) 1471 1472 async with self.connect(username='ckey', client_keys=[(ckey, cert)]): 1473 pass 1474 1475 1476class _TestPreloadedAuthorizedKeys(ServerTestCase): 1477 """Unit tests for authentication with pre-loaded authorized keys""" 1478 1479 @classmethod 1480 async def start_server(cls): 1481 """Start an SSH server which supports public key authentication""" 1482 1483 def server_factory(): 1484 """Return an SSH server which calls set_authorized_keys""" 1485 1486 authorized_keys = asyncssh.read_authorized_keys('authorized_keys') 1487 return _PublicKeyServer(authorized_keys=authorized_keys) 1488 1489 return await cls.create_server(server_factory) 1490 1491 @asynctest 1492 async def test_pre_loaded_authorized_keys(self): 1493 """Test pre-loaded authorized keys file""" 1494 1495 async with self.connect(username='ckey', client_keys='ckey'): 1496 pass 1497 1498 1499class _TestPreloadedAuthorizedKeysFileList(ServerTestCase): 1500 """Unit tests with pre-loaded authorized keys file list""" 1501 1502 @classmethod 1503 async def start_server(cls): 1504 """Start an SSH server which supports public key authentication""" 1505 1506 def server_factory(): 1507 """Return an SSH server which calls set_authorized_keys""" 1508 1509 authorized_keys = asyncssh.read_authorized_keys(['authorized_keys']) 1510 return _PublicKeyServer(authorized_keys=authorized_keys) 1511 1512 return await cls.create_server(server_factory) 1513 1514 @asynctest 1515 async def test_pre_loaded_authorized_keys(self): 1516 """Test pre-loaded authorized keys file list""" 1517 1518 async with self.connect(username='ckey', client_keys='ckey'): 1519 pass 1520 1521 1522@unittest.skipUnless(x509_available, 'X.509 not available') 1523class _TestX509Auth(ServerTestCase): 1524 """Unit tests for X.509 certificate authentication""" 1525 1526 @classmethod 1527 async def start_server(cls): 1528 """Start an SSH server which supports public key authentication""" 1529 1530 return await cls.create_server( 1531 _PublicKeyServer, authorized_client_keys='authorized_keys_x509') 1532 1533 @asynctest 1534 async def test_x509_self(self): 1535 """Test connecting with X.509 self-signed certificate""" 1536 1537 async with self.connect(username='ckey', 1538 client_keys=['ckey_x509_self']): 1539 pass 1540 1541 @asynctest 1542 async def test_x509_chain(self): 1543 """Test connecting with X.509 certificate chain""" 1544 1545 async with self.connect(username='ckey', 1546 client_keys=['ckey_x509_chain']): 1547 pass 1548 1549 @asynctest 1550 async def test_keypair_with_x509_cert(self): 1551 """Test connecting with a keypair with replaced X.509 cert""" 1552 1553 ckey = asyncssh.load_keypairs(['ckey'])[0] 1554 1555 async with self.connect(username='ckey', 1556 client_keys=[(ckey, 'ckey_x509_chain')]): 1557 pass 1558 1559 @asynctest 1560 async def test_agent_keypair_with_x509_cert(self): 1561 """Test connecting with sn agent key with replaced X.509 cert""" 1562 1563 if not self.agent_available(): # pragma: no cover 1564 self.skipTest('ssh-agent not available') 1565 1566 async with asyncssh.connect_agent() as agent: 1567 ckey = (await agent.get_keys())[2] 1568 1569 async with self.connect(username='ckey', 1570 client_keys=[(ckey, 'ckey_x509_chain')]): 1571 pass 1572 1573 @asynctest 1574 async def test_x509_incomplete_chain(self): 1575 """Test connecting with incomplete X.509 certificate chain""" 1576 1577 with self.assertRaises(asyncssh.PermissionDenied): 1578 await self.connect(username='ckey', 1579 client_keys=[('ckey_x509_chain', 1580 'ckey_x509_partial.pem')]) 1581 1582 @asynctest 1583 async def test_x509_untrusted_cert(self): 1584 """Test connecting with untrusted X.509 certificate chain""" 1585 1586 with self.assertRaises(asyncssh.PermissionDenied): 1587 await self.connect(username='ckey', client_keys=['skey_x509_chain']) 1588 1589 @asynctest 1590 async def test_disabled_trivial_x509_auth(self): 1591 """Test disabling trivial auth with X.509 certificate authentication""" 1592 1593 async with self.connect(username='ckey', 1594 client_keys=['ckey_x509_self'], 1595 disable_trivial_auth=True): 1596 pass 1597 1598 1599@unittest.skipUnless(x509_available, 'X.509 not available') 1600class _TestX509AuthDisabled(ServerTestCase): 1601 """Unit tests for disabled X.509 certificate authentication""" 1602 1603 @classmethod 1604 async def start_server(cls): 1605 """Start an SSH server which doesn't support X.509 authentication""" 1606 1607 return await cls.create_server( 1608 _PublicKeyServer, x509_trusted_certs=None, 1609 authorized_client_keys='authorized_keys') 1610 1611 @asynctest 1612 async def test_failed_x509_auth(self): 1613 """Test connect failure with X.509 certificate""" 1614 1615 with self.assertRaises(asyncssh.PermissionDenied): 1616 await self.connect(username='ckey', client_keys=['ckey_x509_self'], 1617 signature_algs=['x509v3-ssh-rsa']) 1618 1619 @asynctest 1620 async def test_non_x509(self): 1621 """Test connecting without an X.509 certificate""" 1622 1623 async with self.connect(username='ckey', client_keys=['ckey']): 1624 pass 1625 1626 1627@unittest.skipUnless(x509_available, 'X.509 not available') 1628class _TestX509Subject(ServerTestCase): 1629 """Unit tests for X.509 certificate authentication by subject name""" 1630 1631 @classmethod 1632 async def start_server(cls): 1633 """Start an SSH server which supports public key authentication""" 1634 1635 authorized_keys = asyncssh.import_authorized_keys( 1636 'x509v3-ssh-rsa subject=OU=name\n') 1637 1638 return await cls.create_server( 1639 _PublicKeyServer, authorized_client_keys=authorized_keys, 1640 x509_trusted_certs=['ckey_x509_self.pub']) 1641 1642 @asynctest 1643 async def test_x509_subject(self): 1644 """Test authenticating X.509 certificate by subject name""" 1645 1646 async with self.connect(username='ckey', 1647 client_keys=['ckey_x509_self']): 1648 pass 1649 1650 1651@unittest.skipUnless(x509_available, 'X.509 not available') 1652class _TestX509Untrusted(ServerTestCase): 1653 """Unit tests for X.509 authentication with no trusted certificates""" 1654 1655 @classmethod 1656 async def start_server(cls): 1657 """Start an SSH server which supports public key authentication""" 1658 1659 return await cls.create_server(_PublicKeyServer, 1660 authorized_client_keys=None) 1661 1662 @asynctest 1663 async def test_x509_untrusted(self): 1664 """Test untrusted X.509 self-signed certificate""" 1665 1666 with self.assertRaises(asyncssh.PermissionDenied): 1667 await self.connect(username='ckey', client_keys=['ckey_x509_self']) 1668 1669 1670@unittest.skipUnless(x509_available, 'X.509 not available') 1671class _TestX509Disabled(ServerTestCase): 1672 """Unit tests for X.509 authentication with server support disabled""" 1673 1674 @classmethod 1675 async def start_server(cls): 1676 """Start an SSH server with X.509 authentication disabled""" 1677 1678 return await cls.create_server(_PublicKeyServer, x509_purposes=None) 1679 1680 @asynctest 1681 async def test_x509_disabled(self): 1682 """Test X.509 client certificate with server support disabled""" 1683 1684 with self.assertRaises(asyncssh.PermissionDenied): 1685 await self.connect(username='ckey', client_keys='skey_x509_self') 1686 1687 1688class _TestPasswordAuth(ServerTestCase): 1689 """Unit tests for password authentication""" 1690 1691 @classmethod 1692 async def start_server(cls): 1693 """Start an SSH server which supports password authentication""" 1694 1695 return await cls.create_server(_PasswordServer) 1696 1697 @async_context_manager 1698 async def _connect_password(self, username, password, old_password='', 1699 new_password='', disable_trivial_auth=False, 1700 test_async=False): 1701 """Open a connection to test password authentication""" 1702 1703 def client_factory(): 1704 """Return an SSHClient to use to do password change""" 1705 1706 cls = _AsyncPasswordClient if test_async else _PasswordClient 1707 return cls(password, old_password, new_password) 1708 1709 conn, _ = await self.create_connection( 1710 client_factory, username=username, client_keys=None, 1711 disable_trivial_auth=disable_trivial_auth) 1712 1713 return conn 1714 1715 @asynctest 1716 async def test_password_auth(self): 1717 """Test connecting with password authentication""" 1718 1719 async with self.connect(username='pw', password='pw', client_keys=None): 1720 pass 1721 1722 @asynctest 1723 async def test_password_auth_disabled(self): 1724 """Test connecting with password authentication disabled""" 1725 1726 with self.assertRaises(asyncssh.PermissionDenied): 1727 await self.connect(username='pw', password='kbdint', 1728 password_auth=False, preferred_auth='password') 1729 1730 @asynctest 1731 async def test_password_auth_failure(self): 1732 """Test _failure connecting with password authentication""" 1733 1734 with self.assertRaises(asyncssh.PermissionDenied): 1735 await self.connect(username='pw', password='badpw', 1736 client_keys=None) 1737 1738 @asynctest 1739 async def test_password_auth_callback(self): 1740 """Test connecting with password authentication callback""" 1741 1742 async with self._connect_password('pw', 'pw', test_async=True): 1743 pass 1744 1745 @asynctest 1746 async def test_password_auth_callback_failure(self): 1747 """Test failure connecting with password authentication callback""" 1748 1749 with self.assertRaises(asyncssh.PermissionDenied): 1750 await self._connect_password('pw', 'badpw') 1751 1752 @asynctest 1753 async def test_password_change(self): 1754 """Test password change""" 1755 1756 async with self._connect_password('pw', 'oldpw', 'oldpw', 'pw', 1757 test_async=True): 1758 pass 1759 1760 @asynctest 1761 async def test_password_change_failure(self): 1762 """Test failure of password change""" 1763 1764 with self.assertRaises(asyncssh.PermissionDenied): 1765 await self._connect_password('pw', 'oldpw', 'badpw', 'pw') 1766 1767 @asynctest 1768 async def test_disabled_trivial_password_auth(self): 1769 """Test disabling trivial auth with password authentication""" 1770 1771 async with self.connect(username='pw', password='pw', 1772 client_keys=None, disable_trivial_auth=True): 1773 pass 1774 1775 @asynctest 1776 async def test_disabled_trivial_password_change(self): 1777 """Test disabling trivial aith with password change""" 1778 1779 async with self._connect_password('pw', 'oldpw', 'oldpw', 'pw', 1780 disable_trivial_auth=True): 1781 pass 1782 1783 1784class _TestPasswordAsyncServerAuth(_TestPasswordAuth): 1785 """Unit tests for password authentication with async server callbacks""" 1786 1787 @classmethod 1788 async def start_server(cls): 1789 """Start an SSH server which supports async password authentication""" 1790 1791 return await cls.create_server(_AsyncPasswordServer) 1792 1793 1794class _TestKbdintAuth(ServerTestCase): 1795 """Unit tests for keyboard-interactive authentication""" 1796 1797 @classmethod 1798 async def start_server(cls): 1799 """Start an SSH server which supports keyboard-interactive auth""" 1800 1801 return await cls.create_server(_KbdintServer) 1802 1803 @async_context_manager 1804 async def _connect_kbdint(self, username, responses, test_async=False): 1805 """Open a connection to test keyboard-interactive auth""" 1806 1807 def client_factory(): 1808 """Return an SSHClient to use to do keyboard-interactive auth""" 1809 1810 cls = _AsyncKbdintClient if test_async else _KbdintClient 1811 return cls(responses) 1812 1813 conn, _ = await self.create_connection(client_factory, 1814 username=username, 1815 client_keys=None) 1816 1817 return conn 1818 1819 @asynctest 1820 async def test_kbdint_auth_no_prompts(self): 1821 """Test keyboard-interactive authentication with no prompts""" 1822 1823 async with self.connect(username='none', password='kbdint', 1824 client_keys=None): 1825 pass 1826 1827 @asynctest 1828 async def test_kbdint_auth_password(self): 1829 """Test keyboard-interactive authentication via password""" 1830 1831 async with self.connect(username='pw', password='kbdint', 1832 client_keys=None): 1833 pass 1834 1835 @asynctest 1836 async def test_kbdint_auth_passcode(self): 1837 """Test keyboard-interactive authentication via passcode""" 1838 1839 async with self.connect(username='pc', password='kbdint', 1840 client_keys=None): 1841 pass 1842 1843 @asynctest 1844 async def test_kbdint_auth_not_password(self): 1845 """Test keyboard-interactive authentication other than password""" 1846 1847 with self.assertRaises(asyncssh.PermissionDenied): 1848 await self.connect(username='kbdint', password='kbdint', 1849 client_keys=None) 1850 1851 @asynctest 1852 async def test_kbdint_auth_multi_not_password(self): 1853 """Test keyboard-interactive authentication with multiple prompts""" 1854 1855 with self.assertRaises(asyncssh.PermissionDenied): 1856 await self.connect(username='multi', password='kbdint', 1857 client_keys=None) 1858 1859 @asynctest 1860 async def test_kbdint_auth_disabled(self): 1861 """Test connecting with keyboard-interactive authentication disabled""" 1862 1863 with self.assertRaises(asyncssh.PermissionDenied): 1864 await self.connect(username='pw', password='kbdint', 1865 kbdint_auth=False) 1866 1867 @asynctest 1868 async def test_kbdint_auth_failure(self): 1869 """Test failure connecting with keyboard-interactive authentication""" 1870 1871 with self.assertRaises(asyncssh.PermissionDenied): 1872 await self.connect(username='kbdint', password='badpw', 1873 client_keys=None) 1874 1875 @asynctest 1876 async def test_kbdint_auth_callback(self): 1877 """Test keyboard-interactive auth callback""" 1878 1879 async with self._connect_kbdint('kbdint', ['kbdint'], test_async=True): 1880 pass 1881 1882 @asynctest 1883 async def test_kbdint_auth_callback_multi(self): 1884 """Test keyboard-interactive auth callback with multiple challenges""" 1885 1886 async with self._connect_kbdint('multi', ['1', '2'], test_async=True): 1887 pass 1888 1889 @asynctest 1890 async def test_kbdint_auth_callback_faliure(self): 1891 """Test failure connecting with keyboard-interactive auth callback""" 1892 1893 with self.assertRaises(asyncssh.PermissionDenied): 1894 await self._connect_kbdint('kbdint', ['badpw']) 1895 1896 @asynctest 1897 async def test_disabled_trivial_kbdint_auth(self): 1898 """Test disabled trivial auth with keyboard-interactive auth""" 1899 1900 async with self.connect(username='pw', password='kbdint', 1901 client_keys=None, disable_trivial_auth=True): 1902 pass 1903 1904 @asynctest 1905 async def test_disabled_trivial_kbdint_no_prompts(self): 1906 """Test disabled trivial with with no keyboard-interactive prompts""" 1907 1908 with self.assertRaises(asyncssh.PermissionDenied): 1909 await self.connect(username='none', password='kbdint', 1910 client_keys=None, disable_trivial_auth=True) 1911 1912 1913class _TestKbdintAsyncServerAuth(_TestKbdintAuth): 1914 """Unit tests for keyboard-interactive auth with async server callbacks""" 1915 1916 @classmethod 1917 async def start_server(cls): 1918 """Start an SSH server which supports async kbd-int auth""" 1919 1920 return await cls.create_server(_AsyncKbdintServer) 1921 1922 1923class _TestKbdintPasswordServerAuth(ServerTestCase): 1924 """Unit tests for keyboard-interactive auth with server password auth""" 1925 1926 @classmethod 1927 async def start_server(cls): 1928 """Start an SSH server which supports server password auth""" 1929 1930 return await cls.create_server(_PasswordServer) 1931 1932 @async_context_manager 1933 async def _connect_kbdint(self, username, responses): 1934 """Open a connection to test keyboard-interactive auth""" 1935 1936 def client_factory(): 1937 """Return an SSHClient to use to do keyboard-interactive auth""" 1938 1939 return _KbdintClient(responses) 1940 1941 conn, _ = await self.create_connection(client_factory, 1942 username=username, 1943 client_keys=None) 1944 1945 return conn 1946 1947 @asynctest 1948 async def test_kbdint_password_auth(self): 1949 """Test keyboard-interactive server password authentication""" 1950 1951 async with self._connect_kbdint('pw', ['pw']): 1952 pass 1953 1954 @asynctest 1955 async def test_kbdint_password_auth_multiple_responses(self): 1956 """Test multiple responses to server password authentication""" 1957 1958 with self.assertRaises(asyncssh.PermissionDenied): 1959 await self._connect_kbdint('pw', ['xxx', 'yyy']) 1960 1961 @asynctest 1962 async def test_kbdint_password_change(self): 1963 """Test keyboard-interactive server password change""" 1964 1965 with self.assertRaises(asyncssh.PermissionDenied): 1966 await self._connect_kbdint('pw', ['oldpw']) 1967 1968 1969class _TestClientLoginTimeout(ServerTestCase): 1970 """Unit test for client login timeout""" 1971 1972 @classmethod 1973 async def start_server(cls): 1974 """Start an SSH server which supports public key authentication""" 1975 1976 def server_factory(): 1977 """Return an SSHServer that delays before starting auth""" 1978 1979 return _PublicKeyServer(delay=2) 1980 1981 return await cls.create_server( 1982 server_factory, authorized_client_keys='authorized_keys') 1983 1984 @asynctest 1985 async def test_client_login_timeout_exceeded(self): 1986 """Test client login timeout exceeded""" 1987 1988 with self.assertRaises(asyncssh.ConnectionLost): 1989 await self.connect(username='ckey', client_keys='ckey', 1990 login_timeout=1) 1991 1992 @asynctest 1993 async def test_client_login_timeout_exceeded_string(self): 1994 """Test client login timeout exceeded with string value""" 1995 1996 with self.assertRaises(asyncssh.ConnectionLost): 1997 await self.connect(username='ckey', client_keys='ckey', 1998 login_timeout='0m1s') 1999 2000 @asynctest 2001 async def test_invalid_client_login_timeout(self): 2002 """Test invalid client login timeout""" 2003 2004 with self.assertRaises(ValueError): 2005 await self.connect(login_timeout=-1) 2006 2007 2008class _TestServerLoginTimeoutExceeded(ServerTestCase): 2009 """Unit test for server login timeout""" 2010 2011 @classmethod 2012 async def start_server(cls): 2013 """Start an SSH server with a 1 second login timeout""" 2014 2015 return await cls.create_server( 2016 _PublicKeyServer, authorized_client_keys='authorized_keys', 2017 login_timeout=1) 2018 2019 @asynctest 2020 async def test_server_login_timeout_exceeded(self): 2021 """Test server_login timeout exceeded""" 2022 2023 def client_factory(): 2024 """Return an SSHClient that delays before providing a key""" 2025 2026 return _PublicKeyClient(['ckey'], 2) 2027 2028 with self.assertRaises(asyncssh.ConnectionLost): 2029 await self.create_connection(client_factory, username='ckey', 2030 client_keys=None) 2031 2032 2033class _TestServerLoginTimeoutDisabled(ServerTestCase): 2034 """Unit test for disabled server login timeout""" 2035 2036 @classmethod 2037 async def start_server(cls): 2038 """Start an SSH server with no login timeout""" 2039 2040 return await cls.create_server( 2041 _PublicKeyServer, authorized_client_keys='authorized_keys', 2042 login_timeout=None) 2043 2044 @asynctest 2045 async def test_server_login_timeout_disabled(self): 2046 """Test with login timeout disabled""" 2047 2048 async with self.connect(username='ckey', client_keys='ckey'): 2049 pass 2050