1# Copyright (c) 2019-2020 by Ron Frederick <ronf@timeheart.net> and others.
2#
3# This program and the accompanying materials are made available under
4# the terms of the Eclipse Public License v2.0 which accompanies this
5# distribution and is available at:
6#
7#     http://www.eclipse.org/legal/epl-2.0/
8#
9# This program may also be made available under the following secondary
10# licenses when the conditions for such availability set forth in the
11# Eclipse Public License v2.0 are satisfied:
12#
13#    GNU General Public License, Version 2.0, or any later versions of
14#    that license
15#
16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
17#
18# Contributors:
19#     Ron Frederick - initial implementation, API, and documentation
20
21"""Unit tests for AsyncSSH security key support"""
22
23import unittest
24
25import asyncssh
26
27from .server import ServerTestCase
28from .sk_stub import sk_available, stub_sk, unstub_sk, patch_sk, sk_error
29from .util import asynctest
30
31
32class _CheckSKAuth(ServerTestCase):
33    """Common code for testing security key authentication"""
34
35    _sk_devs = [2]
36    _sk_alg = 'sk-ssh-ed25519@openssh.com'
37    _sk_resident = False
38    _sk_touch_required = True
39    _sk_auth_touch_required = True
40    _sk_cert = False
41    _sk_host = False
42
43    @classmethod
44    async def start_server(cls):
45        """Start an SSH server which supports security key authentication"""
46
47        cls.addClassCleanup(unstub_sk, *stub_sk(cls._sk_devs))
48
49        cls._privkey = asyncssh.generate_private_key(
50            cls._sk_alg, resident=cls._sk_resident,
51            touch_required=cls._sk_touch_required)
52
53        if cls._sk_host:
54            if cls._sk_cert:
55                cert = cls._privkey.generate_host_certificate(
56                    cls._privkey, 'localhost', principals=['127.0.0.1'])
57
58                key = (cls._privkey, cert)
59            else:
60                key = cls._privkey
61
62            return await cls.create_server(server_host_keys=[key])
63        else:
64            options = []
65
66            if cls._sk_cert:
67                options.append('cert-authority')
68
69            if not cls._sk_auth_touch_required:
70                options.append('no-touch-required')
71
72            auth_keys = asyncssh.import_authorized_keys(
73                ','.join(options) + (' ' if options else '') +
74                cls._privkey.export_public_key().decode('utf-8'))
75
76            return await cls.create_server(authorized_client_keys=auth_keys)
77
78
79@unittest.skipUnless(sk_available, 'security key support not available')
80class _TestSKAuthKeyNotFound(ServerTestCase):
81    """Unit tests for security key authentication with no key found"""
82
83    @patch_sk([])
84    @asynctest
85    async def test_enroll_key_not_found(self):
86        """Test generating key with no security key found"""
87
88        with self.assertRaises(ValueError):
89            asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com')
90
91
92@unittest.skipUnless(sk_available, 'security key support not available')
93class _TestSKAuthCTAP1(_CheckSKAuth):
94    """Unit tests for security key authentication with CTAP version 1"""
95
96    _sk_devs = [1]
97    _sk_alg = 'sk-ecdsa-sha2-nistp256@openssh.com'
98
99    @asynctest
100    async def test_auth(self):
101        """Test authenticating with a CTAP 1 security key"""
102
103        async with self.connect(username='ckey', client_keys=[self._privkey]):
104            pass
105
106    @asynctest
107    async def test_sk_unsupported_alg(self):
108        """Test unsupported security key algorithm"""
109
110        with self.assertRaises(ValueError):
111            asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com')
112
113    @asynctest
114    async def test_enroll_ctap1_error(self):
115        """Test generating key returning a CTAP 1 error"""
116
117        with sk_error('err'):
118            with self.assertRaises(ValueError):
119                asyncssh.generate_private_key(self._sk_alg)
120
121    @asynctest
122    async def test_auth_ctap1_error(self):
123        """Test security key returning a CTAP 1 error"""
124
125        with sk_error('err'):
126            with self.assertRaises(ValueError):
127                await self.connect(username='ckey', client_keys=[self._privkey])
128
129
130@unittest.skipUnless(sk_available, 'security key support not available')
131class _TestSKAuthCTAP2(_CheckSKAuth):
132    """Unit tests for security key authentication with CTAP version 2"""
133
134    _sk_devs = [2]
135
136    @asynctest
137    async def test_auth(self):
138        """Test authenticating with a CTAP 2 security key"""
139
140        async with self.connect(username='ckey', client_keys=[self._privkey]):
141            pass
142
143    @asynctest
144    async def test_enroll_without_pin(self):
145        """Test generating key without a PIN"""
146
147        key = asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com')
148
149        self.assertIsNotNone(key)
150
151    @asynctest
152    async def test_enroll_with_pin(self):
153        """Test generating key with a PIN"""
154
155        key = asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com',
156                                            pin=b'123456')
157
158        self.assertIsNotNone(key)
159
160    @asynctest
161    async def test_enroll_ctap2_error(self):
162        """Test generating key returning a CTAP 2 error"""
163
164        with sk_error('err'):
165            with self.assertRaises(ValueError):
166                asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com')
167
168    @asynctest
169    async def test_auth_ctap2_error(self):
170        """Test security key returning a CTAP 2 error"""
171
172        with sk_error('err'):
173            with self.assertRaises(ValueError):
174                await self.connect(username='ckey', client_keys=[self._privkey])
175
176    @asynctest
177    async def test_enroll_pin_invalid(self):
178        """Test generating key while providing invalid PIN"""
179
180        with sk_error('badpin'):
181            with self.assertRaises(ValueError):
182                asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com',
183                                              pin=b'123456')
184
185    @asynctest
186    async def test_enroll_pin_required(self):
187        """Test generating key without providing a required PIN"""
188
189        with sk_error('pinreq'):
190            with self.assertRaises(ValueError):
191                asyncssh.generate_private_key('sk-ssh-ed25519@openssh.com')
192
193
194@unittest.skipUnless(sk_available, 'security key support not available')
195class _TestSKAuthMultipleKeys(_CheckSKAuth):
196    """Unit tests for security key authentication with multiple keys"""
197
198    _sk_devs = [2, 1]
199
200    @asynctest
201    async def test_auth_cred_not_found(self):
202        """Test authenticating with security credential not found"""
203
204        with sk_error('nocred'):
205            with self.assertRaises(ValueError):
206                await self.connect(username='ckey', client_keys=[self._privkey])
207
208
209@unittest.skipUnless(sk_available, 'security key support not available')
210class _TestSKAuthResidentKeys(_CheckSKAuth):
211    """Unit tests for loading resident keys"""
212
213    _sk_resident = True
214
215    @asynctest
216    async def test_load_resident(self):
217        """Test loading resident keys"""
218
219        keys = asyncssh.load_resident_keys(b'123456')
220
221        async with self.connect(username='ckey', client_keys=[keys[0]]):
222            pass
223
224    @asynctest
225    async def test_load_resident_user_match(self):
226        """Test loading resident keys matching a specific user"""
227
228        keys = asyncssh.load_resident_keys(b'123456', user='AsyncSSH')
229
230        async with self.connect(username='ckey', client_keys=[keys[0]]):
231            pass
232
233    @asynctest
234    async def test_koad_resident_user_match(self):
235        """Test loading resident keys matching a specific user"""
236
237        self.assertIsNotNone(asyncssh.load_resident_keys(b'123456',
238                                                         user='AsyncSSH'))
239
240    @asynctest
241    async def test_load_resident_no_match(self):
242        """Test loading resident keys with no user match"""
243
244        self.assertEqual(asyncssh.load_resident_keys(b'123456',
245                                                     user='xxx'), [])
246
247    @asynctest
248    async def test_no_resident_keys(self):
249        """Test retrieving empty list of resident keys"""
250
251        with sk_error('nocred'):
252            self.assertEqual(asyncssh.load_resident_keys(b'123456'), [])
253
254    @asynctest
255    async def test_load_resident_ctap2_error(self):
256        """Test getting resident keys returning a CTAP 2 error"""
257
258        with sk_error('err'):
259            with self.assertRaises(ValueError):
260                asyncssh.load_resident_keys(b'123456')
261
262    @asynctest
263    async def test_load_resident_pin_invalid(self):
264        """Test getting resident keys while providing invalid PIN"""
265
266        with sk_error('badpin'):
267            with self.assertRaises(ValueError):
268                asyncssh.load_resident_keys(b'123456')
269
270    @asynctest
271    async def test_pin_not_set(self):
272        """Test getting resident keys from a key with no configured PIN"""
273
274        with sk_error('nopin'):
275            with self.assertRaises(ValueError):
276                asyncssh.load_resident_keys(b'123456')
277
278
279@unittest.skipUnless(sk_available, 'security key support not available')
280class _TestSKAuthTouchNotRequired(_CheckSKAuth):
281    """Unit tests for security key authentication without touch"""
282
283    _sk_touch_required = False
284    _sk_auth_touch_required = False
285
286    @asynctest
287    async def test_auth_without_touch(self):
288        """Test authenticating with a security key without touch"""
289
290        async with self.connect(username='ckey', client_keys=[self._privkey]):
291            pass
292
293
294@unittest.skipUnless(sk_available, 'security key support not available')
295class _TestSKAuthTouchRequiredECDSA(_CheckSKAuth):
296    """Unit tests for security key authentication failing without touch"""
297
298    _sk_alg = 'sk-ecdsa-sha2-nistp256@openssh.com'
299    _sk_touch_required = False
300    _sk_auth_touch_required = True
301
302    @asynctest
303    async def test_auth_touch_required(self):
304        """Test auth failing with a security key not providing touch"""
305
306        with self.assertRaises(asyncssh.PermissionDenied):
307            await self.connect(username='ckey', client_keys=[self._privkey])
308
309
310@unittest.skipUnless(sk_available, 'security key support not available')
311class _TestSKCertAuthTouchNotRequired(_CheckSKAuth):
312    """Unit tests for security key cert authentication without touch"""
313
314    _sk_touch_required = False
315    _sk_auth_touch_required = False
316    _sk_cert = True
317
318    @asynctest
319    async def test_cert_auth_cert_touch_not_required(self):
320        """Test authenticating with a security key cert not requiring touch"""
321
322        cert = self._privkey.generate_user_certificate(self._privkey, 'name',
323                                                       touch_required=False)
324
325        async with self.connect(username='ckey',
326                                client_keys=[(self._privkey, cert)]):
327            pass
328
329    @asynctest
330    async def test_cert_auth_cert_touch_required(self):
331        """Test cert auth failing with a security key cert requiring touch"""
332
333        cert = self._privkey.generate_user_certificate(self._privkey, 'name',
334                                                       touch_required=True)
335
336        with self.assertRaises(asyncssh.PermissionDenied):
337            await self.connect(username='ckey',
338                               client_keys=[(self._privkey, cert)])
339
340
341@unittest.skipUnless(sk_available, 'security key support not available')
342class _TestSKCertAuthTouchRequired(_CheckSKAuth):
343    """Unit tests for security key cert authentication failing without touch"""
344
345    _sk_touch_required = False
346    _sk_auth_touch_required = True
347    _sk_cert = True
348
349    @asynctest
350    async def test_cert_auth_touch_required(self):
351        """Test cert auth failing with a security key requiring touch"""
352
353        cert = self._privkey.generate_user_certificate(self._privkey, 'name',
354                                                       touch_required=False)
355
356        with self.assertRaises(asyncssh.PermissionDenied):
357            await self.connect(username='ckey',
358                               client_keys=[(self._privkey, cert)])
359
360    @asynctest
361    async def test_cert_auth_cert_touch_required(self):
362        """Test cert auth failing with a security key cert requiring touch"""
363
364        cert = self._privkey.generate_user_certificate(self._privkey, 'name',
365                                                       touch_required=True)
366
367        with self.assertRaises(asyncssh.PermissionDenied):
368            await self.connect(username='ckey',
369                               client_keys=[(self._privkey, cert)])
370
371
372@unittest.skipUnless(sk_available, 'security key support not available')
373class _TestSKHostAuth(_CheckSKAuth):
374    """Unit tests for security key host authentication"""
375
376    _sk_host = True
377
378    @asynctest
379    async def test_sk_host_auth(self):
380        """Test a server using a security key as a host key"""
381
382        pubkey = self._privkey.convert_to_public()
383
384        async with self.connect(known_hosts=([pubkey], [], [])):
385            pass
386
387
388@unittest.skipUnless(sk_available, 'security key support not available')
389class _TestSKHostCertAuth(_CheckSKAuth):
390    """Unit tests for security key host cert authentication"""
391
392    _sk_cert = True
393    _sk_host = True
394
395    @asynctest
396    async def test_sk_host_auth(self):
397        """Test a server host using a security key host certificate"""
398
399        pubkey = self._privkey.convert_to_public()
400
401        async with self.connect(known_hosts=([pubkey], [pubkey], [])):
402            pass
403