1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 2# Copyright (C) 2013-2014 science + computing ag 3# Author: Sebastian Deiss <sebastian.deiss@t-online.de> 4# 5# 6# This file is part of paramiko. 7# 8# Paramiko is free software; you can redistribute it and/or modify it under the 9# terms of the GNU Lesser General Public License as published by the Free 10# Software Foundation; either version 2.1 of the License, or (at your option) 11# any later version. 12# 13# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 14# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 15# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 16# details. 17# 18# You should have received a copy of the GNU Lesser General Public License 19# along with Paramiko; if not, write to the Free Software Foundation, Inc., 20# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 21 22""" 23Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic) 24""" 25 26import socket 27import threading 28 29import paramiko 30 31from .util import _support, needs_gssapi, KerberosTestCase, update_env 32from .test_client import FINGERPRINTS 33 34 35class NullServer(paramiko.ServerInterface): 36 def get_allowed_auths(self, username): 37 return "gssapi-with-mic,publickey" 38 39 def check_auth_gssapi_with_mic( 40 self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None 41 ): 42 if gss_authenticated == paramiko.AUTH_SUCCESSFUL: 43 return paramiko.AUTH_SUCCESSFUL 44 return paramiko.AUTH_FAILED 45 46 def enable_auth_gssapi(self): 47 return True 48 49 def check_auth_publickey(self, username, key): 50 try: 51 expected = FINGERPRINTS[key.get_name()] 52 except KeyError: 53 return paramiko.AUTH_FAILED 54 else: 55 if key.get_fingerprint() == expected: 56 return paramiko.AUTH_SUCCESSFUL 57 return paramiko.AUTH_FAILED 58 59 def check_channel_request(self, kind, chanid): 60 return paramiko.OPEN_SUCCEEDED 61 62 def check_channel_exec_request(self, channel, command): 63 if command != b"yes": 64 return False 65 return True 66 67 68@needs_gssapi 69class GSSAuthTest(KerberosTestCase): 70 def setUp(self): 71 # TODO: username and targ_name should come from os.environ or whatever 72 # the approved pytest method is for runtime-configuring test data. 73 self.username = self.realm.user_princ 74 self.hostname = socket.getfqdn(self.realm.hostname) 75 self.sockl = socket.socket() 76 self.sockl.bind((self.realm.hostname, 0)) 77 self.sockl.listen(1) 78 self.addr, self.port = self.sockl.getsockname() 79 self.event = threading.Event() 80 update_env(self, self.realm.env) 81 thread = threading.Thread(target=self._run) 82 thread.start() 83 84 def tearDown(self): 85 for attr in "tc ts socks sockl".split(): 86 if hasattr(self, attr): 87 getattr(self, attr).close() 88 89 def _run(self): 90 self.socks, addr = self.sockl.accept() 91 self.ts = paramiko.Transport(self.socks) 92 host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") 93 self.ts.add_server_key(host_key) 94 server = NullServer() 95 self.ts.start_server(self.event, server) 96 97 def _test_connection(self, **kwargs): 98 """ 99 (Most) kwargs get passed directly into SSHClient.connect(). 100 101 The exception is ... no exception yet 102 """ 103 host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") 104 public_host_key = paramiko.RSAKey(data=host_key.asbytes()) 105 106 self.tc = paramiko.SSHClient() 107 self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) 108 self.tc.get_host_keys().add( 109 "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key 110 ) 111 self.tc.connect( 112 hostname=self.addr, 113 port=self.port, 114 username=self.username, 115 gss_host=self.hostname, 116 gss_auth=True, 117 **kwargs 118 ) 119 120 self.event.wait(1.0) 121 self.assert_(self.event.is_set()) 122 self.assert_(self.ts.is_active()) 123 self.assertEquals(self.username, self.ts.get_username()) 124 self.assertEquals(True, self.ts.is_authenticated()) 125 126 stdin, stdout, stderr = self.tc.exec_command("yes") 127 schan = self.ts.accept(1.0) 128 129 schan.send("Hello there.\n") 130 schan.send_stderr("This is on stderr.\n") 131 schan.close() 132 133 self.assertEquals("Hello there.\n", stdout.readline()) 134 self.assertEquals("", stdout.readline()) 135 self.assertEquals("This is on stderr.\n", stderr.readline()) 136 self.assertEquals("", stderr.readline()) 137 138 stdin.close() 139 stdout.close() 140 stderr.close() 141 142 def test_gss_auth(self): 143 """ 144 Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication 145 (gssapi-with-mic) in client and server mode. 146 """ 147 self._test_connection(allow_agent=False, look_for_keys=False) 148 149 def test_auth_trickledown(self): 150 """ 151 Failed gssapi-with-mic doesn't prevent subsequent key from succeeding 152 """ 153 self.hostname = ( 154 "this_host_does_not_exists_and_causes_a_GSSAPI-exception" 155 ) 156 self._test_connection( 157 key_filename=[_support("test_rsa.key")], 158 allow_agent=False, 159 look_for_keys=False, 160 ) 161