1# Copyright (C) 2011  Jeff Forcier <jeff@bitprophet.org>
2#
3# This file is part of ssh.
4#
5# 'ssh' is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
# 'ssh' is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with 'ssh'; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Suite 500, Boston, MA  02110-1335  USA.
18
19"""
20L{Transport} handles the core SSH2 protocol.
21"""
22
23import os
24import socket
25import string
26import struct
27import sys
28import threading
29import time
30import weakref
31
32import ssh
33from ssh import util
34from ssh.auth_handler import AuthHandler
35from ssh.channel import Channel
36from ssh.common import *
37from ssh.compress import ZlibCompressor, ZlibDecompressor
38from ssh.dsskey import DSSKey
39from ssh.kex_gex import KexGex
40from ssh.kex_group1 import KexGroup1
41from ssh.message import Message
42from ssh.packet import Packetizer, NeedRekeyException
43from ssh.primes import ModulusPack
44from ssh.rsakey import RSAKey
45from ssh.server import ServerInterface
46from ssh.sftp_client import SFTPClient
47from ssh.ssh_exception import SSHException, BadAuthenticationType, ChannelException
48from ssh.util import retry_on_signal
49
50from Crypto import Random
51from Crypto.Cipher import Blowfish, AES, DES3, ARC4
52from Crypto.Hash import SHA, MD5
53try:
54    from Crypto.Util import Counter
55except ImportError:
56    from ssh.util import Counter
57
58
# Thread cleanup: objects listed in _active_threads are asked to stop when the
# interpreter exits.
import atexit

_active_threads = []


def _join_lingering_threads():
    """
    Call C{stop_thread()} on every object currently listed in
    C{_active_threads}.  Registered below as an C{atexit} hook.
    """
    for lingering in _active_threads:
        lingering.stop_thread()


atexit.register(_join_lingering_threads)
66
67
class SecurityOptions (object):
    """
    Simple object containing the security preferences of an ssh transport.
    These are tuples of acceptable ciphers, digests, key types, key exchange
    algorithms and compression methods, listed in order of preference.

    Every property reads from and writes to the underlying L{Transport}, so
    changing the contents and/or order of these fields affects that transport
    (but only if you change them before starting the session).
    If you try to add an algorithm that ssh doesn't recognize,
    C{ValueError} will be raised.  If you try to assign something besides a
    tuple or list to one of the fields, C{TypeError} will be raised.
    """
    # Only the owning transport is stored on the instance.  The preference
    # names are class-level properties and therefore must not be repeated
    # here: a slot name that collides with a class attribute is dead storage
    # on Python 2 and rejected outright by newer interpreters.
    __slots__ = [ '_transport' ]

    def __init__(self, transport):
        """
        @param transport: the transport whose preferences are exposed
        @type transport: L{Transport}
        """
        self._transport = transport

    def __repr__(self):
        """
        Returns a string representation of this object, for debugging.

        @rtype: str
        """
        return '<ssh.SecurityOptions for %s>' % repr(self._transport)

    def _get_ciphers(self):
        return self._transport._preferred_ciphers

    def _get_digests(self):
        return self._transport._preferred_macs

    def _get_key_types(self):
        return self._transport._preferred_keys

    def _get_kex(self):
        return self._transport._preferred_kex

    def _get_compression(self):
        return self._transport._preferred_compression

    def _set(self, name, orig, x):
        """
        Validate C{x} and store it on the transport.

        @param name: transport attribute that receives the new preference
            tuple (e.g. C{'_preferred_ciphers'})
        @type name: str
        @param orig: transport attribute holding the table of known
            algorithms (e.g. C{'_cipher_info'}); its keys are the only
            acceptable entries
        @type orig: str
        @param x: the new preferences, most preferred first
        @type x: tuple or list

        @raise TypeError: if C{x} is neither a tuple nor a list
        @raise ValueError: if C{x} names an algorithm missing from the table
        """
        if isinstance(x, list):
            x = tuple(x)
        if not isinstance(x, tuple):
            raise TypeError('expected tuple or list')
        possible = getattr(self._transport, orig)
        forbidden = [n for n in x if n not in possible]
        if forbidden:
            # name the offending entries; this setter is shared by ciphers,
            # digests, key types, kex and compression alike
            raise ValueError('unknown algorithm: %s'
                             % ', '.join([repr(n) for n in forbidden]))
        setattr(self._transport, name, x)

    def _set_ciphers(self, x):
        self._set('_preferred_ciphers', '_cipher_info', x)

    def _set_digests(self, x):
        self._set('_preferred_macs', '_mac_info', x)

    def _set_key_types(self, x):
        self._set('_preferred_keys', '_key_info', x)

    def _set_kex(self, x):
        self._set('_preferred_kex', '_kex_info', x)

    def _set_compression(self, x):
        self._set('_preferred_compression', '_compression_info', x)

    ciphers = property(_get_ciphers, _set_ciphers, None,
                       "Symmetric encryption ciphers")
    digests = property(_get_digests, _set_digests, None,
                       "Digest (one-way hash) algorithms")
    key_types = property(_get_key_types, _set_key_types, None,
                         "Public-key algorithms")
    kex = property(_get_kex, _set_kex, None, "Key exchange algorithms")
    compression = property(_get_compression, _set_compression, None,
                           "Compression algorithms")
143
144
class ChannelMap (object):
    """
    Lock-protected table mapping channel ids to L{Channel} objects.  The
    table is a C{weakref.WeakValueDictionary}, so it holds its values only
    weakly.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # (id -> Channel)
        self._map = weakref.WeakValueDictionary()

    def _locked(self, operation, *args):
        # run operation(*args) while holding the lock and hand back its result
        self._lock.acquire()
        try:
            return operation(*args)
        finally:
            self._lock.release()

    def put(self, chanid, chan):
        self._locked(self._map.__setitem__, chanid, chan)

    def get(self, chanid):
        return self._locked(self._map.get, chanid, None)

    def delete(self, chanid):
        def _discard():
            # an id that is not (or no longer) present is silently ignored
            try:
                del self._map[chanid]
            except KeyError:
                pass
        self._locked(_discard)

    def values(self):
        return self._locked(self._map.values)

    def __len__(self):
        return self._locked(len, self._map)
188
189
190class Transport (threading.Thread):
191    """
192    An SSH Transport attaches to a stream (usually a socket), negotiates an
193    encrypted session, authenticates, and then creates stream tunnels, called
194    L{Channel}s, across the session.  Multiple channels can be multiplexed
195    across a single session (and often are, in the case of port forwardings).
196    """
197
    # Pieces of the local version string; __init__ joins them as
    # 'SSH-' + _PROTO_ID + '-' + _CLIENT_ID.
    _PROTO_ID = '2.0'
    _CLIENT_ID = 'ssh_%s' % (ssh.__version__)

    # Default algorithm preferences.  SecurityOptions reads and replaces
    # these tuples, validating new entries against the matching *_info table
    # below.
    _preferred_ciphers = ( 'aes128-ctr', 'aes256-ctr', 'aes128-cbc', 'blowfish-cbc', 'aes256-cbc', '3des-cbc',
        'arcfour128', 'arcfour256' )
    _preferred_macs = ( 'hmac-sha1', 'hmac-md5', 'hmac-sha1-96', 'hmac-md5-96' )
    _preferred_keys = ( 'ssh-rsa', 'ssh-dss' )
    _preferred_kex = ( 'diffie-hellman-group1-sha1', 'diffie-hellman-group-exchange-sha1' )
    _preferred_compression = ( 'none', )

    # cipher name -> implementation class, mode constant and sizes.
    # 'key-size' is in bytes (__repr__ multiplies it by 8 to report bits);
    # 'block-size' is presumably in bytes as well -- TODO confirm.
    _cipher_info = {
        'aes128-ctr': { 'class': AES, 'mode': AES.MODE_CTR, 'block-size': 16, 'key-size': 16 },
        'aes256-ctr': { 'class': AES, 'mode': AES.MODE_CTR, 'block-size': 16, 'key-size': 32 },
        'blowfish-cbc': { 'class': Blowfish, 'mode': Blowfish.MODE_CBC, 'block-size': 8, 'key-size': 16 },
        'aes128-cbc': { 'class': AES, 'mode': AES.MODE_CBC, 'block-size': 16, 'key-size': 16 },
        'aes256-cbc': { 'class': AES, 'mode': AES.MODE_CBC, 'block-size': 16, 'key-size': 32 },
        '3des-cbc': { 'class': DES3, 'mode': DES3.MODE_CBC, 'block-size': 8, 'key-size': 24 },
        'arcfour128': { 'class': ARC4, 'mode': None, 'block-size': 8, 'key-size': 16 },
        'arcfour256': { 'class': ARC4, 'mode': None, 'block-size': 8, 'key-size': 32 },
        }

    # MAC name -> hash class and size (presumably the digest length in
    # bytes that is used -- TODO confirm).
    _mac_info = {
        'hmac-sha1': { 'class': SHA, 'size': 20 },
        'hmac-sha1-96': { 'class': SHA, 'size': 12 },
        'hmac-md5': { 'class': MD5, 'size': 16 },
        'hmac-md5-96': { 'class': MD5, 'size': 12 },
        }

    # host key type name -> key class
    _key_info = {
        'ssh-rsa': RSAKey,
        'ssh-dss': DSSKey,
        }

    # key exchange method name -> kex implementation class
    _kex_info = {
        'diffie-hellman-group1-sha1': KexGroup1,
        'diffie-hellman-group-exchange-sha1': KexGex,
        }

    # compression name -> (compressor class, decompressor class)
    _compression_info = {
        # zlib@openssh.com is just zlib, but only turned on after a successful
        # authentication.  openssh servers may only offer this type because
        # they've had troubles with security holes in zlib in the past.
        'zlib@openssh.com': ( ZlibCompressor, ZlibDecompressor ),
        'zlib': ( ZlibCompressor, ZlibDecompressor ),
        'none': ( None, None ),
    }


    # Shared ModulusPack assigned by load_server_moduli(); stays None until a
    # moduli file has been read successfully.
    _modulus_pack = None
247
248    def __init__(self, sock):
249        """
250        Create a new SSH session over an existing socket, or socket-like
251        object.  This only creates the Transport object; it doesn't begin the
252        SSH session yet.  Use L{connect} or L{start_client} to begin a client
253        session, or L{start_server} to begin a server session.
254
255        If the object is not actually a socket, it must have the following
256        methods:
257            - C{send(str)}: Writes from 1 to C{len(str)} bytes, and
258              returns an int representing the number of bytes written.  Returns
259              0 or raises C{EOFError} if the stream has been closed.
260            - C{recv(int)}: Reads from 1 to C{int} bytes and returns them as a
261              string.  Returns 0 or raises C{EOFError} if the stream has been
262              closed.
263            - C{close()}: Closes the socket.
264            - C{settimeout(n)}: Sets a (float) timeout on I/O operations.
265
266        For ease of use, you may also pass in an address (as a tuple) or a host
267        string as the C{sock} argument.  (A host string is a hostname with an
268        optional port (separated by C{":"}) which will be converted into a
269        tuple of C{(hostname, port)}.)  A socket will be connected to this
270        address and used for communication.  Exceptions from the C{socket} call
271        may be thrown in this case.
272
273        @param sock: a socket or socket-like object to create the session over.
274        @type sock: socket
275        """
276        if isinstance(sock, (str, unicode)):
277            # convert "host:port" into (host, port)
278            hl = sock.split(':', 1)
279            if len(hl) == 1:
280                sock = (hl[0], 22)
281            else:
282                sock = (hl[0], int(hl[1]))
283        if type(sock) is tuple:
284            # connect to the given (host, port)
285            hostname, port = sock
286            reason = 'No suitable address family'
287            for (family, socktype, proto, canonname, sockaddr) in socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
288                if socktype == socket.SOCK_STREAM:
289                    af = family
290                    addr = sockaddr
291                    sock = socket.socket(af, socket.SOCK_STREAM)
292                    try:
293                        retry_on_signal(lambda: sock.connect((hostname, port)))
294                    except socket.error, e:
295                        reason = str(e)
296                    else:
297                        break
298            else:
299                raise SSHException(
300                    'Unable to connect to %s: %s' % (hostname, reason))
301        # okay, normal socket-ish flow here...
302        threading.Thread.__init__(self)
303        self.setDaemon(True)
304        self.rng = rng
305        self.sock = sock
306        # Python < 2.3 doesn't have the settimeout method - RogerB
307        try:
308            # we set the timeout so we can check self.active periodically to
309            # see if we should bail.  socket.timeout exception is never
310            # propagated.
311            self.sock.settimeout(0.1)
312        except AttributeError:
313            pass
314
315        # negotiated crypto parameters
316        self.packetizer = Packetizer(sock)
317        self.local_version = 'SSH-' + self._PROTO_ID + '-' + self._CLIENT_ID
318        self.remote_version = ''
319        self.local_cipher = self.remote_cipher = ''
320        self.local_kex_init = self.remote_kex_init = None
321        self.local_mac = self.remote_mac = None
322        self.local_compression = self.remote_compression = None
323        self.session_id = None
324        self.host_key_type = None
325        self.host_key = None
326
327        # state used during negotiation
328        self.kex_engine = None
329        self.H = None
330        self.K = None
331
332        self.active = False
333        self.initial_kex_done = False
334        self.in_kex = False
335        self.authenticated = False
336        self._expected_packet = tuple()
337        self.lock = threading.Lock()    # synchronization (always higher level than write_lock)
338
339        # tracking open channels
340        self._channels = ChannelMap()
341        self.channel_events = { }       # (id -> Event)
342        self.channels_seen = { }        # (id -> True)
343        self._channel_counter = 1
344        self.window_size = 65536
345        self.max_packet_size = 34816
346        self._forward_agent_handler = None
347        self._x11_handler = None
348        self._tcp_handler = None
349
350        self.saved_exception = None
351        self.clear_to_send = threading.Event()
352        self.clear_to_send_lock = threading.Lock()
353        self.clear_to_send_timeout = 30.0
354        self.log_name = 'ssh.transport'
355        self.logger = util.get_logger(self.log_name)
356        self.packetizer.set_log(self.logger)
357        self.auth_handler = None
358        self.global_response = None     # response Message from an arbitrary global request
359        self.completion_event = None    # user-defined event callbacks
360        self.banner_timeout = 15        # how long (seconds) to wait for the SSH banner
361
362        # server mode:
363        self.server_mode = False
364        self.server_object = None
365        self.server_key_dict = { }
366        self.server_accepts = [ ]
367        self.server_accept_cv = threading.Condition(self.lock)
368        self.subsystem_table = { }
369
370    def __repr__(self):
371        """
372        Returns a string representation of this object, for debugging.
373
374        @rtype: str
375        """
376        out = '<ssh.Transport at %s' % hex(long(id(self)) & 0xffffffffL)
377        if not self.active:
378            out += ' (unconnected)'
379        else:
380            if self.local_cipher != '':
381                out += ' (cipher %s, %d bits)' % (self.local_cipher,
382                                                  self._cipher_info[self.local_cipher]['key-size'] * 8)
383            if self.is_authenticated():
384                out += ' (active; %d open channel(s))' % len(self._channels)
385            elif self.initial_kex_done:
386                out += ' (connected; awaiting auth)'
387            else:
388                out += ' (connecting)'
389        out += '>'
390        return out
391
392    def atfork(self):
393        """
394        Terminate this Transport without closing the session.  On posix
395        systems, if a Transport is open during process forking, both parent
396        and child will share the underlying socket, but only one process can
397        use the connection (without corrupting the session).  Use this method
398        to clean up a Transport object without disrupting the other process.
399
400        @since: 1.5.3
401        """
402        self.sock.close()
403        self.close()
404
405    def get_security_options(self):
406        """
407        Return a L{SecurityOptions} object which can be used to tweak the
408        encryption algorithms this transport will permit, and the order of
409        preference for them.
410
411        @return: an object that can be used to change the preferred algorithms
412            for encryption, digest (hash), public key, and key exchange.
413        @rtype: L{SecurityOptions}
414        """
415        return SecurityOptions(self)
416
417    def start_client(self, event=None):
418        """
419        Negotiate a new SSH2 session as a client.  This is the first step after
420        creating a new L{Transport}.  A separate thread is created for protocol
421        negotiation.
422
423        If an event is passed in, this method returns immediately.  When
424        negotiation is done (successful or not), the given C{Event} will
425        be triggered.  On failure, L{is_active} will return C{False}.
426
427        (Since 1.4) If C{event} is C{None}, this method will not return until
428        negotation is done.  On success, the method returns normally.
429        Otherwise an SSHException is raised.
430
431        After a successful negotiation, you will usually want to authenticate,
432        calling L{auth_password <Transport.auth_password>} or
433        L{auth_publickey <Transport.auth_publickey>}.
434
435        @note: L{connect} is a simpler method for connecting as a client.
436
437        @note: After calling this method (or L{start_server} or L{connect}),
438            you should no longer directly read from or write to the original
439            socket object.
440
441        @param event: an event to trigger when negotiation is complete
442            (optional)
443        @type event: threading.Event
444
445        @raise SSHException: if negotiation fails (and no C{event} was passed
446            in)
447        """
448        self.active = True
449        if event is not None:
450            # async, return immediately and let the app poll for completion
451            self.completion_event = event
452            self.start()
453            return
454
455        # synchronous, wait for a result
456        self.completion_event = event = threading.Event()
457        self.start()
458        Random.atfork()
459        while True:
460            event.wait(0.1)
461            if not self.active:
462                e = self.get_exception()
463                if e is not None:
464                    raise e
465                raise SSHException('Negotiation failed.')
466            if event.isSet():
467                break
468
469    def start_server(self, event=None, server=None):
470        """
471        Negotiate a new SSH2 session as a server.  This is the first step after
472        creating a new L{Transport} and setting up your server host key(s).  A
473        separate thread is created for protocol negotiation.
474
475        If an event is passed in, this method returns immediately.  When
476        negotiation is done (successful or not), the given C{Event} will
477        be triggered.  On failure, L{is_active} will return C{False}.
478
479        (Since 1.4) If C{event} is C{None}, this method will not return until
480        negotation is done.  On success, the method returns normally.
481        Otherwise an SSHException is raised.
482
483        After a successful negotiation, the client will need to authenticate.
484        Override the methods
485        L{get_allowed_auths <ServerInterface.get_allowed_auths>},
486        L{check_auth_none <ServerInterface.check_auth_none>},
487        L{check_auth_password <ServerInterface.check_auth_password>}, and
488        L{check_auth_publickey <ServerInterface.check_auth_publickey>} in the
489        given C{server} object to control the authentication process.
490
491        After a successful authentication, the client should request to open
492        a channel.  Override
493        L{check_channel_request <ServerInterface.check_channel_request>} in the
494        given C{server} object to allow channels to be opened.
495
496        @note: After calling this method (or L{start_client} or L{connect}),
497            you should no longer directly read from or write to the original
498            socket object.
499
500        @param event: an event to trigger when negotiation is complete.
501        @type event: threading.Event
502        @param server: an object used to perform authentication and create
503            L{Channel}s.
504        @type server: L{server.ServerInterface}
505
506        @raise SSHException: if negotiation fails (and no C{event} was passed
507            in)
508        """
509        if server is None:
510            server = ServerInterface()
511        self.server_mode = True
512        self.server_object = server
513        self.active = True
514        if event is not None:
515            # async, return immediately and let the app poll for completion
516            self.completion_event = event
517            self.start()
518            return
519
520        # synchronous, wait for a result
521        self.completion_event = event = threading.Event()
522        self.start()
523        while True:
524            event.wait(0.1)
525            if not self.active:
526                e = self.get_exception()
527                if e is not None:
528                    raise e
529                raise SSHException('Negotiation failed.')
530            if event.isSet():
531                break
532
533    def add_server_key(self, key):
534        """
535        Add a host key to the list of keys used for server mode.  When behaving
536        as a server, the host key is used to sign certain packets during the
537        SSH2 negotiation, so that the client can trust that we are who we say
538        we are.  Because this is used for signing, the key must contain private
539        key info, not just the public half.  Only one key of each type (RSA or
540        DSS) is kept.
541
542        @param key: the host key to add, usually an L{RSAKey <rsakey.RSAKey>} or
543            L{DSSKey <dsskey.DSSKey>}.
544        @type key: L{PKey <pkey.PKey>}
545        """
546        self.server_key_dict[key.get_name()] = key
547
548    def get_server_key(self):
549        """
550        Return the active host key, in server mode.  After negotiating with the
551        client, this method will return the negotiated host key.  If only one
552        type of host key was set with L{add_server_key}, that's the only key
553        that will ever be returned.  But in cases where you have set more than
554        one type of host key (for example, an RSA key and a DSS key), the key
555        type will be negotiated by the client, and this method will return the
556        key of the type agreed on.  If the host key has not been negotiated
557        yet, C{None} is returned.  In client mode, the behavior is undefined.
558
559        @return: host key of the type negotiated by the client, or C{None}.
560        @rtype: L{PKey <pkey.PKey>}
561        """
562        try:
563            return self.server_key_dict[self.host_key_type]
564        except KeyError:
565            pass
566        return None
567
568    def load_server_moduli(filename=None):
569        """
570        I{(optional)}
571        Load a file of prime moduli for use in doing group-exchange key
572        negotiation in server mode.  It's a rather obscure option and can be
573        safely ignored.
574
575        In server mode, the remote client may request "group-exchange" key
576        negotiation, which asks the server to send a random prime number that
577        fits certain criteria.  These primes are pretty difficult to compute,
578        so they can't be generated on demand.  But many systems contain a file
579        of suitable primes (usually named something like C{/etc/ssh/moduli}).
580        If you call C{load_server_moduli} and it returns C{True}, then this
581        file of primes has been loaded and we will support "group-exchange" in
582        server mode.  Otherwise server mode will just claim that it doesn't
583        support that method of key negotiation.
584
585        @param filename: optional path to the moduli file, if you happen to
586            know that it's not in a standard location.
587        @type filename: str
588        @return: True if a moduli file was successfully loaded; False
589            otherwise.
590        @rtype: bool
591
592        @note: This has no effect when used in client mode.
593        """
594        Transport._modulus_pack = ModulusPack(rng)
595        # places to look for the openssh "moduli" file
596        file_list = [ '/etc/ssh/moduli', '/usr/local/etc/moduli' ]
597        if filename is not None:
598            file_list.insert(0, filename)
599        for fn in file_list:
600            try:
601                Transport._modulus_pack.read_file(fn)
602                return True
603            except IOError:
604                pass
605        # none succeeded
606        Transport._modulus_pack = None
607        return False
608    load_server_moduli = staticmethod(load_server_moduli)
609
610    def close(self):
611        """
612        Close this session, and any open channels that are tied to it.
613        """
614        if not self.active:
615            return
616        self.active = False
617        self.packetizer.close()
618        self.join()
619        for chan in self._channels.values():
620            chan._unlink()
621
622    def get_remote_server_key(self):
623        """
624        Return the host key of the server (in client mode).
625
626        @note: Previously this call returned a tuple of (key type, key string).
627            You can get the same effect by calling
628            L{PKey.get_name <pkey.PKey.get_name>} for the key type, and
629            C{str(key)} for the key string.
630
631        @raise SSHException: if no session is currently active.
632
633        @return: public key of the remote server
634        @rtype: L{PKey <pkey.PKey>}
635        """
636        if (not self.active) or (not self.initial_kex_done):
637            raise SSHException('No existing session')
638        return self.host_key
639
640    def is_active(self):
641        """
642        Return true if this session is active (open).
643
644        @return: True if the session is still active (open); False if the
645            session is closed
646        @rtype: bool
647        """
648        return self.active
649
650    def open_session(self):
651        """
652        Request a new channel to the server, of type C{"session"}.  This
653        is just an alias for C{open_channel('session')}.
654
655        @return: a new L{Channel}
656        @rtype: L{Channel}
657
658        @raise SSHException: if the request is rejected or the session ends
659            prematurely
660        """
661        return self.open_channel('session')
662
663    def open_x11_channel(self, src_addr=None):
664        """
665        Request a new channel to the client, of type C{"x11"}.  This
666        is just an alias for C{open_channel('x11', src_addr=src_addr)}.
667
668        @param src_addr: the source address of the x11 server (port is the
669            x11 port, ie. 6010)
670        @type src_addr: (str, int)
671        @return: a new L{Channel}
672        @rtype: L{Channel}
673
674        @raise SSHException: if the request is rejected or the session ends
675            prematurely
676        """
677        return self.open_channel('x11', src_addr=src_addr)
678
679    def open_forward_agent_channel(self):
680        """
681        Request a new channel to the client, of type
682        C{"auth-agent@openssh.com"}.
683
684        This is just an alias for C{open_channel('auth-agent@openssh.com')}.
685        @return: a new L{Channel}
686        @rtype: L{Channel}
687
688        @raise SSHException: if the request is rejected or the session ends
689            prematurely
690        """
691        return self.open_channel('auth-agent@openssh.com')
692
693    def open_forwarded_tcpip_channel(self, (src_addr, src_port), (dest_addr, dest_port)):
694        """
695        Request a new channel back to the client, of type C{"forwarded-tcpip"}.
696        This is used after a client has requested port forwarding, for sending
697        incoming connections back to the client.
698
699        @param src_addr: originator's address
700        @param src_port: originator's port
701        @param dest_addr: local (server) connected address
702        @param dest_port: local (server) connected port
703        """
704        return self.open_channel('forwarded-tcpip', (dest_addr, dest_port), (src_addr, src_port))
705
706    def open_channel(self, kind, dest_addr=None, src_addr=None):
707        """
708        Request a new channel to the server.  L{Channel}s are socket-like
709        objects used for the actual transfer of data across the session.
710        You may only request a channel after negotiating encryption (using
711        L{connect} or L{start_client}) and authenticating.
712
713        @param kind: the kind of channel requested (usually C{"session"},
714            C{"forwarded-tcpip"}, C{"direct-tcpip"}, or C{"x11"})
715        @type kind: str
716        @param dest_addr: the destination address of this port forwarding,
717            if C{kind} is C{"forwarded-tcpip"} or C{"direct-tcpip"} (ignored
718            for other channel types)
719        @type dest_addr: (str, int)
720        @param src_addr: the source address of this port forwarding, if
721            C{kind} is C{"forwarded-tcpip"}, C{"direct-tcpip"}, or C{"x11"}
722        @type src_addr: (str, int)
723        @return: a new L{Channel} on success
724        @rtype: L{Channel}
725
726        @raise SSHException: if the request is rejected or the session ends
727            prematurely
728        """
729        if not self.active:
730            raise SSHException('SSH session not active')
731        self.lock.acquire()
732        try:
733            chanid = self._next_channel()
734            m = Message()
735            m.add_byte(chr(MSG_CHANNEL_OPEN))
736            m.add_string(kind)
737            m.add_int(chanid)
738            m.add_int(self.window_size)
739            m.add_int(self.max_packet_size)
740            if (kind == 'forwarded-tcpip') or (kind == 'direct-tcpip'):
741                m.add_string(dest_addr[0])
742                m.add_int(dest_addr[1])
743                m.add_string(src_addr[0])
744                m.add_int(src_addr[1])
745            elif kind == 'x11':
746                m.add_string(src_addr[0])
747                m.add_int(src_addr[1])
748            chan = Channel(chanid)
749            self._channels.put(chanid, chan)
750            self.channel_events[chanid] = event = threading.Event()
751            self.channels_seen[chanid] = True
752            chan._set_transport(self)
753            chan._set_window(self.window_size, self.max_packet_size)
754        finally:
755            self.lock.release()
756        self._send_user_message(m)
757        while True:
758            event.wait(0.1);
759            if not self.active:
760                e = self.get_exception()
761                if e is None:
762                    e = SSHException('Unable to open channel.')
763                raise e
764            if event.isSet():
765                break
766        chan = self._channels.get(chanid)
767        if chan is not None:
768            return chan
769        e = self.get_exception()
770        if e is None:
771            e = SSHException('Unable to open channel.')
772        raise e
773
774    def request_port_forward(self, address, port, handler=None):
775        """
776        Ask the server to forward TCP connections from a listening port on
777        the server, across this SSH session.
778
779        If a handler is given, that handler is called from a different thread
780        whenever a forwarded connection arrives.  The handler parameters are::
781
782            handler(channel, (origin_addr, origin_port), (server_addr, server_port))
783
784        where C{server_addr} and C{server_port} are the address and port that
785        the server was listening on.
786
787        If no handler is set, the default behavior is to send new incoming
788        forwarded connections into the accept queue, to be picked up via
789        L{accept}.
790
791        @param address: the address to bind when forwarding
792        @type address: str
793        @param port: the port to forward, or 0 to ask the server to allocate
794            any port
795        @type port: int
796        @param handler: optional handler for incoming forwarded connections
797        @type handler: function(Channel, (str, int), (str, int))
798        @return: the port # allocated by the server
799        @rtype: int
800
801        @raise SSHException: if the server refused the TCP forward request
802        """
803        if not self.active:
804            raise SSHException('SSH session not active')
805        address = str(address)
806        port = int(port)
807        response = self.global_request('tcpip-forward', (address, port), wait=True)
808        if response is None:
809            raise SSHException('TCP forwarding request denied')
810        if port == 0:
811            port = response.get_int()
812        if handler is None:
813            def default_handler(channel, (src_addr, src_port), (dest_addr, dest_port)):
814                self._queue_incoming_channel(channel)
815            handler = default_handler
816        self._tcp_handler = handler
817        return port
818
819    def cancel_port_forward(self, address, port):
820        """
821        Ask the server to cancel a previous port-forwarding request.  No more
822        connections to the given address & port will be forwarded across this
823        ssh connection.
824
825        @param address: the address to stop forwarding
826        @type address: str
827        @param port: the port to stop forwarding
828        @type port: int
829        """
830        if not self.active:
831            return
832        self._tcp_handler = None
833        self.global_request('cancel-tcpip-forward', (address, port), wait=True)
834
835    def open_sftp_client(self):
836        """
837        Create an SFTP client channel from an open transport.  On success,
838        an SFTP session will be opened with the remote host, and a new
839        SFTPClient object will be returned.
840
841        @return: a new L{SFTPClient} object, referring to an sftp session
842            (channel) across this transport
843        @rtype: L{SFTPClient}
844        """
845        return SFTPClient.from_transport(self)
846
847    def send_ignore(self, bytes=None):
848        """
849        Send a junk packet across the encrypted link.  This is sometimes used
850        to add "noise" to a connection to confuse would-be attackers.  It can
851        also be used as a keep-alive for long lived connections traversing
852        firewalls.
853
854        @param bytes: the number of random bytes to send in the payload of the
855            ignored packet -- defaults to a random number from 10 to 41.
856        @type bytes: int
857        """
858        m = Message()
859        m.add_byte(chr(MSG_IGNORE))
860        if bytes is None:
861            bytes = (ord(rng.read(1)) % 32) + 10
862        m.add_bytes(rng.read(bytes))
863        self._send_user_message(m)
864
865    def renegotiate_keys(self):
866        """
867        Force this session to switch to new keys.  Normally this is done
868        automatically after the session hits a certain number of packets or
869        bytes sent or received, but this method gives you the option of forcing
870        new keys whenever you want.  Negotiating new keys causes a pause in
871        traffic both ways as the two sides swap keys and do computations.  This
872        method returns when the session has switched to new keys.
873
874        @raise SSHException: if the key renegotiation failed (which causes the
875            session to end)
876        """
877        self.completion_event = threading.Event()
878        self._send_kex_init()
879        while True:
880            self.completion_event.wait(0.1)
881            if not self.active:
882                e = self.get_exception()
883                if e is not None:
884                    raise e
885                raise SSHException('Negotiation failed.')
886            if self.completion_event.isSet():
887                break
888        return
889
890    def set_keepalive(self, interval):
891        """
892        Turn on/off keepalive packets (default is off).  If this is set, after
893        C{interval} seconds without sending any data over the connection, a
894        "keepalive" packet will be sent (and ignored by the remote host).  This
895        can be useful to keep connections alive over a NAT, for example.
896
897        @param interval: seconds to wait before sending a keepalive packet (or
898            0 to disable keepalives).
899        @type interval: int
900        """
901        self.packetizer.set_keepalive(interval,
902            lambda x=weakref.proxy(self): x.global_request('keepalive@lag.net', wait=False))
903
904    def global_request(self, kind, data=None, wait=True):
905        """
906        Make a global request to the remote host.  These are normally
907        extensions to the SSH2 protocol.
908
909        @param kind: name of the request.
910        @type kind: str
911        @param data: an optional tuple containing additional data to attach
912            to the request.
913        @type data: tuple
914        @param wait: C{True} if this method should not return until a response
915            is received; C{False} otherwise.
916        @type wait: bool
917        @return: a L{Message} containing possible additional data if the
918            request was successful (or an empty L{Message} if C{wait} was
919            C{False}); C{None} if the request was denied.
920        @rtype: L{Message}
921        """
922        if wait:
923            self.completion_event = threading.Event()
924        m = Message()
925        m.add_byte(chr(MSG_GLOBAL_REQUEST))
926        m.add_string(kind)
927        m.add_boolean(wait)
928        if data is not None:
929            m.add(*data)
930        self._log(DEBUG, 'Sending global request "%s"' % kind)
931        self._send_user_message(m)
932        if not wait:
933            return None
934        while True:
935            self.completion_event.wait(0.1)
936            if not self.active:
937                return None
938            if self.completion_event.isSet():
939                break
940        return self.global_response
941
942    def accept(self, timeout=None):
943        """
944        Return the next channel opened by the client over this transport, in
945        server mode.  If no channel is opened before the given timeout, C{None}
946        is returned.
947
948        @param timeout: seconds to wait for a channel, or C{None} to wait
949            forever
950        @type timeout: int
951        @return: a new Channel opened by the client
952        @rtype: L{Channel}
953        """
954        self.lock.acquire()
955        try:
956            if len(self.server_accepts) > 0:
957                chan = self.server_accepts.pop(0)
958            else:
959                self.server_accept_cv.wait(timeout)
960                if len(self.server_accepts) > 0:
961                    chan = self.server_accepts.pop(0)
962                else:
963                    # timeout
964                    chan = None
965        finally:
966            self.lock.release()
967        return chan
968
969    def connect(self, hostkey=None, username='', password=None, pkey=None):
970        """
971        Negotiate an SSH2 session, and optionally verify the server's host key
972        and authenticate using a password or private key.  This is a shortcut
973        for L{start_client}, L{get_remote_server_key}, and
974        L{Transport.auth_password} or L{Transport.auth_publickey}.  Use those
975        methods if you want more control.
976
977        You can use this method immediately after creating a Transport to
978        negotiate encryption with a server.  If it fails, an exception will be
979        thrown.  On success, the method will return cleanly, and an encrypted
980        session exists.  You may immediately call L{open_channel} or
981        L{open_session} to get a L{Channel} object, which is used for data
982        transfer.
983
984        @note: If you fail to supply a password or private key, this method may
985        succeed, but a subsequent L{open_channel} or L{open_session} call may
986        fail because you haven't authenticated yet.
987
988        @param hostkey: the host key expected from the server, or C{None} if
989            you don't want to do host key verification.
990        @type hostkey: L{PKey<pkey.PKey>}
991        @param username: the username to authenticate as.
992        @type username: str
993        @param password: a password to use for authentication, if you want to
994            use password authentication; otherwise C{None}.
995        @type password: str
996        @param pkey: a private key to use for authentication, if you want to
997            use private key authentication; otherwise C{None}.
998        @type pkey: L{PKey<pkey.PKey>}
999
1000        @raise SSHException: if the SSH2 negotiation fails, the host key
1001            supplied by the server is incorrect, or authentication fails.
1002        """
1003        if hostkey is not None:
1004            self._preferred_keys = [ hostkey.get_name() ]
1005
1006        self.start_client()
1007
1008        # check host key if we were given one
1009        if (hostkey is not None):
1010            key = self.get_remote_server_key()
1011            if (key.get_name() != hostkey.get_name()) or (str(key) != str(hostkey)):
1012                self._log(DEBUG, 'Bad host key from server')
1013                self._log(DEBUG, 'Expected: %s: %s' % (hostkey.get_name(), repr(str(hostkey))))
1014                self._log(DEBUG, 'Got     : %s: %s' % (key.get_name(), repr(str(key))))
1015                raise SSHException('Bad host key from server')
1016            self._log(DEBUG, 'Host key verified (%s)' % hostkey.get_name())
1017
1018        if (pkey is not None) or (password is not None):
1019            if password is not None:
1020                self._log(DEBUG, 'Attempting password auth...')
1021                self.auth_password(username, password)
1022            else:
1023                self._log(DEBUG, 'Attempting public-key auth...')
1024                self.auth_publickey(username, pkey)
1025
1026        return
1027
1028    def get_exception(self):
1029        """
1030        Return any exception that happened during the last server request.
1031        This can be used to fetch more specific error information after using
1032        calls like L{start_client}.  The exception (if any) is cleared after
1033        this call.
1034
1035        @return: an exception, or C{None} if there is no stored exception.
1036        @rtype: Exception
1037
1038        @since: 1.1
1039        """
1040        self.lock.acquire()
1041        try:
1042            e = self.saved_exception
1043            self.saved_exception = None
1044            return e
1045        finally:
1046            self.lock.release()
1047
1048    def set_subsystem_handler(self, name, handler, *larg, **kwarg):
1049        """
1050        Set the handler class for a subsystem in server mode.  If a request
1051        for this subsystem is made on an open ssh channel later, this handler
1052        will be constructed and called -- see L{SubsystemHandler} for more
1053        detailed documentation.
1054
1055        Any extra parameters (including keyword arguments) are saved and
1056        passed to the L{SubsystemHandler} constructor later.
1057
1058        @param name: name of the subsystem.
1059        @type name: str
1060        @param handler: subclass of L{SubsystemHandler} that handles this
1061            subsystem.
1062        @type handler: class
1063        """
1064        try:
1065            self.lock.acquire()
1066            self.subsystem_table[name] = (handler, larg, kwarg)
1067        finally:
1068            self.lock.release()
1069
1070    def is_authenticated(self):
1071        """
1072        Return true if this session is active and authenticated.
1073
1074        @return: True if the session is still open and has been authenticated
1075            successfully; False if authentication failed and/or the session is
1076            closed.
1077        @rtype: bool
1078        """
1079        return self.active and (self.auth_handler is not None) and self.auth_handler.is_authenticated()
1080
1081    def get_username(self):
1082        """
1083        Return the username this connection is authenticated for.  If the
1084        session is not authenticated (or authentication failed), this method
1085        returns C{None}.
1086
1087        @return: username that was authenticated, or C{None}.
1088        @rtype: string
1089        """
1090        if not self.active or (self.auth_handler is None):
1091            return None
1092        return self.auth_handler.get_username()
1093
1094    def auth_none(self, username):
1095        """
1096        Try to authenticate to the server using no authentication at all.
1097        This will almost always fail.  It may be useful for determining the
1098        list of authentication types supported by the server, by catching the
1099        L{BadAuthenticationType} exception raised.
1100
1101        @param username: the username to authenticate as
1102        @type username: string
1103        @return: list of auth types permissible for the next stage of
1104            authentication (normally empty)
1105        @rtype: list
1106
1107        @raise BadAuthenticationType: if "none" authentication isn't allowed
1108            by the server for this user
1109        @raise SSHException: if the authentication failed due to a network
1110            error
1111
1112        @since: 1.5
1113        """
1114        if (not self.active) or (not self.initial_kex_done):
1115            raise SSHException('No existing session')
1116        my_event = threading.Event()
1117        self.auth_handler = AuthHandler(self)
1118        self.auth_handler.auth_none(username, my_event)
1119        return self.auth_handler.wait_for_response(my_event)
1120
1121    def auth_password(self, username, password, event=None, fallback=True):
1122        """
1123        Authenticate to the server using a password.  The username and password
1124        are sent over an encrypted link.
1125
1126        If an C{event} is passed in, this method will return immediately, and
1127        the event will be triggered once authentication succeeds or fails.  On
1128        success, L{is_authenticated} will return C{True}.  On failure, you may
1129        use L{get_exception} to get more detailed error information.
1130
1131        Since 1.1, if no event is passed, this method will block until the
1132        authentication succeeds or fails.  On failure, an exception is raised.
1133        Otherwise, the method simply returns.
1134
1135        Since 1.5, if no event is passed and C{fallback} is C{True} (the
1136        default), if the server doesn't support plain password authentication
1137        but does support so-called "keyboard-interactive" mode, an attempt
1138        will be made to authenticate using this interactive mode.  If it fails,
1139        the normal exception will be thrown as if the attempt had never been
1140        made.  This is useful for some recent Gentoo and Debian distributions,
1141        which turn off plain password authentication in a misguided belief
1142        that interactive authentication is "more secure".  (It's not.)
1143
1144        If the server requires multi-step authentication (which is very rare),
1145        this method will return a list of auth types permissible for the next
1146        step.  Otherwise, in the normal case, an empty list is returned.
1147
1148        @param username: the username to authenticate as
1149        @type username: str
1150        @param password: the password to authenticate with
1151        @type password: str or unicode
1152        @param event: an event to trigger when the authentication attempt is
1153            complete (whether it was successful or not)
1154        @type event: threading.Event
1155        @param fallback: C{True} if an attempt at an automated "interactive"
1156            password auth should be made if the server doesn't support normal
1157            password auth
1158        @type fallback: bool
1159        @return: list of auth types permissible for the next stage of
1160            authentication (normally empty)
1161        @rtype: list
1162
1163        @raise BadAuthenticationType: if password authentication isn't
1164            allowed by the server for this user (and no event was passed in)
1165        @raise AuthenticationException: if the authentication failed (and no
1166            event was passed in)
1167        @raise SSHException: if there was a network error
1168        """
1169        if (not self.active) or (not self.initial_kex_done):
1170            # we should never try to send the password unless we're on a secure link
1171            raise SSHException('No existing session')
1172        if event is None:
1173            my_event = threading.Event()
1174        else:
1175            my_event = event
1176        self.auth_handler = AuthHandler(self)
1177        self.auth_handler.auth_password(username, password, my_event)
1178        if event is not None:
1179            # caller wants to wait for event themselves
1180            return []
1181        try:
1182            return self.auth_handler.wait_for_response(my_event)
1183        except BadAuthenticationType, x:
1184            # if password auth isn't allowed, but keyboard-interactive *is*, try to fudge it
1185            if not fallback or ('keyboard-interactive' not in x.allowed_types):
1186                raise
1187            try:
1188                def handler(title, instructions, fields):
1189                    if len(fields) > 1:
1190                        raise SSHException('Fallback authentication failed.')
1191                    if len(fields) == 0:
1192                        # for some reason, at least on os x, a 2nd request will
1193                        # be made with zero fields requested.  maybe it's just
1194                        # to try to fake out automated scripting of the exact
1195                        # type we're doing here.  *shrug* :)
1196                        return []
1197                    return [ password ]
1198                return self.auth_interactive(username, handler)
1199            except SSHException, ignored:
1200                # attempt failed; just raise the original exception
1201                raise x
1202        return None
1203
1204    def auth_publickey(self, username, key, event=None):
1205        """
1206        Authenticate to the server using a private key.  The key is used to
1207        sign data from the server, so it must include the private part.
1208
1209        If an C{event} is passed in, this method will return immediately, and
1210        the event will be triggered once authentication succeeds or fails.  On
1211        success, L{is_authenticated} will return C{True}.  On failure, you may
1212        use L{get_exception} to get more detailed error information.
1213
1214        Since 1.1, if no event is passed, this method will block until the
1215        authentication succeeds or fails.  On failure, an exception is raised.
1216        Otherwise, the method simply returns.
1217
1218        If the server requires multi-step authentication (which is very rare),
1219        this method will return a list of auth types permissible for the next
1220        step.  Otherwise, in the normal case, an empty list is returned.
1221
1222        @param username: the username to authenticate as
1223        @type username: string
1224        @param key: the private key to authenticate with
1225        @type key: L{PKey <pkey.PKey>}
1226        @param event: an event to trigger when the authentication attempt is
1227            complete (whether it was successful or not)
1228        @type event: threading.Event
1229        @return: list of auth types permissible for the next stage of
1230            authentication (normally empty)
1231        @rtype: list
1232
1233        @raise BadAuthenticationType: if public-key authentication isn't
1234            allowed by the server for this user (and no event was passed in)
1235        @raise AuthenticationException: if the authentication failed (and no
1236            event was passed in)
1237        @raise SSHException: if there was a network error
1238        """
1239        if (not self.active) or (not self.initial_kex_done):
1240            # we should never try to authenticate unless we're on a secure link
1241            raise SSHException('No existing session')
1242        if event is None:
1243            my_event = threading.Event()
1244        else:
1245            my_event = event
1246        self.auth_handler = AuthHandler(self)
1247        self.auth_handler.auth_publickey(username, key, my_event)
1248        if event is not None:
1249            # caller wants to wait for event themselves
1250            return []
1251        return self.auth_handler.wait_for_response(my_event)
1252
1253    def auth_interactive(self, username, handler, submethods=''):
1254        """
1255        Authenticate to the server interactively.  A handler is used to answer
1256        arbitrary questions from the server.  On many servers, this is just a
1257        dumb wrapper around PAM.
1258
1259        This method will block until the authentication succeeds or fails,
1260        peroidically calling the handler asynchronously to get answers to
1261        authentication questions.  The handler may be called more than once
1262        if the server continues to ask questions.
1263
1264        The handler is expected to be a callable that will handle calls of the
1265        form: C{handler(title, instructions, prompt_list)}.  The C{title} is
1266        meant to be a dialog-window title, and the C{instructions} are user
1267        instructions (both are strings).  C{prompt_list} will be a list of
1268        prompts, each prompt being a tuple of C{(str, bool)}.  The string is
1269        the prompt and the boolean indicates whether the user text should be
1270        echoed.
1271
1272        A sample call would thus be:
1273        C{handler('title', 'instructions', [('Password:', False)])}.
1274
1275        The handler should return a list or tuple of answers to the server's
1276        questions.
1277
1278        If the server requires multi-step authentication (which is very rare),
1279        this method will return a list of auth types permissible for the next
1280        step.  Otherwise, in the normal case, an empty list is returned.
1281
1282        @param username: the username to authenticate as
1283        @type username: string
1284        @param handler: a handler for responding to server questions
1285        @type handler: callable
1286        @param submethods: a string list of desired submethods (optional)
1287        @type submethods: str
1288        @return: list of auth types permissible for the next stage of
1289            authentication (normally empty).
1290        @rtype: list
1291
1292        @raise BadAuthenticationType: if public-key authentication isn't
1293            allowed by the server for this user
1294        @raise AuthenticationException: if the authentication failed
1295        @raise SSHException: if there was a network error
1296
1297        @since: 1.5
1298        """
1299        if (not self.active) or (not self.initial_kex_done):
1300            # we should never try to authenticate unless we're on a secure link
1301            raise SSHException('No existing session')
1302        my_event = threading.Event()
1303        self.auth_handler = AuthHandler(self)
1304        self.auth_handler.auth_interactive(username, handler, my_event, submethods)
1305        return self.auth_handler.wait_for_response(my_event)
1306
1307    def set_log_channel(self, name):
1308        """
1309        Set the channel for this transport's logging.  The default is
1310        C{"ssh.transport"} but it can be set to anything you want.
1311        (See the C{logging} module for more info.)  SSH Channels will log
1312        to a sub-channel of the one specified.
1313
1314        @param name: new channel name for logging
1315        @type name: str
1316
1317        @since: 1.1
1318        """
1319        self.log_name = name
1320        self.logger = util.get_logger(name)
1321        self.packetizer.set_log(self.logger)
1322
1323    def get_log_channel(self):
1324        """
1325        Return the channel name used for this transport's logging.
1326
1327        @return: channel name.
1328        @rtype: str
1329
1330        @since: 1.2
1331        """
1332        return self.log_name
1333
1334    def set_hexdump(self, hexdump):
1335        """
1336        Turn on/off logging a hex dump of protocol traffic at DEBUG level in
1337        the logs.  Normally you would want this off (which is the default),
1338        but if you are debugging something, it may be useful.
1339
1340        @param hexdump: C{True} to log protocol traffix (in hex) to the log;
1341            C{False} otherwise.
1342        @type hexdump: bool
1343        """
1344        self.packetizer.set_hexdump(hexdump)
1345
1346    def get_hexdump(self):
1347        """
1348        Return C{True} if the transport is currently logging hex dumps of
1349        protocol traffic.
1350
1351        @return: C{True} if hex dumps are being logged
1352        @rtype: bool
1353
1354        @since: 1.4
1355        """
1356        return self.packetizer.get_hexdump()
1357
1358    def use_compression(self, compress=True):
1359        """
1360        Turn on/off compression.  This will only have an affect before starting
1361        the transport (ie before calling L{connect}, etc).  By default,
1362        compression is off since it negatively affects interactive sessions.
1363
1364        @param compress: C{True} to ask the remote client/server to compress
1365            traffic; C{False} to refuse compression
1366        @type compress: bool
1367
1368        @since: 1.5.2
1369        """
1370        if compress:
1371            self._preferred_compression = ( 'zlib@openssh.com', 'zlib', 'none' )
1372        else:
1373            self._preferred_compression = ( 'none', )
1374
1375    def getpeername(self):
1376        """
1377        Return the address of the remote side of this Transport, if possible.
1378        This is effectively a wrapper around C{'getpeername'} on the underlying
1379        socket.  If the socket-like object has no C{'getpeername'} method,
1380        then C{("unknown", 0)} is returned.
1381
1382        @return: the address if the remote host, if known
1383        @rtype: tuple(str, int)
1384        """
1385        gp = getattr(self.sock, 'getpeername', None)
1386        if gp is None:
1387            return ('unknown', 0)
1388        return gp()
1389
1390    def stop_thread(self):
1391        self.active = False
1392        self.packetizer.close()
1393
1394
1395    ###  internals...
1396
1397
1398    def _log(self, level, msg, *args):
1399        if issubclass(type(msg), list):
1400            for m in msg:
1401                self.logger.log(level, m)
1402        else:
1403            self.logger.log(level, msg, *args)
1404
1405    def _get_modulus_pack(self):
1406        "used by KexGex to find primes for group exchange"
1407        return self._modulus_pack
1408
1409    def _next_channel(self):
1410        "you are holding the lock"
1411        chanid = self._channel_counter
1412        while self._channels.get(chanid) is not None:
1413            self._channel_counter = (self._channel_counter + 1) & 0xffffff
1414            chanid = self._channel_counter
1415        self._channel_counter = (self._channel_counter + 1) & 0xffffff
1416        return chanid
1417
1418    def _unlink_channel(self, chanid):
1419        "used by a Channel to remove itself from the active channel list"
1420        self._channels.delete(chanid)
1421
1422    def _send_message(self, data):
1423        self.packetizer.send_message(data)
1424
    def _send_user_message(self, data):
        """
        Send a message, but block if we're in key negotiation.  This is used
        for user-initiated requests.

        The method polls the C{clear_to_send} event in 0.1 second waits.  If
        the transport is found to be inactive, the message is dropped (with a
        DEBUG log entry) and the method returns without sending.

        @param data: the message to pass on to L{_send_message}

        @raise SSHException: if C{clear_to_send} is still not set once more
            than C{clear_to_send_timeout} seconds have passed since the call
            began
        """
        start = time.time()
        while True:
            self.clear_to_send.wait(0.1)
            if not self.active:
                self._log(DEBUG, 'Dropping user packet because connection is dead.')
                return
            # Re-check the event while holding clear_to_send_lock.  On
            # success we break out of the loop *still holding* the lock, so
            # the send below happens under it.
            self.clear_to_send_lock.acquire()
            if self.clear_to_send.isSet():
                break
            self.clear_to_send_lock.release()
            # not clear yet: give up once the timeout has been exceeded
            if time.time() > start + self.clear_to_send_timeout:
              raise SSHException('Key-exchange timed out waiting for key negotiation')
        try:
            self._send_message(data)
        finally:
            # pairs with the acquire() that preceded the break above
            self.clear_to_send_lock.release()
1446
1447    def _set_K_H(self, k, h):
1448        "used by a kex object to set the K (root key) and H (exchange hash)"
1449        self.K = k
1450        self.H = h
1451        if self.session_id == None:
1452            self.session_id = h
1453
1454    def _expect_packet(self, *ptypes):
1455        "used by a kex object to register the next packet type it expects to see"
1456        self._expected_packet = tuple(ptypes)
1457
1458    def _verify_key(self, host_key, sig):
1459        key = self._key_info[self.host_key_type](Message(host_key))
1460        if key is None:
1461            raise SSHException('Unknown host key type')
1462        if not key.verify_ssh_sig(self.H, Message(sig)):
1463            raise SSHException('Signature verification (%s) failed.' % self.host_key_type)
1464        self.host_key = key
1465
1466    def _compute_key(self, id, nbytes):
1467        "id is 'A' - 'F' for the various keys used by ssh"
1468        m = Message()
1469        m.add_mpint(self.K)
1470        m.add_bytes(self.H)
1471        m.add_byte(id)
1472        m.add_bytes(self.session_id)
1473        out = sofar = SHA.new(str(m)).digest()
1474        while len(out) < nbytes:
1475            m = Message()
1476            m.add_mpint(self.K)
1477            m.add_bytes(self.H)
1478            m.add_bytes(sofar)
1479            digest = SHA.new(str(m)).digest()
1480            out += digest
1481            sofar += digest
1482        return out[:nbytes]
1483
1484    def _get_cipher(self, name, key, iv):
1485        if name not in self._cipher_info:
1486            raise SSHException('Unknown client cipher ' + name)
1487        if name in ('arcfour128', 'arcfour256'):
1488            # arcfour cipher
1489            cipher = self._cipher_info[name]['class'].new(key)
1490            # as per RFC 4345, the first 1536 bytes of keystream
1491            # generated by the cipher MUST be discarded
1492            cipher.encrypt(" " * 1536)
1493            return cipher
1494        elif name.endswith("-ctr"):
1495            # CTR modes, we need a counter
1496            counter = Counter.new(nbits=self._cipher_info[name]['block-size'] * 8, initial_value=util.inflate_long(iv, True))
1497            return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv, counter)
1498        else:
1499            return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
1500
1501    def _set_forward_agent_handler(self, handler):
1502        if handler is None:
1503            def default_handler(channel):
1504                self._queue_incoming_channel(channel)
1505            self._forward_agent_handler = default_handler
1506        else:
1507            self._forward_agent_handler = handler
1508
1509    def _set_x11_handler(self, handler):
1510        # only called if a channel has turned on x11 forwarding
1511        if handler is None:
1512            # by default, use the same mechanism as accept()
1513            def default_handler(channel, (src_addr, src_port)):
1514                self._queue_incoming_channel(channel)
1515            self._x11_handler = default_handler
1516        else:
1517            self._x11_handler = handler
1518
1519    def _queue_incoming_channel(self, channel):
1520        self.lock.acquire()
1521        try:
1522            self.server_accepts.append(channel)
1523            self.server_accept_cv.notify()
1524        finally:
1525            self.lock.release()
1526
1527    def run(self):
1528        # (use the exposed "run" method, because if we specify a thread target
1529        # of a private method, threading.Thread will keep a reference to it
1530        # indefinitely, creating a GC cycle and not letting Transport ever be
1531        # GC'd. it's a bug in Thread.)
1532
1533        # Hold reference to 'sys' so we can test sys.modules to detect
1534        # interpreter shutdown.
1535        self.sys = sys
1536
1537        # Required to prevent RNG errors when running inside many subprocess
1538        # containers.
1539        Random.atfork()
1540
1541        # active=True occurs before the thread is launched, to avoid a race
1542        _active_threads.append(self)
1543        if self.server_mode:
1544            self._log(DEBUG, 'starting thread (server mode): %s' % hex(long(id(self)) & 0xffffffffL))
1545        else:
1546            self._log(DEBUG, 'starting thread (client mode): %s' % hex(long(id(self)) & 0xffffffffL))
1547        try:
1548            try:
1549                self.packetizer.write_all(self.local_version + '\r\n')
1550                self._check_banner()
1551                self._send_kex_init()
1552                self._expect_packet(MSG_KEXINIT)
1553
1554                while self.active:
1555                    if self.packetizer.need_rekey() and not self.in_kex:
1556                        self._send_kex_init()
1557                    try:
1558                        ptype, m = self.packetizer.read_message()
1559                    except NeedRekeyException:
1560                        continue
1561                    if ptype == MSG_IGNORE:
1562                        continue
1563                    elif ptype == MSG_DISCONNECT:
1564                        self._parse_disconnect(m)
1565                        self.active = False
1566                        self.packetizer.close()
1567                        break
1568                    elif ptype == MSG_DEBUG:
1569                        self._parse_debug(m)
1570                        continue
1571                    if len(self._expected_packet) > 0:
1572                        if ptype not in self._expected_packet:
1573                            raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype))
1574                        self._expected_packet = tuple()
1575                        if (ptype >= 30) and (ptype <= 39):
1576                            self.kex_engine.parse_next(ptype, m)
1577                            continue
1578
1579                    if ptype in self._handler_table:
1580                        self._handler_table[ptype](self, m)
1581                    elif ptype in self._channel_handler_table:
1582                        chanid = m.get_int()
1583                        chan = self._channels.get(chanid)
1584                        if chan is not None:
1585                            self._channel_handler_table[ptype](chan, m)
1586                        elif chanid in self.channels_seen:
1587                            self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
1588                        else:
1589                            self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
1590                            self.active = False
1591                            self.packetizer.close()
1592                    elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
1593                        self.auth_handler._handler_table[ptype](self.auth_handler, m)
1594                    else:
1595                        self._log(WARNING, 'Oops, unhandled type %d' % ptype)
1596                        msg = Message()
1597                        msg.add_byte(chr(MSG_UNIMPLEMENTED))
1598                        msg.add_int(m.seqno)
1599                        self._send_message(msg)
1600            except SSHException, e:
1601                self._log(ERROR, 'Exception: ' + str(e))
1602                self._log(ERROR, util.tb_strings())
1603                self.saved_exception = e
1604            except EOFError, e:
1605                self._log(DEBUG, 'EOF in transport thread')
1606                #self._log(DEBUG, util.tb_strings())
1607                self.saved_exception = e
1608            except socket.error, e:
1609                if type(e.args) is tuple:
1610                    emsg = '%s (%d)' % (e.args[1], e.args[0])
1611                else:
1612                    emsg = e.args
1613                self._log(ERROR, 'Socket exception: ' + emsg)
1614                self.saved_exception = e
1615            except Exception, e:
1616                self._log(ERROR, 'Unknown exception: ' + str(e))
1617                self._log(ERROR, util.tb_strings())
1618                self.saved_exception = e
1619            _active_threads.remove(self)
1620            for chan in self._channels.values():
1621                chan._unlink()
1622            if self.active:
1623                self.active = False
1624                self.packetizer.close()
1625                if self.completion_event != None:
1626                    self.completion_event.set()
1627                if self.auth_handler is not None:
1628                    self.auth_handler.abort()
1629                for event in self.channel_events.values():
1630                    event.set()
1631                try:
1632                    self.lock.acquire()
1633                    self.server_accept_cv.notify()
1634                finally:
1635                    self.lock.release()
1636            self.sock.close()
1637        except:
1638            # Don't raise spurious 'NoneType has no attribute X' errors when we
1639            # wake up during interpreter shutdown. Or rather -- raise
1640            # everything *if* sys.modules (used as a convenient sentinel)
1641            # appears to still exist.
1642            if self.sys.modules is not None:
1643                raise
1644
1645
1646    ###  protocol stages
1647
1648
1649    def _negotiate_keys(self, m):
1650        # throws SSHException on anything unusual
1651        self.clear_to_send_lock.acquire()
1652        try:
1653            self.clear_to_send.clear()
1654        finally:
1655            self.clear_to_send_lock.release()
1656        if self.local_kex_init == None:
1657            # remote side wants to renegotiate
1658            self._send_kex_init()
1659        self._parse_kex_init(m)
1660        self.kex_engine.start_kex()
1661
1662    def _check_banner(self):
1663        # this is slow, but we only have to do it once
1664        for i in range(100):
1665            # give them 15 seconds for the first line, then just 2 seconds
1666            # each additional line.  (some sites have very high latency.)
1667            if i == 0:
1668                timeout = self.banner_timeout
1669            else:
1670                timeout = 2
1671            try:
1672                buf = self.packetizer.readline(timeout)
1673            except Exception, x:
1674                raise SSHException('Error reading SSH protocol banner' + str(x))
1675            if buf[:4] == 'SSH-':
1676                break
1677            self._log(DEBUG, 'Banner: ' + buf)
1678        if buf[:4] != 'SSH-':
1679            raise SSHException('Indecipherable protocol version "' + buf + '"')
1680        # save this server version string for later
1681        self.remote_version = buf
1682        # pull off any attached comment
1683        comment = ''
1684        i = string.find(buf, ' ')
1685        if i >= 0:
1686            comment = buf[i+1:]
1687            buf = buf[:i]
1688        # parse out version string and make sure it matches
1689        segs = buf.split('-', 2)
1690        if len(segs) < 3:
1691            raise SSHException('Invalid SSH banner')
1692        version = segs[1]
1693        client = segs[2]
1694        if version != '1.99' and version != '2.0':
1695            raise SSHException('Incompatible version (%s instead of 2.0)' % (version,))
1696        self._log(INFO, 'Connected (version %s, client %s)' % (version, client))
1697
1698    def _send_kex_init(self):
1699        """
1700        announce to the other side that we'd like to negotiate keys, and what
1701        kind of key negotiation we support.
1702        """
1703        self.clear_to_send_lock.acquire()
1704        try:
1705            self.clear_to_send.clear()
1706        finally:
1707            self.clear_to_send_lock.release()
1708        self.in_kex = True
1709        if self.server_mode:
1710            if (self._modulus_pack is None) and ('diffie-hellman-group-exchange-sha1' in self._preferred_kex):
1711                # can't do group-exchange if we don't have a pack of potential primes
1712                pkex = list(self.get_security_options().kex)
1713                pkex.remove('diffie-hellman-group-exchange-sha1')
1714                self.get_security_options().kex = pkex
1715            available_server_keys = filter(self.server_key_dict.keys().__contains__,
1716                                           self._preferred_keys)
1717        else:
1718            available_server_keys = self._preferred_keys
1719
1720        m = Message()
1721        m.add_byte(chr(MSG_KEXINIT))
1722        m.add_bytes(rng.read(16))
1723        m.add_list(self._preferred_kex)
1724        m.add_list(available_server_keys)
1725        m.add_list(self._preferred_ciphers)
1726        m.add_list(self._preferred_ciphers)
1727        m.add_list(self._preferred_macs)
1728        m.add_list(self._preferred_macs)
1729        m.add_list(self._preferred_compression)
1730        m.add_list(self._preferred_compression)
1731        m.add_string('')
1732        m.add_string('')
1733        m.add_boolean(False)
1734        m.add_int(0)
1735        # save a copy for later (needed to compute a hash)
1736        self.local_kex_init = str(m)
1737        self._send_message(m)
1738
    def _parse_kex_init(self, m):
        """
        Parse the peer's KEXINIT message C{m} and settle on algorithms.

        Sets C{kex_engine}, C{host_key_type}, C{local_cipher} /
        C{remote_cipher}, C{local_mac} / C{remote_mac} and
        C{local_compression} / C{remote_compression}, and stores the parsed
        part of the message in C{remote_kex_init}.

        @raise SSHException: if any category has no algorithm in common, or
            (server mode) C{get_server_key()} returns C{None}.
        """
        # the fields are consumed strictly in message order; cookie and the
        # trailing int are read only to advance past them
        cookie = m.get_bytes(16)
        kex_algo_list = m.get_list()
        server_key_algo_list = m.get_list()
        client_encrypt_algo_list = m.get_list()
        server_encrypt_algo_list = m.get_list()
        client_mac_algo_list = m.get_list()
        server_mac_algo_list = m.get_list()
        client_compress_algo_list = m.get_list()
        server_compress_algo_list = m.get_list()
        client_lang_list = m.get_list()
        server_lang_list = m.get_list()
        kex_follows = m.get_boolean()
        unused = m.get_int()

        self._log(DEBUG, 'kex algos:' + str(kex_algo_list) + ' server key:' + str(server_key_algo_list) + \
                  ' client encrypt:' + str(client_encrypt_algo_list) + \
                  ' server encrypt:' + str(server_encrypt_algo_list) + \
                  ' client mac:' + str(client_mac_algo_list) + \
                  ' server mac:' + str(server_mac_algo_list) + \
                  ' client compress:' + str(client_compress_algo_list) + \
                  ' server compress:' + str(server_compress_algo_list) + \
                  ' client lang:' + str(client_lang_list) + \
                  ' server lang:' + str(server_lang_list) + \
                  ' kex follows?' + str(kex_follows))

        # as a server, we pick the first item in the client's list that we support.
        # as a client, we pick the first item in our list that the server supports.
        # (filter() keeps the order of its second argument, so that argument
        # is the list whose preference order wins.)
        if self.server_mode:
            agreed_kex = filter(self._preferred_kex.__contains__, kex_algo_list)
        else:
            agreed_kex = filter(kex_algo_list.__contains__, self._preferred_kex)
        if len(agreed_kex) == 0:
            raise SSHException('Incompatible ssh peer (no acceptable kex algorithm)')
        self.kex_engine = self._kex_info[agreed_kex[0]](self)

        # host key type: in server mode only types present in server_key_dict
        # are eligible
        if self.server_mode:
            available_server_keys = filter(self.server_key_dict.keys().__contains__,
                                           self._preferred_keys)
            agreed_keys = filter(available_server_keys.__contains__, server_key_algo_list)
        else:
            agreed_keys = filter(server_key_algo_list.__contains__, self._preferred_keys)
        if len(agreed_keys) == 0:
            raise SSHException('Incompatible ssh peer (no acceptable host key)')
        self.host_key_type = agreed_keys[0]
        if self.server_mode and (self.get_server_key() is None):
            raise SSHException('Incompatible ssh peer (can\'t match requested host key type)')

        # ciphers: as a server the "local" cipher is chosen from the peer's
        # server_* list and the "remote" one from its client_* list; as a
        # client it is the other way round
        if self.server_mode:
            agreed_local_ciphers = filter(self._preferred_ciphers.__contains__,
                                           server_encrypt_algo_list)
            agreed_remote_ciphers = filter(self._preferred_ciphers.__contains__,
                                          client_encrypt_algo_list)
        else:
            agreed_local_ciphers = filter(client_encrypt_algo_list.__contains__,
                                          self._preferred_ciphers)
            agreed_remote_ciphers = filter(server_encrypt_algo_list.__contains__,
                                           self._preferred_ciphers)
        if (len(agreed_local_ciphers) == 0) or (len(agreed_remote_ciphers) == 0):
            raise SSHException('Incompatible ssh server (no acceptable ciphers)')
        self.local_cipher = agreed_local_ciphers[0]
        self.remote_cipher = agreed_remote_ciphers[0]
        self._log(DEBUG, 'Ciphers agreed: local=%s, remote=%s' % (self.local_cipher, self.remote_cipher))

        # macs: same local/remote mapping as for the ciphers
        if self.server_mode:
            agreed_remote_macs = filter(self._preferred_macs.__contains__, client_mac_algo_list)
            agreed_local_macs = filter(self._preferred_macs.__contains__, server_mac_algo_list)
        else:
            agreed_local_macs = filter(client_mac_algo_list.__contains__, self._preferred_macs)
            agreed_remote_macs = filter(server_mac_algo_list.__contains__, self._preferred_macs)
        if (len(agreed_local_macs) == 0) or (len(agreed_remote_macs) == 0):
            raise SSHException('Incompatible ssh server (no acceptable macs)')
        self.local_mac = agreed_local_macs[0]
        self.remote_mac = agreed_remote_macs[0]

        # compression: same local/remote mapping again
        if self.server_mode:
            agreed_remote_compression = filter(self._preferred_compression.__contains__, client_compress_algo_list)
            agreed_local_compression = filter(self._preferred_compression.__contains__, server_compress_algo_list)
        else:
            agreed_local_compression = filter(client_compress_algo_list.__contains__, self._preferred_compression)
            agreed_remote_compression = filter(server_compress_algo_list.__contains__, self._preferred_compression)
        if (len(agreed_local_compression) == 0) or (len(agreed_remote_compression) == 0):
            raise SSHException('Incompatible ssh server (no acceptable compression) %r %r %r' % (agreed_local_compression, agreed_remote_compression, self._preferred_compression))
        self.local_compression = agreed_local_compression[0]
        self.remote_compression = agreed_remote_compression[0]

        self._log(DEBUG, 'using kex %s; server key type %s; cipher: local %s, remote %s; mac: local %s, remote %s; compression: local %s, remote %s' %
                  (agreed_kex[0], self.host_key_type, self.local_cipher, self.remote_cipher, self.local_mac,
                   self.remote_mac, self.local_compression, self.remote_compression))

        # save for computing hash later...
        # now wait!  openssh has a bug (and others might too) where there are
        # actually some extra bytes (one NUL byte in openssh's case) added to
        # the end of the packet but not parsed.  turns out we need to throw
        # away those bytes because they aren't part of the hash.
        self.remote_kex_init = chr(MSG_KEXINIT) + m.get_so_far()
1835
1836    def _activate_inbound(self):
1837        "switch on newly negotiated encryption parameters for inbound traffic"
1838        block_size = self._cipher_info[self.remote_cipher]['block-size']
1839        if self.server_mode:
1840            IV_in = self._compute_key('A', block_size)
1841            key_in = self._compute_key('C', self._cipher_info[self.remote_cipher]['key-size'])
1842        else:
1843            IV_in = self._compute_key('B', block_size)
1844            key_in = self._compute_key('D', self._cipher_info[self.remote_cipher]['key-size'])
1845        engine = self._get_cipher(self.remote_cipher, key_in, IV_in)
1846        mac_size = self._mac_info[self.remote_mac]['size']
1847        mac_engine = self._mac_info[self.remote_mac]['class']
1848        # initial mac keys are done in the hash's natural size (not the potentially truncated
1849        # transmission size)
1850        if self.server_mode:
1851            mac_key = self._compute_key('E', mac_engine.digest_size)
1852        else:
1853            mac_key = self._compute_key('F', mac_engine.digest_size)
1854        self.packetizer.set_inbound_cipher(engine, block_size, mac_engine, mac_size, mac_key)
1855        compress_in = self._compression_info[self.remote_compression][1]
1856        if (compress_in is not None) and ((self.remote_compression != 'zlib@openssh.com') or self.authenticated):
1857            self._log(DEBUG, 'Switching on inbound compression ...')
1858            self.packetizer.set_inbound_compressor(compress_in())
1859
1860    def _activate_outbound(self):
1861        "switch on newly negotiated encryption parameters for outbound traffic"
1862        m = Message()
1863        m.add_byte(chr(MSG_NEWKEYS))
1864        self._send_message(m)
1865        block_size = self._cipher_info[self.local_cipher]['block-size']
1866        if self.server_mode:
1867            IV_out = self._compute_key('B', block_size)
1868            key_out = self._compute_key('D', self._cipher_info[self.local_cipher]['key-size'])
1869        else:
1870            IV_out = self._compute_key('A', block_size)
1871            key_out = self._compute_key('C', self._cipher_info[self.local_cipher]['key-size'])
1872        engine = self._get_cipher(self.local_cipher, key_out, IV_out)
1873        mac_size = self._mac_info[self.local_mac]['size']
1874        mac_engine = self._mac_info[self.local_mac]['class']
1875        # initial mac keys are done in the hash's natural size (not the potentially truncated
1876        # transmission size)
1877        if self.server_mode:
1878            mac_key = self._compute_key('F', mac_engine.digest_size)
1879        else:
1880            mac_key = self._compute_key('E', mac_engine.digest_size)
1881        self.packetizer.set_outbound_cipher(engine, block_size, mac_engine, mac_size, mac_key)
1882        compress_out = self._compression_info[self.local_compression][0]
1883        if (compress_out is not None) and ((self.local_compression != 'zlib@openssh.com') or self.authenticated):
1884            self._log(DEBUG, 'Switching on outbound compression ...')
1885            self.packetizer.set_outbound_compressor(compress_out())
1886        if not self.packetizer.need_rekey():
1887            self.in_kex = False
1888        # we always expect to receive NEWKEYS now
1889        self._expect_packet(MSG_NEWKEYS)
1890
1891    def _auth_trigger(self):
1892        self.authenticated = True
1893        # delayed initiation of compression
1894        if self.local_compression == 'zlib@openssh.com':
1895            compress_out = self._compression_info[self.local_compression][0]
1896            self._log(DEBUG, 'Switching on outbound compression ...')
1897            self.packetizer.set_outbound_compressor(compress_out())
1898        if self.remote_compression == 'zlib@openssh.com':
1899            compress_in = self._compression_info[self.remote_compression][1]
1900            self._log(DEBUG, 'Switching on inbound compression ...')
1901            self.packetizer.set_inbound_compressor(compress_in())
1902
1903    def _parse_newkeys(self, m):
1904        self._log(DEBUG, 'Switch to new keys ...')
1905        self._activate_inbound()
1906        # can also free a bunch of stuff here
1907        self.local_kex_init = self.remote_kex_init = None
1908        self.K = None
1909        self.kex_engine = None
1910        if self.server_mode and (self.auth_handler is None):
1911            # create auth handler for server mode
1912            self.auth_handler = AuthHandler(self)
1913        if not self.initial_kex_done:
1914            # this was the first key exchange
1915            self.initial_kex_done = True
1916        # send an event?
1917        if self.completion_event != None:
1918            self.completion_event.set()
1919        # it's now okay to send data again (if this was a re-key)
1920        if not self.packetizer.need_rekey():
1921            self.in_kex = False
1922        self.clear_to_send_lock.acquire()
1923        try:
1924            self.clear_to_send.set()
1925        finally:
1926            self.clear_to_send_lock.release()
1927        return
1928
1929    def _parse_disconnect(self, m):
1930        code = m.get_int()
1931        desc = m.get_string()
1932        self._log(INFO, 'Disconnect (code %d): %s' % (code, desc))
1933
1934    def _parse_global_request(self, m):
1935        kind = m.get_string()
1936        self._log(DEBUG, 'Received global request "%s"' % kind)
1937        want_reply = m.get_boolean()
1938        if not self.server_mode:
1939            self._log(DEBUG, 'Rejecting "%s" global request from server.' % kind)
1940            ok = False
1941        elif kind == 'tcpip-forward':
1942            address = m.get_string()
1943            port = m.get_int()
1944            ok = self.server_object.check_port_forward_request(address, port)
1945            if ok != False:
1946                ok = (ok,)
1947        elif kind == 'cancel-tcpip-forward':
1948            address = m.get_string()
1949            port = m.get_int()
1950            self.server_object.cancel_port_forward_request(address, port)
1951            ok = True
1952        else:
1953            ok = self.server_object.check_global_request(kind, m)
1954        extra = ()
1955        if type(ok) is tuple:
1956            extra = ok
1957            ok = True
1958        if want_reply:
1959            msg = Message()
1960            if ok:
1961                msg.add_byte(chr(MSG_REQUEST_SUCCESS))
1962                msg.add(*extra)
1963            else:
1964                msg.add_byte(chr(MSG_REQUEST_FAILURE))
1965            self._send_message(msg)
1966
1967    def _parse_request_success(self, m):
1968        self._log(DEBUG, 'Global request successful.')
1969        self.global_response = m
1970        if self.completion_event is not None:
1971            self.completion_event.set()
1972
1973    def _parse_request_failure(self, m):
1974        self._log(DEBUG, 'Global request denied.')
1975        self.global_response = None
1976        if self.completion_event is not None:
1977            self.completion_event.set()
1978
1979    def _parse_channel_open_success(self, m):
1980        chanid = m.get_int()
1981        server_chanid = m.get_int()
1982        server_window_size = m.get_int()
1983        server_max_packet_size = m.get_int()
1984        chan = self._channels.get(chanid)
1985        if chan is None:
1986            self._log(WARNING, 'Success for unrequested channel! [??]')
1987            return
1988        self.lock.acquire()
1989        try:
1990            chan._set_remote_channel(server_chanid, server_window_size, server_max_packet_size)
1991            self._log(INFO, 'Secsh channel %d opened.' % chanid)
1992            if chanid in self.channel_events:
1993                self.channel_events[chanid].set()
1994                del self.channel_events[chanid]
1995        finally:
1996            self.lock.release()
1997        return
1998
1999    def _parse_channel_open_failure(self, m):
2000        chanid = m.get_int()
2001        reason = m.get_int()
2002        reason_str = m.get_string()
2003        lang = m.get_string()
2004        reason_text = CONNECTION_FAILED_CODE.get(reason, '(unknown code)')
2005        self._log(INFO, 'Secsh channel %d open FAILED: %s: %s' % (chanid, reason_str, reason_text))
2006        self.lock.acquire()
2007        try:
2008            self.saved_exception = ChannelException(reason, reason_text)
2009            if chanid in self.channel_events:
2010                self._channels.delete(chanid)
2011                if chanid in self.channel_events:
2012                    self.channel_events[chanid].set()
2013                    del self.channel_events[chanid]
2014        finally:
2015            self.lock.release()
2016        return
2017
2018    def _parse_channel_open(self, m):
2019        kind = m.get_string()
2020        chanid = m.get_int()
2021        initial_window_size = m.get_int()
2022        max_packet_size = m.get_int()
2023        reject = False
2024        if (kind == 'auth-agent@openssh.com') and (self._forward_agent_handler is not None):
2025            self._log(DEBUG, 'Incoming forward agent connection')
2026            self.lock.acquire()
2027            try:
2028                my_chanid = self._next_channel()
2029            finally:
2030                self.lock.release()
2031        elif (kind == 'x11') and (self._x11_handler is not None):
2032            origin_addr = m.get_string()
2033            origin_port = m.get_int()
2034            self._log(DEBUG, 'Incoming x11 connection from %s:%d' % (origin_addr, origin_port))
2035            self.lock.acquire()
2036            try:
2037                my_chanid = self._next_channel()
2038            finally:
2039                self.lock.release()
2040        elif (kind == 'forwarded-tcpip') and (self._tcp_handler is not None):
2041            server_addr = m.get_string()
2042            server_port = m.get_int()
2043            origin_addr = m.get_string()
2044            origin_port = m.get_int()
2045            self._log(DEBUG, 'Incoming tcp forwarded connection from %s:%d' % (origin_addr, origin_port))
2046            self.lock.acquire()
2047            try:
2048                my_chanid = self._next_channel()
2049            finally:
2050                self.lock.release()
2051        elif not self.server_mode:
2052            self._log(DEBUG, 'Rejecting "%s" channel request from server.' % kind)
2053            reject = True
2054            reason = OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
2055        else:
2056            self.lock.acquire()
2057            try:
2058                my_chanid = self._next_channel()
2059            finally:
2060                self.lock.release()
2061            if kind == 'direct-tcpip':
2062                # handle direct-tcpip requests comming from the client
2063                dest_addr = m.get_string()
2064                dest_port = m.get_int()
2065                origin_addr = m.get_string()
2066                origin_port = m.get_int()
2067                reason = self.server_object.check_channel_direct_tcpip_request(
2068                                my_chanid, (origin_addr, origin_port),
2069                                           (dest_addr, dest_port))
2070            else:
2071                reason = self.server_object.check_channel_request(kind, my_chanid)
2072            if reason != OPEN_SUCCEEDED:
2073                self._log(DEBUG, 'Rejecting "%s" channel request from client.' % kind)
2074                reject = True
2075        if reject:
2076            msg = Message()
2077            msg.add_byte(chr(MSG_CHANNEL_OPEN_FAILURE))
2078            msg.add_int(chanid)
2079            msg.add_int(reason)
2080            msg.add_string('')
2081            msg.add_string('en')
2082            self._send_message(msg)
2083            return
2084
2085        chan = Channel(my_chanid)
2086        self.lock.acquire()
2087        try:
2088            self._channels.put(my_chanid, chan)
2089            self.channels_seen[my_chanid] = True
2090            chan._set_transport(self)
2091            chan._set_window(self.window_size, self.max_packet_size)
2092            chan._set_remote_channel(chanid, initial_window_size, max_packet_size)
2093        finally:
2094            self.lock.release()
2095        m = Message()
2096        m.add_byte(chr(MSG_CHANNEL_OPEN_SUCCESS))
2097        m.add_int(chanid)
2098        m.add_int(my_chanid)
2099        m.add_int(self.window_size)
2100        m.add_int(self.max_packet_size)
2101        self._send_message(m)
2102        self._log(INFO, 'Secsh channel %d (%s) opened.', my_chanid, kind)
2103        if kind == 'auth-agent@openssh.com':
2104            self._forward_agent_handler(chan)
2105        elif kind == 'x11':
2106            self._x11_handler(chan, (origin_addr, origin_port))
2107        elif kind == 'forwarded-tcpip':
2108            chan.origin_addr = (origin_addr, origin_port)
2109            self._tcp_handler(chan, (origin_addr, origin_port), (server_addr, server_port))
2110        else:
2111            self._queue_incoming_channel(chan)
2112
2113    def _parse_debug(self, m):
2114        always_display = m.get_boolean()
2115        msg = m.get_string()
2116        lang = m.get_string()
2117        self._log(DEBUG, 'Debug msg: ' + util.safe_string(msg))
2118
2119    def _get_subsystem_handler(self, name):
2120        try:
2121            self.lock.acquire()
2122            if name not in self.subsystem_table:
2123                return (None, [], {})
2124            return self.subsystem_table[name]
2125        finally:
2126            self.lock.release()
2127
    # Dispatch table for transport-level messages: message type -> handler
    # function defined in this class.  run() looks the packet type up here
    # first and calls the handler as handler(self, m).
    _handler_table = {
        MSG_NEWKEYS: _parse_newkeys,
        MSG_GLOBAL_REQUEST: _parse_global_request,
        MSG_REQUEST_SUCCESS: _parse_request_success,
        MSG_REQUEST_FAILURE: _parse_request_failure,
        MSG_CHANNEL_OPEN_SUCCESS: _parse_channel_open_success,
        MSG_CHANNEL_OPEN_FAILURE: _parse_channel_open_failure,
        MSG_CHANNEL_OPEN: _parse_channel_open,
        MSG_KEXINIT: _negotiate_keys,
        }
2138
    # Dispatch table for per-channel messages: message type -> Channel
    # method.  For these types run() reads the channel id from the message,
    # looks the channel up and calls the handler as handler(chan, m).
    _channel_handler_table = {
        MSG_CHANNEL_SUCCESS: Channel._request_success,
        MSG_CHANNEL_FAILURE: Channel._request_failed,
        MSG_CHANNEL_DATA: Channel._feed,
        MSG_CHANNEL_EXTENDED_DATA: Channel._feed_extended,
        MSG_CHANNEL_WINDOW_ADJUST: Channel._window_adjust,
        MSG_CHANNEL_REQUEST: Channel._handle_request,
        MSG_CHANNEL_EOF: Channel._handle_eof,
        MSG_CHANNEL_CLOSE: Channel._handle_close,
        }
2149