1# Copyright (c) 2019-2020 by Ron Frederick <ronf@timeheart.net> and others. 2# 3# This program and the accompanying materials are made available under 4# the terms of the Eclipse Public License v2.0 which accompanies this 5# distribution and is available at: 6# 7# http://www.eclipse.org/legal/epl-2.0/ 8# 9# This program may also be made available under the following secondary 10# licenses when the conditions for such availability set forth in the 11# Eclipse Public License v2.0 are satisfied: 12# 13# GNU General Public License, Version 2.0, or any later versions of 14# that license 15# 16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later 17# 18# Contributors: 19# Ron Frederick - initial implementation, API, and documentation 20 21"""Unit tests for AsyncSSH security key support""" 22 23import unittest 24 25import asyncssh 26 27from .server import ServerTestCase 28from .sk_stub import sk_available, stub_sk, unstub_sk, patch_sk, sk_error 29from .util import asynctest 30 31 32class _CheckSKAuth(ServerTestCase): 33 """Common code for testing security key authentication""" 34 35 _sk_devs = [2] 36 _sk_alg = 'sk-ssh-ed25519@openssh.com' 37 _sk_resident = False 38 _sk_touch_required = True 39 _sk_auth_touch_required = True 40 _sk_cert = False 41 _sk_host = False 42 43 @classmethod 44 async def start_server(cls): 45 """Start an SSH server which supports security key authentication""" 46 47 cls.addClassCleanup(unstub_sk, *stub_sk(cls._sk_devs)) 48 49 cls._privkey = asyncssh.generate_private_key( 50 cls._sk_alg, resident=cls._sk_resident, 51 touch_required=cls._sk_touch_required) 52 53 if cls._sk_host: 54 if cls._sk_cert: 55 cert = cls._privkey.generate_host_certificate( 56 cls._privkey, 'localhost', principals=['127.0.0.1']) 57 58 key = (cls._privkey, cert) 59 else: 60 key = cls._privkey 61 62 return await cls.create_server(server_host_keys=[key]) 63 else: 64 options = [] 65 66 if cls._sk_cert: 67 options.append('cert-authority') 68 69 if not cls._sk_auth_touch_required: 70 options.append('no-touch-required') 71 72 auth_keys = asyncssh.import_authorized_keys( 73 ','.join(options) + (' ' if options else '') + 74 cls._privkey.export_public_key().decode('utf-8')) 75 76 return await cls.create_server(authorized_client_keys=auth_keys) 77 78 79@unittest.skipUnless(sk_available, 'security key support not available') 80class _TestSKAuthKeyNotFound(ServerTestCase): 81 """Unit tests for security key authentication with no key found""" 82 83 @patch_sk([]) 84 @asynctest 85 async def test_enroll_key_not_found(self): 86 """Test generating key with no security key found""" 87 88 with self.assertRaises(ValueError): 89 asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com') 90 91 92@unittest.skipUnless(sk_available, 'security key support not available') 93class _TestSKAuthCTAP1(_CheckSKAuth): 94 """Unit tests for security key authentication with CTAP version 1""" 95 96 _sk_devs = [1] 97 _sk_alg = 'sk-ecdsa-sha2-nistp256@openssh.com' 98 99 @asynctest 100 async def test_auth(self): 101 """Test authenticating with a CTAP 1 security key""" 102 103 async with self.connect(username='ckey', client_keys=[self._privkey]): 104 pass 105 106 @asynctest 107 async def test_sk_unsupported_alg(self): 108 """Test unsupported security key algorithm""" 109 110 with self.assertRaises(ValueError): 111 asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com') 112 113 @asynctest 114 async def test_enroll_ctap1_error(self): 115 """Test generating key returning a CTAP 1 error""" 116 117 with sk_error('err'): 118 with self.assertRaises(ValueError): 119 asyncssh.generate_private_key(self._sk_alg) 120 121 @asynctest 122 async def test_auth_ctap1_error(self): 123 """Test security key returning a CTAP 1 error""" 124 125 with sk_error('err'): 126 with self.assertRaises(ValueError): 127 await self.connect(username='ckey', client_keys=[self._privkey]) 128 129 130@unittest.skipUnless(sk_available, 'security key support not available') 131class _TestSKAuthCTAP2(_CheckSKAuth): 132 """Unit tests for security key authentication with CTAP version 2""" 133 134 _sk_devs = [2] 135 136 @asynctest 137 async def test_auth(self): 138 """Test authenticating with a CTAP 2 security key""" 139 140 async with self.connect(username='ckey', client_keys=[self._privkey]): 141 pass 142 143 @asynctest 144 async def test_enroll_without_pin(self): 145 """Test generating key without a PIN""" 146 147 key = asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com') 148 149 self.assertIsNotNone(key) 150 151 @asynctest 152 async def test_enroll_with_pin(self): 153 """Test generating key with a PIN""" 154 155 key = asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com', 156 pin=b'123456') 157 158 self.assertIsNotNone(key) 159 160 @asynctest 161 async def test_enroll_ctap2_error(self): 162 """Test generating key returning a CTAP 2 error""" 163 164 with sk_error('err'): 165 with self.assertRaises(ValueError): 166 asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com') 167 168 @asynctest 169 async def test_auth_ctap2_error(self): 170 """Test security key returning a CTAP 2 error""" 171 172 with sk_error('err'): 173 with self.assertRaises(ValueError): 174 await self.connect(username='ckey', client_keys=[self._privkey]) 175 176 @asynctest 177 async def test_enroll_pin_invalid(self): 178 """Test generating key while providing invalid PIN""" 179 180 with sk_error('badpin'): 181 with self.assertRaises(ValueError): 182 asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com', 183 pin=b'123456') 184 185 @asynctest 186 async def test_enroll_pin_required(self): 187 """Test generating key without providing a required PIN""" 188 189 with sk_error('pinreq'): 190 with self.assertRaises(ValueError): 191 asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com') 192 193 194@unittest.skipUnless(sk_available, 'security key support not available') 195class _TestSKAuthMultipleKeys(_CheckSKAuth): 196 """Unit tests for security key authentication with multiple keys""" 197 198 _sk_devs = [2, 1] 199 200 @asynctest 201 async def test_auth_cred_not_found(self): 202 """Test authenticating with security credential not found""" 203 204 with sk_error('nocred'): 205 with self.assertRaises(ValueError): 206 await self.connect(username='ckey', client_keys=[self._privkey]) 207 208 209@unittest.skipUnless(sk_available, 'security key support not available') 210class _TestSKAuthResidentKeys(_CheckSKAuth): 211 """Unit tests for loading resident keys""" 212 213 _sk_resident = True 214 215 @asynctest 216 async def test_load_resident(self): 217 """Test loading resident keys""" 218 219 keys = asyncssh.load_resident_keys(b'123456') 220 221 async with self.connect(username='ckey', client_keys=[keys[0]]): 222 pass 223 224 @asynctest 225 async def test_load_resident_user_match(self): 226 """Test loading resident keys matching a specific user""" 227 228 keys = asyncssh.load_resident_keys(b'123456', user='AsyncSSH') 229 230 async with self.connect(username='ckey', client_keys=[keys[0]]): 231 pass 232 233 @asynctest 234 async def test_koad_resident_user_match(self): 235 """Test loading resident keys matching a specific user""" 236 237 self.assertIsNotNone(asyncssh.load_resident_keys(b'123456', 238 user='AsyncSSH')) 239 240 @asynctest 241 async def test_load_resident_no_match(self): 242 """Test loading resident keys with no user match""" 243 244 self.assertEqual(asyncssh.load_resident_keys(b'123456', 245 user='xxx'), []) 246 247 @asynctest 248 async def test_no_resident_keys(self): 249 """Test retrieving empty list of resident keys""" 250 251 with sk_error('nocred'): 252 self.assertEqual(asyncssh.load_resident_keys(b'123456'), []) 253 254 @asynctest 255 async def test_load_resident_ctap2_error(self): 256 """Test getting resident keys returning a CTAP 2 error""" 257 258 with sk_error('err'): 259 with self.assertRaises(ValueError): 260 asyncssh.load_resident_keys(b'123456') 261 262 @asynctest 263 async def test_load_resident_pin_invalid(self): 264 """Test getting resident keys while providing invalid PIN""" 265 266 with sk_error('badpin'): 267 with self.assertRaises(ValueError): 268 asyncssh.load_resident_keys(b'123456') 269 270 @asynctest 271 async def test_pin_not_set(self): 272 """Test getting resident keys from a key with no configured PIN""" 273 274 with sk_error('nopin'): 275 with self.assertRaises(ValueError): 276 asyncssh.load_resident_keys(b'123456') 277 278 279@unittest.skipUnless(sk_available, 'security key support not available') 280class _TestSKAuthTouchNotRequired(_CheckSKAuth): 281 """Unit tests for security key authentication without touch""" 282 283 _sk_touch_required = False 284 _sk_auth_touch_required = False 285 286 @asynctest 287 async def test_auth_without_touch(self): 288 """Test authenticating with a security key without touch""" 289 290 async with self.connect(username='ckey', client_keys=[self._privkey]): 291 pass 292 293 294@unittest.skipUnless(sk_available, 'security key support not available') 295class _TestSKAuthTouchRequiredECDSA(_CheckSKAuth): 296 """Unit tests for security key authentication failing without touch""" 297 298 _sk_alg = 'sk-ecdsa-sha2-nistp256@openssh.com' 299 _sk_touch_required = False 300 _sk_auth_touch_required = True 301 302 @asynctest 303 async def test_auth_touch_required(self): 304 """Test auth failing with a security key not providing touch""" 305 306 with self.assertRaises(asyncssh.PermissionDenied): 307 await self.connect(username='ckey', client_keys=[self._privkey]) 308 309 310@unittest.skipUnless(sk_available, 'security key support not available') 311class _TestSKCertAuthTouchNotRequired(_CheckSKAuth): 312 """Unit tests for security key cert authentication without touch""" 313 314 _sk_touch_required = False 315 _sk_auth_touch_required = False 316 _sk_cert = True 317 318 @asynctest 319 async def test_cert_auth_cert_touch_not_required(self): 320 """Test authenticating with a security key cert not requiring touch""" 321 322 cert = self._privkey.generate_user_certificate(self._privkey, 'name', 323 touch_required=False) 324 325 async with self.connect(username='ckey', 326 client_keys=[(self._privkey, cert)]): 327 pass 328 329 @asynctest 330 async def test_cert_auth_cert_touch_required(self): 331 """Test cert auth failing with a security key cert requiring touch""" 332 333 cert = self._privkey.generate_user_certificate(self._privkey, 'name', 334 touch_required=True) 335 336 with self.assertRaises(asyncssh.PermissionDenied): 337 await self.connect(username='ckey', 338 client_keys=[(self._privkey, cert)]) 339 340 341@unittest.skipUnless(sk_available, 'security key support not available') 342class _TestSKCertAuthTouchRequired(_CheckSKAuth): 343 """Unit tests for security key cert authentication failing without touch""" 344 345 _sk_touch_required = False 346 _sk_auth_touch_required = True 347 _sk_cert = True 348 349 @asynctest 350 async def test_cert_auth_touch_required(self): 351 """Test cert auth failing with a security key requiring touch""" 352 353 cert = self._privkey.generate_user_certificate(self._privkey, 'name', 354 touch_required=False) 355 356 with self.assertRaises(asyncssh.PermissionDenied): 357 await self.connect(username='ckey', 358 client_keys=[(self._privkey, cert)]) 359 360 @asynctest 361 async def test_cert_auth_cert_touch_required(self): 362 """Test cert auth failing with a security key cert requiring touch""" 363 364 cert = self._privkey.generate_user_certificate(self._privkey, 'name', 365 touch_required=True) 366 367 with self.assertRaises(asyncssh.PermissionDenied): 368 await self.connect(username='ckey', 369 client_keys=[(self._privkey, cert)]) 370 371 372@unittest.skipUnless(sk_available, 'security key support not available') 373class _TestSKHostAuth(_CheckSKAuth): 374 """Unit tests for security key host authentication""" 375 376 _sk_host = True 377 378 @asynctest 379 async def test_sk_host_auth(self): 380 """Test a server using a security key as a host key""" 381 382 pubkey = self._privkey.convert_to_public() 383 384 async with self.connect(known_hosts=([pubkey], [], [])): 385 pass 386 387 388@unittest.skipUnless(sk_available, 'security key support not available') 389class _TestSKHostCertAuth(_CheckSKAuth): 390 """Unit tests for security key host cert authentication""" 391 392 _sk_cert = True 393 _sk_host = True 394 395 @asynctest 396 async def test_sk_host_auth(self): 397 """Test a server host using a security key host certificate""" 398 399 pubkey = self._privkey.convert_to_public() 400 401 async with self.connect(known_hosts=([pubkey], [pubkey], [])): 402 pass 403