1# Copyright (c) 2016-2020 by Ron Frederick <ronf@timeheart.net> and others.
2#
3# This program and the accompanying materials are made available under
4# the terms of the Eclipse Public License v2.0 which accompanies this
5# distribution and is available at:
6#
7#     http://www.eclipse.org/legal/epl-2.0/
8#
9# This program may also be made available under the following secondary
10# licenses when the conditions for such availability set forth in the
11# Eclipse Public License v2.0 are satisfied:
12#
13#    GNU General Public License, Version 2.0, or any later versions of
14#    that license
15#
16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
17#
18# Contributors:
19#     Ron Frederick - initial implementation, API, and documentation
20
21"""Unit tests for AsyncSSH channel API"""
22
23import asyncio
24import os
25import tempfile
26from signal import SIGINT
27
28from unittest.mock import patch
29
30import asyncssh
31
32from asyncssh.constants import DEFAULT_LANG, MSG_USERAUTH_REQUEST
33from asyncssh.constants import MSG_CHANNEL_OPEN_CONFIRMATION
34from asyncssh.constants import MSG_CHANNEL_OPEN_FAILURE
35from asyncssh.constants import MSG_CHANNEL_WINDOW_ADJUST
36from asyncssh.constants import MSG_CHANNEL_DATA
37from asyncssh.constants import MSG_CHANNEL_EXTENDED_DATA
38from asyncssh.constants import MSG_CHANNEL_EOF, MSG_CHANNEL_CLOSE
39from asyncssh.constants import MSG_CHANNEL_SUCCESS
40from asyncssh.packet import Byte, String, UInt32
41from asyncssh.public_key import CERT_TYPE_USER
42from asyncssh.stream import SSHTCPStreamSession, SSHUNIXStreamSession
43
44from .server import Server, ServerTestCase
45from .util import asynctest, echo, make_certificate
46
47PTY_OP_PARTIAL = 158
48PTY_OP_NO_END = 159
49
class _ClientChannel(asyncssh.SSHClientChannel):
    """SSH client channel which can corrupt requests for unit testing"""

    def _send_request(self, request, *args, want_reply=False):
        """Send a channel request, rewriting selected test requests

           An ``env`` request whose second argument is the encoded
           string ``'invalid'`` has that argument replaced by the
           encoded byte ``0xff``. A ``pty-req`` request whose sixth
           argument carries one of the special test opcodes six bytes
           from its end is re-encoded from a shortened slice of itself.

        """

        if request == b'env':
            if args[1] == String('invalid'):
                args = (args[0], String(b'\xff'))
        elif request == b'pty-req':
            # Presumably the encoded terminal modes, with a 4-byte length
            # prefix that the slices below strip off -- TODO confirm
            modes = args[5]
            opcode = modes[-6:-5]

            if opcode == Byte(PTY_OP_PARTIAL):
                args = args[:5] + (String(modes[4:-5]),)
            elif opcode == Byte(PTY_OP_NO_END):
                args = args[:5] + (String(modes[4:-6]),)

        super()._send_request(request, *args, want_reply=want_reply)

    def send_request(self, request, *args):
        """Send a custom request without asking for a reply"""

        self._send_request(request, *args)

    async def make_request(self, request, *args):
        """Make a custom request and return its result"""

        result = await self._make_request(request, *args)
        return result
75
76
class _ClientSession(asyncssh.SSHClientSession):
    """SSH client session which records what it receives"""

    def __init__(self):
        self._chan = None

        # Received data is collected per data type: None for regular
        # data and EXTENDED_DATA_STDERR for stderr data
        self.recv_buf = {}
        self.recv_buf[None] = []
        self.recv_buf[asyncssh.EXTENDED_DATA_STDERR] = []

        self.exc = None
        self.exit_status = None
        self.exit_signal_msg = None
        self.xon_xoff = None

    def connection_made(self, chan):
        """Remember the channel this session was opened on"""

        self._chan = chan

    def connection_lost(self, exc):
        """Record the close exception (if any) and forget the channel"""

        self._chan = None
        self.exc = exc

    def data_received(self, data, datatype):
        """Append incoming data to the buffer for its data type"""

        bucket = self.recv_buf[datatype]
        bucket.append(data)

    def xon_xoff_requested(self, client_can_do):
        """Record the most recent XON/XOFF flow control request"""

        self.xon_xoff = client_can_do

    def exit_status_received(self, status):
        """Record the exit status reported by the remote side"""

        self.exit_status = status

    def exit_signal_received(self, signal, core_dumped, msg, lang):
        """Record the message sent with a remote exit signal"""

        # pylint: disable=unused-argument

        self.exit_signal_msg = msg
123
124
async def _create_session(conn, command=None, *, subsystem=None, **kwargs):
    """Open a session on conn using the unit test client session class"""

    result = await conn.create_session(_ClientSession, command,
                                       subsystem=subsystem, **kwargs)

    return result
130
131
class _ServerChannel(asyncssh.SSHServerChannel):
    """SSH server channel which can corrupt requests for unit testing"""

    def _send_request(self, request, *args, want_reply=False):
        """Send a channel request, rewriting invalid exit-signal fields

           For ``exit-signal`` requests, a first or fourth argument equal
           to the encoded string ``'invalid'`` is replaced by the encoded
           byte ``0xff``.

        """

        if request == b'exit-signal':
            marker = String('invalid')
            garbage = String(b'\xff')

            if args[0] == marker:
                args = (garbage,) + args[1:]

            if args[3] == marker:
                args = args[:3] + (garbage,)

        super()._send_request(request, *args, want_reply=want_reply)

    def _process_delayed_request(self, packet):
        """Handle a request by reporting success 0.1 seconds later"""

        packet.check_end()

        loop = asyncio.get_event_loop()
        loop.call_later(0.1, self._report_response, True)

    async def open_session(self):
        """Try to open a channel of type 'session' toward the client"""

        result = await self._open(b'session')
        return result
158
159
class _EchoServerSession(asyncssh.SSHServerSession):
    """Shell session which copies received data to stdout and stderr"""

    def __init__(self):
        self._chan = None

    def connection_made(self, chan):
        """Store the channel and apply any username-specific behavior"""

        self._chan = chan

        username = chan.get_extra_info('username')

        if username == 'task_error':
            raise RuntimeError('Exception handler test')

        if username == 'close':
            chan.close()

    def shell_requested(self):
        """Accept every shell request"""

        return True

    def data_received(self, data, datatype):
        """Echo data back using each of the channel's write methods"""

        chan = self._chan
        first, rest = data[:1], data[1:]

        # The first item goes through write(), the remainder through
        # writelines(), once for stdout and once for stderr
        chan.write(first)
        chan.writelines([rest])
        chan.write_stderr(first)
        chan.writelines_stderr([rest])

    def eof_received(self):
        """Answer EOF by sending EOF and closing the channel"""

        chan = self._chan

        chan.write_eof()
        chan.close()
196
197
class _PTYServerSession(asyncssh.SSHServerSession):
    """Server session which reports the PTY arguments it was given"""

    def __init__(self):
        self._chan = None
        self._pty_ok = True

    def connection_made(self, chan):
        """Store the channel, refusing PTYs for the 'no_pty' user"""

        self._chan = chan

        if chan.get_extra_info('username') == 'no_pty':
            self._pty_ok = False

    def pty_requested(self, term_type, term_size, term_modes):
        """Save the requested PTY settings and return whether to allow it"""

        ospeed = term_modes.get(asyncssh.PTY_OP_OSPEED)

        self._chan.set_extra_info(pty_args=(term_type, term_size, ospeed))

        return self._pty_ok

    def shell_requested(self):
        """Accept every shell request"""

        return True

    def session_started(self):
        """Write the saved PTY arguments to the client and close"""

        pty_args = self._chan.get_extra_info("pty_args")

        self._chan.write(f'Req: {pty_args}\n')
        self._chan.close()
237
238
class _ChannelServer(Server):
    """Server for testing the AsyncSSH channel API

       How a session behaves is chosen either by the name of the user
       that connected (see session_requested) or, for ordinary users,
       by the command or subsystem the client requested (see
       _begin_session).

    """

    async def _begin_session(self, stdin, stdout, stderr):
        """Begin processing a new session

           The requested command (or, failing that, the subsystem name)
           selects one of the test actions below. Sessions which request
           neither are treated as 'echo', and unrecognized actions exit
           with status 255. Whatever the action, the channel is closed
           at the end and this coroutine waits for the close to finish.

        """

        # pylint: disable=too-many-statements

        # A command takes precedence over a subsystem name
        action = stdin.channel.get_command() or stdin.channel.get_subsystem()
        if not action:
            action = 'echo'

        if action == 'echo':
            await echo(stdin, stdout, stderr)

        # Actions which wait for one character of input before acting
        elif action == 'conn_close':
            await stdin.read(1)
            stdout.write('\n')
            self._conn.close()
        elif action == 'close':
            await stdin.read(1)
            stdout.write('\n')

        # Agent forwarding: write the number of keys the agent returns,
        # or exit with status 1 when the agent can't be reached
        elif action == 'agent':
            try:
                async with asyncssh.connect_agent(self._conn) as agent:
                    stdout.write(str(len((await agent.get_keys()))) + '\n')
            except (OSError, asyncssh.ChannelOpenError):
                stdout.channel.exit(1)
        elif action == 'agent_sock':
            agent_path = stdin.channel.get_agent_path()

            if agent_path:
                async with asyncssh.connect_agent(agent_path) as agent:
                    await asyncio.sleep(0.1)
                    stdout.write(str(len((await agent.get_keys()))) + '\n')
            else:
                stdout.channel.exit(1)
        elif action == 'rejected_agent':
            # Report whether an agent path was set up before trying to
            # open an agent channel directly
            agent_path = stdin.channel.get_agent_path()
            stdout.write(str(bool(agent_path)) + '\n')

            chan = self._conn.create_agent_channel()

            try:
                await chan.open(SSHUNIXStreamSession)
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)

        # Server-initiated channel opens: each of these exits with
        # status 1 if the open raises ChannelOpenError
        elif action == 'rejected_session':
            chan = _ServerChannel(self._conn, asyncio.get_event_loop(), False,
                                  False, 0, 1024, None, 'strict', 1, 32768)

            try:
                await chan.open_session()
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'rejected_tcpip_direct':
            chan = self._conn.create_tcp_channel()

            try:
                await chan.connect(SSHTCPStreamSession, '', 0, '', 0)
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'unknown_tcpip_listener':
            chan = self._conn.create_tcp_channel()

            try:
                await chan.accept(SSHTCPStreamSession, 'xxx', 0, '', 0)
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'invalid_tcpip_listener':
            chan = self._conn.create_tcp_channel()

            # The listener host here is a byte string which isn't
            # valid UTF-8
            try:
                await chan.accept(SSHTCPStreamSession, b'\xff', 0, '', 0)
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'rejected_unix_direct':
            chan = self._conn.create_unix_channel()

            try:
                await chan.connect(SSHUNIXStreamSession, '')
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'unknown_unix_listener':
            chan = self._conn.create_unix_channel()

            try:
                await chan.accept(SSHUNIXStreamSession, 'xxx')
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)
        elif action == 'invalid_unix_listener':
            chan = self._conn.create_unix_channel()

            # The listener path here is a byte string which isn't
            # valid UTF-8
            try:
                await chan.accept(SSHUNIXStreamSession, b'\xff')
            except asyncssh.ChannelOpenError:
                stdout.channel.exit(1)

        # Messages which are out of place once a session is running
        elif action == 'late_auth_banner':
            try:
                self._conn.send_auth_banner('auth banner')
            except OSError:
                stdin.channel.exit(1)
        elif action == 'invalid_open_confirm':
            stdin.channel.send_packet(MSG_CHANNEL_OPEN_CONFIRMATION,
                                      UInt32(0), UInt32(0), UInt32(0))
        elif action == 'invalid_open_failure':
            stdin.channel.send_packet(MSG_CHANNEL_OPEN_FAILURE,
                                      UInt32(0), String(''), String(''))

        # Report session settings requested by the client
        elif action == 'env':
            value = stdin.channel.get_environment().get('TEST', '')
            stdout.write(value + '\n')
        elif action == 'term':
            chan = stdin.channel
            info = str((chan.get_terminal_type(), chan.get_terminal_size(),
                        chan.get_terminal_mode(asyncssh.PTY_OP_OSPEED)))
            stdout.write(info + '\n')
        elif action == 'xon_xoff':
            stdin.channel.set_xon_xoff(True)
        elif action == 'no_xon_xoff':
            stdin.channel.set_xon_xoff(False)

        # Turn whichever exception interrupts the read into an 'ABRT'
        # exit signal whose message identifies what was received
        elif action == 'signals':
            try:
                await stdin.readline()
            except asyncssh.BreakReceived as exc:
                stdin.channel.exit_with_signal('ABRT', False, str(exc.msec))
            except asyncssh.SignalReceived as exc:
                stdin.channel.exit_with_signal('ABRT', False, exc.signal)
            except asyncssh.TerminalSizeChanged as exc:
                size = (exc.width, exc.height, exc.pixwidth, exc.pixheight)
                stdin.channel.exit_with_signal('ABRT', False, str(size))

        # Exit status and exit signal reporting, including attempts
        # made after the channel has already been closed
        elif action == 'exit_status':
            stdin.channel.exit(1)
        elif action == 'closed_status':
            stdin.channel.close()
            stdin.channel.exit(1)
        elif action == 'exit_signal':
            stdin.channel.exit_with_signal('INT', False, 'exit_signal')
        elif action == 'unknown_signal':
            stdin.channel.exit_with_signal('unknown', False, 'unknown_signal')
        elif action == 'closed_signal':
            stdin.channel.close()
            stdin.channel.exit_with_signal('INT', False, 'closed_signal')
        elif action == 'invalid_exit_signal':
            stdin.channel.exit_with_signal('invalid')
        elif action == 'invalid_exit_lang':
            stdin.channel.exit_with_signal('INT', False, '', 'invalid')

        # Hand-built channel packets sent in unusual orders or with
        # unusual payloads
        elif action == 'window_after_close':
            stdin.channel.send_packet(MSG_CHANNEL_CLOSE)
            stdin.channel.send_packet(MSG_CHANNEL_WINDOW_ADJUST, UInt32(0))
        elif action == 'empty_data':
            stdin.channel.send_packet(MSG_CHANNEL_DATA, String(''))
        elif action == 'partial_unicode':
            # Two 2-byte UTF-8 characters, split so that the second one
            # straddles the two data packets
            data = '\xff\xff'.encode('utf-8')
            stdin.channel.send_packet(MSG_CHANNEL_DATA, String(data[:3]))
            stdin.channel.send_packet(MSG_CHANNEL_DATA, String(data[3:]))
        elif action == 'partial_unicode_at_eof':
            # Same as above, but the final byte is never sent
            data = '\xff\xff'.encode('utf-8')
            stdin.channel.send_packet(MSG_CHANNEL_DATA, String(data[:3]))
        elif action == 'unicode_error':
            stdin.channel.send_packet(MSG_CHANNEL_DATA, String(b'\xff'))
        elif action == 'data_past_window':
            stdin.channel.send_packet(MSG_CHANNEL_DATA,
                                      String(2*1025*1024*'\0'))
        elif action == 'data_after_eof':
            stdin.channel.send_packet(MSG_CHANNEL_EOF)
            stdout.write('xxx')
        elif action == 'data_after_close':
            # NOTE(review): no close is sent from here; presumably the
            # delay gives the client time to close its side before the
            # data is written -- confirm against the test for this action
            await asyncio.sleep(0.1)
            stdout.write('xxx')
        elif action == 'ext_data_after_eof':
            stdin.channel.send_packet(MSG_CHANNEL_EOF)
            stdin.channel.write_stderr('xxx')
        elif action == 'invalid_datatype':
            stdin.channel.send_packet(MSG_CHANNEL_EXTENDED_DATA,
                                      UInt32(255), String(''))
        elif action == 'double_eof':
            # A raw EOF packet followed by a regular write_eof() call
            await asyncio.sleep(0.1)
            stdin.channel.send_packet(MSG_CHANNEL_EOF)
            stdin.channel.write_eof()
        elif action == 'double_close':
            # A raw CLOSE packet here, followed by the regular close()
            # call at the end of this method
            await asyncio.sleep(0.1)
            stdout.write('xxx')
            stdin.channel.send_packet(MSG_CHANNEL_CLOSE)
        elif action == 'request_after_close':
            stdin.channel.send_packet(MSG_CHANNEL_CLOSE)
            stdin.channel.exit(1)
        elif action == 'unexpected_auth':
            self._conn.send_packet(MSG_USERAUTH_REQUEST, String('guest'),
                                   String('ssh-connection'), String('none'))
        elif action == 'invalid_response':
            stdin.channel.send_packet(MSG_CHANNEL_SUCCESS)
        else:
            stdin.channel.exit(255)

        # Common cleanup for every action
        stdin.channel.close()
        await stdin.channel.wait_closed()

    async def _conn_close(self):
        """Close the connection during a channel open

           The connection is closed first and an echo session is only
           returned 0.1 seconds afterward.

        """

        self._conn.close()
        await asyncio.sleep(0.1)
        return _EchoServerSession()

    def begin_auth(self, username):
        """Handle client authentication request

           Returns False for the usernames listed below and True for
           any other username.

        """

        # NOTE(review): presumably a False result means the user needs
        # no authentication -- confirm against the SSHServer API docs
        return username not in {'guest', 'conn_close_startup',
                                'conn_close_open', 'close', 'echo',
                                'no_channels', 'no_pty', 'request_pty',
                                'task_error'}

    def session_requested(self):
        """Handle a request to create a new session

           The username on the connection selects the result: False to
           refuse the session, or a tuple of a new server channel and
           the session object, coroutine object, or handler to use
           with it.

        """

        username = self._conn.get_extra_info('username')

        # Presumably the patch makes create_server_channel() build a
        # _ServerChannel rather than the stock class -- TODO confirm
        with patch('asyncssh.connection.SSHServerChannel', _ServerChannel):
            channel = self._conn.create_server_channel()

            if username == 'conn_close_startup':
                self._conn.close()
                return False
            elif username == 'conn_close_open':
                return (channel, self._conn_close())
            elif username in {'close', 'echo', 'task_error'}:
                return (channel, _EchoServerSession())
            elif username in {'request_pty', 'no_pty'}:
                return (channel, _PTYServerSession())
            elif username != 'no_channels':
                return (channel, self._begin_session)
            else:
                return False
471
472
473class _TestChannel(ServerTestCase):
474    """Unit tests for AsyncSSH channel API"""
475
476    # pylint: disable=too-many-public-methods
477
478    @classmethod
479    async def start_server(cls):
480        """Start an SSH server for the tests to use"""
481
482        return (await cls.create_server(
483            _ChannelServer, authorized_client_keys='authorized_keys'))
484
485    async def _check_action(self, command, expected_result):
486        """Run a command on a remote session and check for a specific result"""
487
488        async with self.connect() as conn:
489            chan, session = await _create_session(conn, command)
490
491            await chan.wait_closed()
492
493            self.assertEqual(session.exit_status, expected_result)
494
495    async def _check_session(self, conn, command=(), *,
496                             large_block=False, **kwargs):
497        """Open a session and test if an input line is echoed back"""
498
499        chan, session = await _create_session(conn, command, **kwargs)
500
501        if large_block:
502            data = 4 * [1025*1024*'\0']
503        else:
504            data = [str(id(self))]
505
506        chan.writelines(data)
507
508        self.assertTrue(chan.can_write_eof())
509        self.assertFalse(chan.is_closing())
510        chan.write_eof()
511        self.assertTrue(chan.is_closing())
512
513        await chan.wait_closed()
514
515        data = ''.join(data)
516
517        for buf in session.recv_buf.values():
518            self.assertEqual(data, ''.join(buf))
519
520        chan.close()
521
522    @asynctest
523    async def test_shell(self):
524        """Test starting a shell"""
525
526        async with self.connect(username='echo') as conn:
527            await self._check_session(conn)
528
529    @asynctest
530    async def test_shell_failure(self):
531        """Test failure to start a shell"""
532
533        async with self.connect(username='no_channels') as conn:
534            with self.assertRaises(asyncssh.ChannelOpenError):
535                await _create_session(conn)
536
537    @asynctest
538    async def test_shell_internal_error(self):
539        """Test internal error in callback to start a shell"""
540
541        async with self.connect(username='task_error') as conn:
542            with self.assertRaises((OSError, asyncssh.ConnectionLost)):
543                await _create_session(conn)
544
545    @asynctest
546    async def test_shell_large_block(self):
547        """Test starting a shell and sending a large block of data"""
548
549        async with self.connect(username='echo') as conn:
550            await self._check_session(conn, large_block=True)
551
552    @asynctest
553    async def test_exec(self):
554        """Test execution of a remote command"""
555
556        async with self.connect() as conn:
557            await self._check_session(conn, 'echo', window=1024*1024,
558                                      max_pktsize=16384)
559
560    @asynctest
561    async def test_exec_from_connect(self):
562        """Test execution of a remote command set on connection"""
563
564        async with self.connect(command='echo') as conn:
565            await self._check_session(conn)
566
567    @asynctest
568    async def test_forced_exec(self):
569        """Test execution of a forced remote command"""
570
571        ckey = asyncssh.read_private_key('ckey')
572        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
573                                CERT_TYPE_USER, ckey, ckey, ['ckey'],
574                                options={'force-command': String('echo')})
575
576        async with self.connect(username='ckey', client_keys=[(ckey, cert)],
577                                agent_path=None) as conn:
578            await self._check_session(conn)
579
580    @asynctest
581    async def test_invalid_exec(self):
582        """Test execution of an invalid remote command"""
583
584        async with self.connect() as conn:
585            with self.assertRaises(asyncssh.ChannelOpenError):
586                await _create_session(conn, b'\xff')
587
588    @asynctest
589    async def test_exec_failure(self):
590        """Test failure to execute a remote command"""
591
592        async with self.connect(username='no_channels') as conn:
593            with self.assertRaises(asyncssh.ChannelOpenError):
594                await _create_session(conn, 'echo')
595
596    @asynctest
597    async def test_subsystem(self):
598        """Test starting a subsystem"""
599
600        async with self.connect() as conn:
601            await self._check_session(conn, subsystem='echo')
602
603    @asynctest
604    async def test_invalid_subsystem(self):
605        """Test starting an invalid subsystem"""
606
607        async with self.connect() as conn:
608            with self.assertRaises(asyncssh.ChannelOpenError):
609                await _create_session(conn, subsystem=b'\xff')
610
611    @asynctest
612    async def test_subsystem_failure(self):
613        """Test failure to start a subsystem"""
614
615        async with self.connect(username='no_channels') as conn:
616            with self.assertRaises(asyncssh.ChannelOpenError):
617                await _create_session(conn, subsystem='echo')
618
619    @asynctest
620    async def test_conn_close_during_startup(self):
621        """Test connection close during channel startup"""
622
623        async with self.connect(username='conn_close_startup') as conn:
624            with self.assertRaises(asyncssh.ChannelOpenError):
625                await _create_session(conn)
626
627    @asynctest
628    async def test_conn_close_during_open(self):
629        """Test connection close during channel open"""
630
631        async with self.connect(username='conn_close_open') as conn:
632            with self.assertRaises(asyncssh.ChannelOpenError):
633                await _create_session(conn)
634
635    @asynctest
636    async def test_close_during_startup(self):
637        """Test channel close during startup"""
638
639        async with self.connect(username='close') as conn:
640            with self.assertRaises(asyncssh.ChannelOpenError):
641                await _create_session(conn)
642
643    @asynctest
644    async def test_inbound_conn_close_while_read_paused(self):
645        """Test inbound connection close while reading is paused"""
646
647        async with self.connect() as conn:
648            chan, _ = await _create_session(conn, 'conn_close')
649
650            chan.pause_reading()
651            chan.write('\n')
652            await asyncio.sleep(0.1)
653            conn.close()
654
655            await chan.wait_closed()
656
657    @asynctest
658    async def test_outbound_conn_close_while_read_paused(self):
659        """Test outbound connection close while reading is paused"""
660
661        async with self.connect() as conn:
662            chan, _ = await _create_session(conn, 'close')
663
664            chan.pause_reading()
665            chan.write('\n')
666            await asyncio.sleep(0.1)
667            conn.close()
668
669            await chan.wait_closed()
670
671    @asynctest
672    async def test_close_while_read_paused(self):
673        """Test closing a remotely closed channel while reading is paused"""
674
675        async with self.connect() as conn:
676            chan, _ = await _create_session(conn, 'close')
677
678            chan.pause_reading()
679            chan.write('\n')
680            await asyncio.sleep(0.1)
681            chan.close()
682
683            await chan.wait_closed()
684
685    @asynctest
686    async def test_keepalive(self):
687        """Test keepalive channel requests"""
688
689        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
690            async with self.connect() as conn:
691                chan, _ = await _create_session(conn)
692
693                result = await chan.make_request(b'keepalive@openssh.com')
694                self.assertTrue(result)
695
696    @asynctest
697    async def test_invalid_open_confirmation(self):
698        """Test receiving an open confirmation on already open channel"""
699
700        async with self.connect() as conn:
701            chan, _ = await _create_session(conn, 'invalid_open_confirm')
702
703            await chan.wait_closed()
704
705    @asynctest
706    async def test_invalid_open_failure(self):
707        """Test receiving an open failure on already open channel"""
708
709        async with self.connect() as conn:
710            chan, _ = await _create_session(conn, 'invalid_open_failure')
711
712            await chan.wait_closed()
713
714    @asynctest
715    async def test_unknown_channel_request(self):
716        """Test sending unknown channel request"""
717
718        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
719            async with self.connect() as conn:
720                chan, _ = await _create_session(conn)
721
722                self.assertFalse((await chan.make_request('unknown')))
723
724    @asynctest
725    async def test_invalid_channel_request(self):
726        """Test sending non-ASCII channel request"""
727
728        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
729            async with self.connect() as conn:
730                chan, _ = await _create_session(conn)
731
732                with self.assertRaises(asyncssh.ProtocolError):
733                    await chan.make_request('\xff')
734
735    @asynctest
736    async def test_delayed_channel_request(self):
737        """Test queuing channel requests with delayed response"""
738
739        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
740            async with self.connect() as conn:
741                chan, _ = await _create_session(conn)
742
743                chan.send_request(b'delayed')
744                chan.send_request(b'delayed')
745
746    @asynctest
747    async def test_invalid_channel_response(self):
748        """Test receiving response for non-existent channel request"""
749
750        async with self.connect() as conn:
751            chan, _ = await _create_session(conn, 'invalid_response')
752
753            chan.close()
754
755    @asynctest
756    async def test_already_open(self):
757        """Test connect on an already open channel"""
758
759        async with self.connect() as conn:
760            chan, _ = await _create_session(conn)
761
762            with self.assertRaises(OSError):
763                await chan.create(None, None, None, {}, False, None, None,
764                                  None, False, None, None, False, False)
765
766            chan.close()
767
768    @asynctest
769    async def test_write_buffer(self):
770        """Test setting write buffer limits"""
771
772        async with self.connect() as conn:
773            chan, _ = await _create_session(conn)
774
775            chan.set_write_buffer_limits()
776            chan.set_write_buffer_limits(low=8192)
777            chan.set_write_buffer_limits(high=32768)
778            chan.set_write_buffer_limits(32768, 8192)
779
780            with self.assertRaises(ValueError):
781                chan.set_write_buffer_limits(8192, 32768)
782
783            self.assertEqual(chan.get_write_buffer_size(), 0)
784
785            chan.close()
786
787    @asynctest
788    async def test_empty_write(self):
789        """Test writing an empty block of data"""
790
791        async with self.connect() as conn:
792            chan, _ = await _create_session(conn)
793            chan.write('')
794            chan.close()
795
796    @asynctest
797    async def test_invalid_write_extended(self):
798        """Test writing using an invalid extended data type"""
799
800        async with self.connect() as conn:
801            chan, _ = await _create_session(conn)
802
803            with self.assertRaises(OSError):
804                chan.write('test', -1)
805
806    @asynctest
807    async def test_unneeded_resume_reading(self):
808        """Test resume reading when not paused"""
809
810        async with self.connect() as conn:
811            chan, _ = await _create_session(conn)
812            await asyncio.sleep(0.1)
813            chan.resume_reading()
814            chan.close()
815
816    @asynctest
817    async def test_agent_forwarding(self):
818        """Test SSH agent forwarding"""
819
820        if not self.agent_available(): # pragma: no cover
821            self.skipTest('ssh-agent not available')
822
823        async with self.connect(username='ckey',
824                                agent_forwarding=True) as conn:
825            chan, session = await _create_session(conn, 'agent')
826
827            await chan.wait_closed()
828
829            result = ''.join(session.recv_buf[None])
830            self.assertEqual(result, '3\n')
831
832            chan, session = await _create_session(conn, 'agent')
833
834            await chan.wait_closed()
835
836            result = ''.join(session.recv_buf[None])
837            self.assertEqual(result, '3\n')
838
839    @asynctest
840    async def test_agent_forwarding_sock(self):
841        """Test SSH agent forwarding via UNIX domain socket"""
842
843        if not self.agent_available(): # pragma: no cover
844            self.skipTest('ssh-agent not available')
845
846        async with self.connect(username='ckey',
847                                agent_forwarding=True) as conn:
848            chan, session = await _create_session(conn, 'agent_sock')
849
850            await chan.wait_closed()
851
852            result = ''.join(session.recv_buf[None])
853            self.assertEqual(result, '3\n')
854
855    @asynctest
856    async def test_rejected_session(self):
857        """Test receiving inbound session request"""
858
859        await self._check_action('rejected_session', 1)
860
861    @asynctest
862    async def test_rejected_tcpip_direct(self):
863        """Test receiving inbound direct TCP/IP connection"""
864
865        await self._check_action('rejected_tcpip_direct', 1)
866
867    @asynctest
868    async def test_unknown_tcpip_listener(self):
869        """Test receiving connection on unknown TCP/IP listener"""
870
871        await self._check_action('unknown_tcpip_listener', 1)
872
873    @asynctest
874    async def test_invalid_tcpip_listener(self):
875        """Test receiving connection on invalid TCP/IP listener path"""
876
877        await self._check_action('invalid_tcpip_listener', None)
878
879    @asynctest
880    async def test_rejected_unix_direct(self):
881        """Test receiving inbound direct UNIX connection"""
882
883        await self._check_action('rejected_unix_direct', 1)
884
885    @asynctest
886    async def test_unknown_unix_listener(self):
887        """Test receiving connection on unknown UNIX listener"""
888
889        await self._check_action('unknown_unix_listener', 1)
890
891    @asynctest
892    async def test_invalid_unix_listener(self):
893        """Test receiving connection on invalid UNIX listener path"""
894
895        await self._check_action('invalid_unix_listener', None)
896
897    @asynctest
898    async def test_agent_forwarding_failure(self):
899        """Test failure of SSH agent forwarding"""
900
901        ckey = asyncssh.read_private_key('ckey')
902        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
903                                CERT_TYPE_USER, ckey, ckey, ['ckey'],
904                                extensions={'no-agent-forwarding': ''})
905
906        async with self.connect(username='ckey', client_keys=[(ckey, cert)],
907                                agent_path=None, agent_forwarding=True) as conn:
908            chan, session = await _create_session(conn, 'agent')
909
910            await chan.wait_closed()
911
912            self.assertEqual(session.exit_status, 1)
913
914    @asynctest
915    async def test_agent_forwarding_sock_failure(self):
916        """Test failure to create SSH agent forwarding socket"""
917
918        old_tempdir = tempfile.tempdir
919
920        try:
921            tempfile.tempdir = 'xxx'
922
923            async with self.connect(username='ckey',
924                                    agent_forwarding=True) as conn:
925                chan, session = await _create_session(conn, 'agent_sock')
926
927                await chan.wait_closed()
928
929                self.assertEqual(session.exit_status, 1)
930        finally:
931            tempfile.tempdir = old_tempdir
932
933    @asynctest
934    async def test_agent_forwarding_not_offered(self):
935        """Test SSH agent forwarding not offered by client"""
936
937        async with self.connect() as conn:
938            chan, session = await _create_session(conn, 'agent')
939
940            await chan.wait_closed()
941
942            self.assertEqual(session.exit_status, 1)
943
944    @asynctest
945    async def test_agent_forwarding_rejected(self):
946        """Test rejection of SSH agent forwarding by client"""
947
948        async with self.connect() as conn:
949            chan, session = await _create_session(conn, 'rejected_agent')
950
951            await chan.wait_closed()
952
953            result = ''.join(session.recv_buf[None])
954            self.assertEqual(result, 'False\n')
955
956            self.assertEqual(session.exit_status, 1)
957
958    @asynctest
959    async def test_request_pty(self):
960        """Test reuquesting a PTY with terminal information"""
961
962        modes = {asyncssh.PTY_OP_OSPEED: 9600}
963
964        async with self.connect(username='request_pty') as conn:
965            chan, session = await _create_session(conn, term_type='ansi',
966                                                  term_size=(80, 24),
967                                                  term_modes=modes)
968
969            await chan.wait_closed()
970
971            result = ''.join(session.recv_buf[None])
972            self.assertEqual(result, "Req: ('ansi', (80, 24, 0, 0), 9600)\r\n")
973
974    @asynctest
975    async def test_terminal_full_size(self):
976        """Test sending terminal information with full size"""
977
978        modes = {asyncssh.PTY_OP_OSPEED: 9600}
979
980        async with self.connect() as conn:
981            chan, session = await _create_session(conn, 'term',
982                                                  term_type='ansi',
983                                                  term_size=(80, 24, 480, 240),
984                                                  term_modes=modes)
985
986            await chan.wait_closed()
987
988            result = ''.join(session.recv_buf[None])
989            self.assertEqual(result, "('ansi', (80, 24, 480, 240), 9600)\r\n")
990
991    @asynctest
992    async def test_pty_without_term_type(self):
993        """Test requesting a PTY without setting the terminal type"""
994
995        async with self.connect() as conn:
996            chan, session = await _create_session(conn, 'term',
997                                                  request_pty='force')
998
999            await chan.wait_closed()
1000
1001            result = ''.join(session.recv_buf[None])
1002            self.assertEqual(result, "('', (0, 0, 0, 0), None)\n")
1003
1004    @asynctest
1005    async def test_invalid_terminal_size(self):
1006        """Test sending invalid terminal size"""
1007
1008        async with self.connect() as conn:
1009            with self.assertRaises(ValueError):
1010                await _create_session(conn, 'term', term_type='ansi',
1011                                      term_size=(0, 0, 0))
1012
1013    @asynctest
1014    async def test_invalid_terminal_modes(self):
1015        """Test sending invalid terminal modes"""
1016
1017        modes = {asyncssh.PTY_OP_RESERVED: 0}
1018
1019        async with self.connect() as conn:
1020            with self.assertRaises(ValueError):
1021                await _create_session(conn, 'term', term_type='ansi',
1022                                      term_modes=modes)
1023
1024    @asynctest
1025    async def test_pty_disallowed_by_cert(self):
1026        """Test rejection of pty request by certificate"""
1027
1028        ckey = asyncssh.read_private_key('ckey')
1029        cert = make_certificate('ssh-rsa-cert-v01@openssh.com',
1030                                CERT_TYPE_USER, ckey, ckey, ['ckey'],
1031                                extensions={'no-pty': ''})
1032
1033        async with self.connect(username='ckey', client_keys=[(ckey, cert)],
1034                                agent_path=None) as conn:
1035            with self.assertRaises(asyncssh.ChannelOpenError):
1036                await _create_session(conn, 'term', term_type='ansi')
1037
1038    @asynctest
1039    async def test_pty_disallowed_by_session(self):
1040        """Test rejection of pty request by session"""
1041
1042        async with self.connect(username='no_pty') as conn:
1043            with self.assertRaises(asyncssh.ChannelOpenError):
1044                await _create_session(conn, term_type='ansi')
1045
1046    @asynctest
1047    async def test_invalid_term_type(self):
1048        """Test requesting an invalid terminal type"""
1049
1050        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
1051            async with self.connect() as conn:
1052                with self.assertRaises(asyncssh.ProtocolError):
1053                    await _create_session(conn, term_type=b'\xff')
1054
1055    @asynctest
1056    async def test_term_modes_missing_end(self):
1057        """Test sending terminal modes without PTY_OP_END"""
1058
1059        modes = {asyncssh.PTY_OP_OSPEED: 9600, PTY_OP_NO_END: 0}
1060
1061        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
1062            async with self.connect() as conn:
1063                chan, session = await _create_session(conn, 'term',
1064                                                      term_type='ansi',
1065                                                      term_modes=modes)
1066
1067                await chan.wait_closed()
1068
1069                result = ''.join(session.recv_buf[None])
1070                self.assertEqual(result, "('ansi', (0, 0, 0, 0), 9600)\r\n")
1071
1072    @asynctest
1073    async def test_term_modes_incomplete(self):
1074        """Test sending terminal modes with incomplete value"""
1075
1076        modes = {asyncssh.PTY_OP_OSPEED: 9600, PTY_OP_PARTIAL: 0}
1077
1078        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
1079            async with self.connect() as conn:
1080                with self.assertRaises(asyncssh.ProtocolError):
1081                    await _create_session(conn, 'term', term_type='ansi',
1082                                          term_modes=modes)
1083
1084    @asynctest
1085    async def test_env(self):
1086        """Test setting environment"""
1087
1088        async with self.connect() as conn:
1089            chan, session = await _create_session(conn, 'env',
1090                                                  env={'TEST': 'test'})
1091
1092            await chan.wait_closed()
1093
1094            result = ''.join(session.recv_buf[None])
1095            self.assertEqual(result, 'test\n')
1096
1097    @asynctest
1098    async def test_env_from_connect(self):
1099        """Test setting environment on connection"""
1100
1101        async with self.connect(env={'TEST': 'test'}) as conn:
1102            chan, session = await _create_session(conn, 'env')
1103
1104            await chan.wait_closed()
1105
1106            result = ''.join(session.recv_buf[None])
1107            self.assertEqual(result, 'test\n')
1108
1109    @asynctest
1110    async def test_env_list(self):
1111        """Test setting environment using a list of name=value strings"""
1112
1113        async with self.connect() as conn:
1114            chan, session = await _create_session(conn, 'env',
1115                                                  env=['TEST=test'])
1116
1117            await chan.wait_closed()
1118
1119            result = ''.join(session.recv_buf[None])
1120            self.assertEqual(result, 'test\n')
1121
1122    @asynctest
1123    async def test_invalid_env_list(self):
1124        """Test setting environment using an invalid string"""
1125
1126        with self.assertRaises(ValueError):
1127            async with self.connect() as conn:
1128                await _create_session(conn, 'env', env=['XXX'])
1129
1130    @asynctest
1131    async def test_send_env(self):
1132        """Test sending local environment"""
1133
1134        async with self.connect() as conn:
1135            try:
1136                os.environ['TEST'] = 'test'
1137                chan, session = await _create_session(conn, 'env',
1138                                                      send_env=['TEST'])
1139            finally:
1140                del os.environ['TEST']
1141
1142            await chan.wait_closed()
1143
1144            result = ''.join(session.recv_buf[None])
1145            self.assertEqual(result, 'test\n')
1146
1147    @asynctest
1148    async def test_send_env_from_connect(self):
1149        """Test sending local environment on connection"""
1150
1151        try:
1152            os.environ['TEST'] = 'test'
1153
1154            async with self.connect(send_env=['TEST']) as conn:
1155                chan, session = await _create_session(conn, 'env')
1156
1157                await chan.wait_closed()
1158
1159                result = ''.join(session.recv_buf[None])
1160                self.assertEqual(result, 'test\n')
1161        finally:
1162            del os.environ['TEST']
1163
1164    @asynctest
1165    async def test_mixed_env(self):
1166        """Test sending a mix of local environment and new values"""
1167
1168        async with self.connect() as conn:
1169            try:
1170                os.environ['TEST'] = '1'
1171                chan, session = await _create_session(conn, 'env',
1172                                                      env={'TEST': 2},
1173                                                      send_env='TEST')
1174            finally:
1175                del os.environ['TEST']
1176
1177            await chan.wait_closed()
1178
1179            result = ''.join(session.recv_buf[None])
1180            self.assertEqual(result, '2\n')
1181
1182    @asynctest
1183    async def test_invalid_env(self):
1184        """Test sending invalid environment"""
1185
1186        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
1187            async with self.connect() as conn:
1188                chan, session = await _create_session(
1189                    conn, 'env', env={'TEST': 'invalid'})
1190
1191                await chan.wait_closed()
1192
1193                result = ''.join(session.recv_buf[None])
1194                self.assertEqual(result, '\n')
1195
1196    @asynctest
1197    async def test_xon_xoff_enable(self):
1198        """Test enabling XON/XOFF flow control"""
1199
1200        async with self.connect() as conn:
1201            chan, session = await _create_session(conn, 'xon_xoff')
1202
1203            await chan.wait_closed()
1204            self.assertEqual(session.xon_xoff, True)
1205
1206    @asynctest
1207    async def test_xon_xoff_disable(self):
1208        """Test disabling XON/XOFF flow control"""
1209
1210        async with self.connect() as conn:
1211            chan, session = await _create_session(conn, 'no_xon_xoff')
1212
1213            await chan.wait_closed()
1214            self.assertEqual(session.xon_xoff, False)
1215
1216    @asynctest
1217    async def test_break(self):
1218        """Test sending a break"""
1219
1220        async with self.connect() as conn:
1221            chan, session = await _create_session(conn, 'signals')
1222
1223            chan.send_break(1000)
1224            await chan.wait_closed()
1225            self.assertEqual(session.exit_signal_msg, '1000')
1226
1227    @asynctest
1228    async def test_signal(self):
1229        """Test sending a signal"""
1230
1231        async with self.connect() as conn:
1232            chan, session = await _create_session(conn, 'signals')
1233
1234            chan.send_signal('INT')
1235            await chan.wait_closed()
1236            self.assertEqual(session.exit_signal_msg, 'INT')
1237
1238    @asynctest
1239    async def test_numeric_signal(self):
1240        """Test sending a signal using a numeric value"""
1241
1242        async with self.connect() as conn:
1243            chan, session = await _create_session(conn, 'signals')
1244
1245            chan.send_signal(SIGINT)
1246            await chan.wait_closed()
1247            self.assertEqual(session.exit_signal_msg, 'INT')
1248
1249    @asynctest
1250    async def test_unknown_signal(self):
1251        """Test sending a signal with an unknown numeric value"""
1252
1253        async with self.connect() as conn:
1254            chan, _ = await _create_session(conn, 'signals')
1255
1256            with self.assertRaises(ValueError):
1257                chan.send_signal(123)
1258
1259            chan.close()
1260
1261    @asynctest
1262    async def test_terminate(self):
1263        """Test sending a terminate signal"""
1264
1265        async with self.connect() as conn:
1266            chan, session = await _create_session(conn, 'signals')
1267
1268            chan.terminate()
1269            await chan.wait_closed()
1270            self.assertEqual(session.exit_signal_msg, 'TERM')
1271
1272    @asynctest
1273    async def test_kill(self):
1274        """Test sending a kill signal"""
1275
1276        async with self.connect() as conn:
1277            chan, session = await _create_session(conn, 'signals')
1278
1279            chan.kill()
1280            await chan.wait_closed()
1281            self.assertEqual(session.exit_signal_msg, 'KILL')
1282
1283    @asynctest
1284    async def test_invalid_signal(self):
1285        """Test sending an invalid signal"""
1286
1287        with patch('asyncssh.connection.SSHClientChannel', _ClientChannel):
1288            async with self.connect() as conn:
1289                chan, session = await _create_session(conn, 'signals')
1290
1291                chan.send_signal(b'\xff')
1292                chan.write('\n')
1293                await chan.wait_closed()
1294                self.assertEqual(session.exit_status, None)
1295
1296    @asynctest
1297    async def test_terminal_size_change(self):
1298        """Test sending terminal size change"""
1299
1300        async with self.connect() as conn:
1301            chan, session = await _create_session(conn, 'signals',
1302                                                  term_type='ansi')
1303
1304            chan.change_terminal_size(80, 24)
1305            await chan.wait_closed()
1306            self.assertEqual(session.exit_signal_msg, '(80, 24, 0, 0)')
1307
1308    @asynctest
1309    async def test_full_terminal_size_change(self):
1310        """Test sending full terminal size change"""
1311
1312        async with self.connect() as conn:
1313            chan, session = await _create_session(conn, 'signals',
1314                                                  term_type='ansi')
1315
1316            chan.change_terminal_size(80, 24, 480, 240)
1317            await chan.wait_closed()
1318            self.assertEqual(session.exit_signal_msg, '(80, 24, 480, 240)')
1319
1320    @asynctest
1321    async def test_exit_status(self):
1322        """Test receiving exit status"""
1323
1324        async with self.connect() as conn:
1325            chan, session = await _create_session(conn, 'exit_status')
1326
1327            await chan.wait_closed()
1328            self.assertEqual(session.exit_status, 1)
1329            self.assertEqual(chan.get_exit_status(), 1)
1330            self.assertIsNone(chan.get_exit_signal())
1331            self.assertEqual(chan.get_returncode(), 1)
1332
1333    @asynctest
1334    async def test_exit_status_after_close(self):
1335        """Test delivery of exit status after remote close"""
1336
1337        async with self.connect() as conn:
1338            chan, session = await _create_session(conn, 'closed_status')
1339
1340            await chan.wait_closed()
1341            self.assertIsNone(session.exit_status)
1342            self.assertIsNone(chan.get_exit_status())
1343            self.assertIsNone(chan.get_exit_signal())
1344            self.assertIsNone(chan.get_returncode())
1345
1346    @asynctest
1347    async def test_exit_signal(self):
1348        """Test receiving exit signal"""
1349
1350        async with self.connect() as conn:
1351            chan, session = await _create_session(conn, 'exit_signal')
1352
1353            await chan.wait_closed()
1354            self.assertEqual(session.exit_signal_msg, 'exit_signal')
1355            self.assertEqual(chan.get_exit_status(), -1)
1356            self.assertEqual(chan.get_exit_signal(), ('INT', False,
1357                                                      'exit_signal',
1358                                                      DEFAULT_LANG))
1359            self.assertEqual(chan.get_returncode(), -SIGINT)
1360
1361    @asynctest
1362    async def test_exit_signal_after_close(self):
1363        """Test delivery of exit signal after remote close"""
1364
1365        async with self.connect() as conn:
1366            chan, session = await _create_session(conn, 'closed_signal')
1367
1368            await chan.wait_closed()
1369            self.assertIsNone(session.exit_signal_msg)
1370            self.assertIsNone(chan.get_exit_status())
1371            self.assertIsNone(chan.get_exit_signal())
1372            self.assertIsNone(chan.get_returncode())
1373
1374    @asynctest
1375    async def test_unknown_exit_signal(self):
1376        """Test receiving unknown exit signal"""
1377
1378        async with self.connect() as conn:
1379            chan, session = await _create_session(conn, 'unknown_signal')
1380
1381            await chan.wait_closed()
1382            self.assertEqual(session.exit_signal_msg, 'unknown_signal')
1383            self.assertEqual(chan.get_exit_status(), -1)
1384            self.assertEqual(chan.get_exit_signal(), ('unknown', False,
1385                                                      'unknown_signal',
1386                                                      DEFAULT_LANG))
1387            self.assertEqual(chan.get_returncode(), -99)
1388
1389    @asynctest
1390    async def test_invalid_exit_signal(self):
1391        """Test delivery of invalid exit signal"""
1392
1393        async with self.connect() as conn:
1394            chan, _ = await _create_session(conn, 'invalid_exit_signal')
1395
1396            await chan.wait_closed()
1397
1398    @asynctest
1399    async def test_invalid_exit_lang(self):
1400        """Test delivery of invalid exit signal language"""
1401
1402        async with self.connect() as conn:
1403            chan, _ = await _create_session(conn, 'invalid_exit_lang')
1404
1405            await chan.wait_closed()
1406
1407    @asynctest
1408    async def test_window_adjust_after_eof(self):
1409        """Test receiving window adjust after EOF"""
1410
1411        async with self.connect() as conn:
1412            chan, _ = await _create_session(conn, 'window_after_close')
1413
1414            await chan.wait_closed()
1415
1416    @asynctest
1417    async def test_empty_data(self):
1418        """Test receiving empty data packet"""
1419
1420        async with self.connect() as conn:
1421            chan, _ = await _create_session(conn, 'empty_data')
1422
1423            chan.close()
1424
1425    @asynctest
1426    async def test_partial_unicode(self):
1427        """Test receiving Unicode data spread across two packets"""
1428
1429        async with self.connect() as conn:
1430            chan, session = await _create_session(conn, 'partial_unicode')
1431
1432            await chan.wait_closed()
1433
1434            result = ''.join(session.recv_buf[None])
1435            self.assertEqual(result, '\xff\xff')
1436
1437    @asynctest
1438    async def test_partial_unicode_at_eof(self):
1439        """Test receiving partial Unicode data and then EOF"""
1440
1441        async with self.connect() as conn:
1442            chan, session = await _create_session(
1443                conn, 'partial_unicode_at_eof')
1444
1445            await chan.wait_closed()
1446            self.assertIsInstance(session.exc, asyncssh.ProtocolError)
1447
1448    @asynctest
1449    async def test_unicode_error(self):
1450        """Test receiving bad Unicode data"""
1451
1452        async with self.connect() as conn:
1453            chan, session = await _create_session(conn, 'unicode_error')
1454
1455            await chan.wait_closed()
1456            self.assertIsInstance(session.exc, asyncssh.ProtocolError)
1457
1458    @asynctest
1459    async def test_data_past_window(self):
1460        """Test receiving a data packet past the advertised window"""
1461
1462        async with self.connect() as conn:
1463            chan, _ = await _create_session(conn, 'data_past_window')
1464
1465            await chan.wait_closed()
1466
1467    @asynctest
1468    async def test_data_after_eof(self):
1469        """Test receiving data after EOF"""
1470
1471        async with self.connect() as conn:
1472            chan, _ = await _create_session(conn, 'data_after_eof')
1473
1474            await chan.wait_closed()
1475
1476    @asynctest
1477    async def test_data_after_close(self):
1478        """Test receiving data after close"""
1479
1480        async with self.connect() as conn:
1481            chan, _ = await _create_session(conn, 'data_after_close')
1482
1483            chan.write(4*1025*1024*'\0')
1484            chan.close()
1485            await asyncio.sleep(0.2)
1486            await chan.wait_closed()
1487
1488    @asynctest
1489    async def test_extended_data_after_eof(self):
1490        """Test receiving extended data after EOF"""
1491
1492        async with self.connect() as conn:
1493            chan, _ = await _create_session(conn, 'ext_data_after_eof')
1494
1495            await chan.wait_closed()
1496
1497    @asynctest
1498    async def test_invalid_datatype(self):
1499        """Test receiving data with invalid data type"""
1500
1501        async with self.connect() as conn:
1502            chan, _ = await _create_session(conn, 'invalid_datatype')
1503
1504            await chan.wait_closed()
1505
1506    @asynctest
1507    async def test_double_eof(self):
1508        """Test receiving two EOF messages"""
1509
1510        async with self.connect() as conn:
1511            chan, _ = await _create_session(conn, 'double_eof')
1512
1513            await chan.wait_closed()
1514
1515    @asynctest
1516    async def test_double_close(self):
1517        """Test receiving two close messages"""
1518
1519        async with self.connect() as conn:
1520            chan, _ = await _create_session(conn, 'double_close')
1521            chan.pause_reading()
1522            await asyncio.sleep(0.2)
1523            chan.resume_reading()
1524
1525            await chan.wait_closed()
1526
1527    @asynctest
1528    async def test_request_after_close(self):
1529        """Test receiving a channel request after a close"""
1530
1531        async with self.connect() as conn:
1532            chan, _ = await _create_session(conn, 'request_after_close')
1533
1534            await chan.wait_closed()
1535
1536    @asynctest
1537    async def test_late_auth_banner(self):
1538        """Test server sending authentication banner after auth completes"""
1539
1540        async with self.connect() as conn:
1541            chan, session = await _create_session(conn, 'late_auth_banner')
1542
1543            await chan.wait_closed()
1544            self.assertEqual(session.exit_status, 1)
1545
1546    @asynctest
1547    async def test_unexpected_userauth_request(self):
1548        """Test userauth request sent to client"""
1549
1550        async with self.connect() as conn:
1551            chan, _ = await _create_session(conn, 'unexpected_auth')
1552
1553            await chan.wait_closed()
1554
1555    @asynctest
1556    async def test_unknown_action(self):
1557        """Test unknown action"""
1558
1559        async with self.connect() as conn:
1560            chan, session = await _create_session(conn, 'unknown')
1561
1562            await chan.wait_closed()
1563            self.assertEqual(session.exit_status, 255)
1564
1565
class _TestChannelNoPTY(ServerTestCase):
    """Unit tests for AsyncSSH channel module with PTYs disallowed"""

    @classmethod
    async def start_server(cls):
        """Start an SSH server with allow_pty=False for the tests to use"""

        server = await cls.create_server(
            _ChannelServer, authorized_client_keys='authorized_keys',
            allow_pty=False)

        return server

    @asynctest
    async def test_shell_pty(self):
        """Test starting a shell that requests a PTY"""

        async with self.connect() as client:
            # Setting a terminal type on a shell must fail the open.
            with self.assertRaises(asyncssh.ChannelOpenError):
                await client.run(term_type='ansi')

    @asynctest
    async def test_shell_no_pty(self):
        """Test starting a shell that doesn't request a PTY"""

        async with self.connect() as client:
            # No assertion: run() just has to complete without raising.
            await client.run(request_pty=False, stdin=asyncssh.DEVNULL)

    @asynctest
    async def test_exec_pty(self):
        """Test executing a remote command that requests a PTY"""

        async with self.connect() as client:
            with self.assertRaises(asyncssh.ChannelOpenError):
                await client.run('echo', request_pty='force')

    @asynctest
    async def test_exec_pty_from_connect(self):
        """Test executing a command with a PTY requested on the connection"""

        # request_pty is given to connect() rather than to run().
        async with self.connect(request_pty='force') as client:
            with self.assertRaises(asyncssh.ChannelOpenError):
                await client.run('echo')

    @asynctest
    async def test_exec_no_pty(self):
        """Test executing a remote command that doesn't request a PTY"""

        run_options = {'term_type': 'ansi', 'request_pty': 'auto',
                       'stdin': asyncssh.DEVNULL}

        async with self.connect() as client:
            # No assertion: with request_pty='auto', running a command
            # is expected to succeed even though term_type is set.
            await client.run('echo', **run_options)
1615
1616
class _TestChannelNoAgentForwarding(ServerTestCase):
    """Unit tests for channel module with agent forwarding disallowed"""

    @classmethod
    async def start_server(cls):
        """Start an SSH server with agent_forwarding=False"""

        server = await cls.create_server(
            _ChannelServer, authorized_client_keys='authorized_keys',
            agent_forwarding=False)

        return server

    @asynctest
    async def test_agent_forwarding_disallowed(self):
        """Test the 'agent' command when the server disallows forwarding"""

        # The client asks for agent forwarding, but the server for this
        # class was created with agent_forwarding=False.
        async with self.connect(agent_forwarding=True) as client:
            completed = await client.run('agent')

        # Checked after the connection has been closed.
        self.assertEqual(completed.exit_status, 1)
1636