1# Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com>
2# Copyright (C) 2013-2014 science + computing ag
3# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
4#
5#
6# This file is part of paramiko.
7#
8# Paramiko is free software; you can redistribute it and/or modify it under the
9# terms of the GNU Lesser General Public License as published by the Free
10# Software Foundation; either version 2.1 of the License, or (at your option)
11# any later version.
12#
13# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
14# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Lesser General Public License
19# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
20# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
21
22"""
23Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
24"""
25
26import socket
27import threading
28
29import paramiko
30
31from .util import _support, needs_gssapi, KerberosTestCase, update_env
32from .test_client import FINGERPRINTS
33
34
class NullServer(paramiko.ServerInterface):
    """
    Minimal server policy used by the GSS-API authentication tests.

    Offers ``gssapi-with-mic`` and ``publickey`` authentication, accepts
    every channel open request, and only allows the ``yes`` exec command.
    """

    def get_allowed_auths(self, username):
        # The same methods are offered no matter which username asks.
        return "gssapi-with-mic,publickey"

    def check_auth_gssapi_with_mic(
        self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
    ):
        # Mirror the GSS-API result handed in: success only on success.
        gss_ok = gss_authenticated == paramiko.AUTH_SUCCESSFUL
        return paramiko.AUTH_SUCCESSFUL if gss_ok else paramiko.AUTH_FAILED

    def enable_auth_gssapi(self):
        return True

    def check_auth_publickey(self, username, key):
        # Key types without an entry in FINGERPRINTS are rejected outright.
        try:
            known_fingerprint = FINGERPRINTS[key.get_name()]
        except KeyError:
            return paramiko.AUTH_FAILED
        matches = key.get_fingerprint() == known_fingerprint
        return paramiko.AUTH_SUCCESSFUL if matches else paramiko.AUTH_FAILED

    def check_channel_request(self, kind, chanid):
        return paramiko.OPEN_SUCCEEDED

    def check_channel_exec_request(self, channel, command):
        # Only the literal command "yes" may be executed.
        return command == b"yes"
66
67
@needs_gssapi
class GSSAuthTest(KerberosTestCase):
    """
    Exercise gssapi-with-mic authentication between an in-process paramiko
    server ``Transport`` and an ``SSHClient``, both driven from this test.
    """

    def setUp(self):
        """
        Open a listening socket and start the server side in a thread.
        """
        # TODO: username and targ_name should come from os.environ or whatever
        # the approved pytest method is for runtime-configuring test data.
        self.username = self.realm.user_princ
        self.hostname = socket.getfqdn(self.realm.hostname)
        self.sockl = socket.socket()
        # Port 0 asks the OS for any free port; the real one is read back
        # from getsockname() below.
        self.sockl.bind((self.realm.hostname, 0))
        self.sockl.listen(1)
        self.addr, self.port = self.sockl.getsockname()
        # Handed to Transport.start_server() in _run(); the client side
        # waits on it in _test_connection().
        self.event = threading.Event()
        update_env(self, self.realm.env)
        thread = threading.Thread(target=self._run)
        thread.start()

    def tearDown(self):
        """
        Close whichever of the client, transport and sockets were created.
        """
        # Some attributes only exist once a connection has been made, hence
        # the hasattr() guard.
        for attr in "tc ts socks sockl".split():
            if hasattr(self, attr):
                getattr(self, attr).close()

    def _run(self):
        """
        Server thread body: accept one connection and start a server
        ``Transport`` on it using ``NullServer`` as the policy object.
        """
        self.socks, addr = self.sockl.accept()
        self.ts = paramiko.Transport(self.socks)
        host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
        self.ts.add_server_key(host_key)
        server = NullServer()
        self.ts.start_server(self.event, server)

    def _test_connection(self, **kwargs):
        """
        Connect with ``gss_auth=True``, then run ``yes`` and check its output.

        All kwargs are passed directly into ``SSHClient.connect()``; none are
        currently intercepted here.
        """
        host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
        self.tc.get_host_keys().add(
            "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
        )
        self.tc.connect(
            hostname=self.addr,
            port=self.port,
            username=self.username,
            gss_host=self.hostname,
            gss_auth=True,
            **kwargs
        )

        # assertTrue/assertEqual replace the assert_/assertEquals aliases,
        # which were removed from unittest.TestCase in Python 3.12.
        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual(self.username, self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())

        stdin, stdout, stderr = self.tc.exec_command("yes")
        schan = self.ts.accept(1.0)
        # Fail with a clear message instead of an AttributeError on None if
        # no channel shows up within the timeout.
        self.assertIsNotNone(schan)

        schan.send("Hello there.\n")
        schan.send_stderr("This is on stderr.\n")
        schan.close()

        self.assertEqual("Hello there.\n", stdout.readline())
        self.assertEqual("", stdout.readline())
        self.assertEqual("This is on stderr.\n", stderr.readline())
        self.assertEqual("", stderr.readline())

        stdin.close()
        stdout.close()
        stderr.close()

    def test_gss_auth(self):
        """
        Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
        (gssapi-with-mic) in client and server mode.
        """
        self._test_connection(allow_agent=False, look_for_keys=False)

    def test_auth_trickledown(self):
        """
        Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
        """
        # A bogus GSS host name is used so that the GSS-API attempt fails and
        # the key given via key_filename has to carry the authentication.
        self.hostname = (
            "this_host_does_not_exists_and_causes_a_GSSAPI-exception"
        )
        self._test_connection(
            key_filename=[_support("test_rsa.key")],
            allow_agent=False,
            look_for_keys=False,
        )
161