1 // Copyright 2016 Pierre-Étienne Meunier
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 use std;
17 use std::net::ToSocketAddrs;
18 use std::sync::Arc;
19 
20 use futures::future::Future;
21 use thrussh_keys::key;
22 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
23 use tokio::net::TcpListener;
24 use tokio::pin;
25 
26 use crate::session::*;
27 use crate::ssh_read::*;
28 use crate::sshbuffer::*;
29 use crate::*;
30 
31 mod kex;
32 mod session;
33 pub use self::kex::*;
34 pub use self::session::*;
35 mod encrypted;
36 
37 #[derive(Debug)]
38 /// Configuration of a server.
39 pub struct Config {
40     /// The server ID string sent at the beginning of the protocol.
41     pub server_id: String,
42     /// Authentication methods proposed to the client.
43     pub methods: auth::MethodSet,
44     /// The authentication banner, usually a warning message shown to the client.
45     pub auth_banner: Option<&'static str>,
46     /// Authentication rejections must happen in constant time for
47     /// security reasons. Thrussh does not handle this by default.
48     pub auth_rejection_time: std::time::Duration,
49     /// The server's keys. The first key pair in the client's preference order will be chosen.
50     pub keys: Vec<key::KeyPair>,
51     /// The bytes and time limits before key re-exchange.
52     pub limits: Limits,
53     /// The initial size of a channel (used for flow control).
54     pub window_size: u32,
55     /// The maximal size of a single packet.
56     pub maximum_packet_size: u32,
57     /// Lists of preferred algorithms.
58     pub preferred: Preferred,
59     /// Maximal number of allowed authentication attempts.
60     pub max_auth_attempts: usize,
61     /// Time after which the connection is garbage-collected.
62     pub connection_timeout: Option<std::time::Duration>,
63 }
64 
65 impl Default for Config {
default() -> Config66     fn default() -> Config {
67         Config {
68             server_id: format!(
69                 "SSH-2.0-{}_{}",
70                 env!("CARGO_PKG_NAME"),
71                 env!("CARGO_PKG_VERSION")
72             ),
73             methods: auth::MethodSet::all(),
74             auth_banner: None,
75             auth_rejection_time: std::time::Duration::from_secs(1),
76             keys: Vec::new(),
77             window_size: 2097152,
78             maximum_packet_size: 32768,
79             limits: Limits::default(),
80             preferred: Default::default(),
81             max_auth_attempts: 10,
82             connection_timeout: Some(std::time::Duration::from_secs(600)),
83         }
84     }
85 }
86 
87 /// A client's response in a challenge-response authentication.
88 #[derive(Debug)]
89 pub struct Response<'a> {
90     pos: thrussh_keys::encoding::Position<'a>,
91     n: u32,
92 }
93 
94 impl<'a> Iterator for Response<'a> {
95     type Item = &'a [u8];
next(&mut self) -> Option<Self::Item>96     fn next(&mut self) -> Option<Self::Item> {
97         if self.n == 0 {
98             None
99         } else {
100             self.n -= 1;
101             self.pos.read_string().ok()
102         }
103     }
104 }
105 
106 use std::borrow::Cow;
107 /// An authentication result, in a challenge-response authentication.
108 #[derive(Debug, PartialEq, Eq)]
109 pub enum Auth {
110     /// Reject the authentication request.
111     Reject,
112     /// Accept the authentication request.
113     Accept,
114 
115     /// Method was not accepted, but no other check was performed.
116     UnsupportedMethod,
117 
118     /// Partially accept the challenge-response authentication
119     /// request, providing more instructions for the client to follow.
120     Partial {
121         /// Name of this challenge.
122         name: Cow<'static, str>,
123         /// Instructions for this challenge.
124         instructions: Cow<'static, str>,
125         /// A number of prompts to the user. Each prompt has a `bool`
126         /// indicating whether the terminal must echo the characters
127         /// typed by the user.
128         prompts: Cow<'static, [(Cow<'static, str>, bool)]>,
129     },
130 }
131 
132 /// Server handler. Each client will have their own handler.
133 pub trait Handler: Sized {
134     type Error: From<crate::Error> + Send;
135     /// The type of authentications, which can be a future ultimately
136     /// resolving to
137     type FutureAuth: Future<Output = Result<(Self, Auth), Self::Error>> + Send;
138 
139     /// The type of units returned by some parts of this handler.
140     type FutureUnit: Future<Output = Result<(Self, Session), Self::Error>> + Send;
141 
142     /// The type of future bools returned by some parts of this handler.
143     type FutureBool: Future<Output = Result<(Self, Session, bool), Self::Error>> + Send;
144 
145     /// Convert an `Auth` to `Self::FutureAuth`. This is used to
146     /// produce the default handlers.
finished_auth(self, auth: Auth) -> Self::FutureAuth147     fn finished_auth(self, auth: Auth) -> Self::FutureAuth;
148 
149     /// Convert a `bool` to `Self::FutureBool`. This is used to
150     /// produce the default handlers.
finished_bool(self, b: bool, session: Session) -> Self::FutureBool151     fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool;
152 
153     /// Produce a `Self::FutureUnit`. This is used to produce the
154     /// default handlers.
finished(self, session: Session) -> Self::FutureUnit155     fn finished(self, session: Session) -> Self::FutureUnit;
156 
157     /// Check authentication using the "none" method. Thrussh makes
158     /// sure rejection happens in time `config.auth_rejection_time`,
159     /// except if this method takes more than that.
160     #[allow(unused_variables)]
auth_none(self, user: &str) -> Self::FutureAuth161     fn auth_none(self, user: &str) -> Self::FutureAuth {
162         self.finished_auth(Auth::Reject)
163     }
164 
165     /// Check authentication using the "password" method. Thrussh
166     /// makes sure rejection happens in time
167     /// `config.auth_rejection_time`, except if this method takes more
168     /// than that.
169     #[allow(unused_variables)]
auth_password(self, user: &str, password: &str) -> Self::FutureAuth170     fn auth_password(self, user: &str, password: &str) -> Self::FutureAuth {
171         self.finished_auth(Auth::Reject)
172     }
173 
174     /// Check authentication using the "publickey" method. This method
175     /// should just check whether the public key matches the
176     /// authorized ones. Thrussh then checks the signature. If the key
177     /// is unknown, or the signature is invalid, Thrussh guarantees
178     /// that rejection happens in constant time
179     /// `config.auth_rejection_time`, except if this method takes more
180     /// time than that.
181     #[allow(unused_variables)]
auth_publickey(self, user: &str, public_key: &key::PublicKey) -> Self::FutureAuth182     fn auth_publickey(self, user: &str, public_key: &key::PublicKey) -> Self::FutureAuth {
183         self.finished_auth(Auth::Reject)
184     }
185 
186     /// Check authentication using the "keyboard-interactive"
187     /// method. Thrussh makes sure rejection happens in time
188     /// `config.auth_rejection_time`, except if this method takes more
189     /// than that.
190     #[allow(unused_variables)]
auth_keyboard_interactive( self, user: &str, submethods: &str, response: Option<Response>, ) -> Self::FutureAuth191     fn auth_keyboard_interactive(
192         self,
193         user: &str,
194         submethods: &str,
195         response: Option<Response>,
196     ) -> Self::FutureAuth {
197         self.finished_auth(Auth::Reject)
198     }
199 
200     /// Called when the client closes a channel.
201     #[allow(unused_variables)]
channel_close(self, channel: ChannelId, session: Session) -> Self::FutureUnit202     fn channel_close(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
203         self.finished(session)
204     }
205 
206     /// Called when the client sends EOF to a channel.
207     #[allow(unused_variables)]
channel_eof(self, channel: ChannelId, session: Session) -> Self::FutureUnit208     fn channel_eof(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
209         self.finished(session)
210     }
211 
212     /// Called when a new session channel is created.
213     #[allow(unused_variables)]
channel_open_session(self, channel: ChannelId, session: Session) -> Self::FutureUnit214     fn channel_open_session(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
215         self.finished(session)
216     }
217 
218     /// Called when a new X11 channel is created.
219     #[allow(unused_variables)]
channel_open_x11( self, channel: ChannelId, originator_address: &str, originator_port: u32, session: Session, ) -> Self::FutureUnit220     fn channel_open_x11(
221         self,
222         channel: ChannelId,
223         originator_address: &str,
224         originator_port: u32,
225         session: Session,
226     ) -> Self::FutureUnit {
227         self.finished(session)
228     }
229 
230     /// Called when a new channel is created.
231     #[allow(unused_variables)]
channel_open_direct_tcpip( self, channel: ChannelId, host_to_connect: &str, port_to_connect: u32, originator_address: &str, originator_port: u32, session: Session, ) -> Self::FutureUnit232     fn channel_open_direct_tcpip(
233         self,
234         channel: ChannelId,
235         host_to_connect: &str,
236         port_to_connect: u32,
237         originator_address: &str,
238         originator_port: u32,
239         session: Session,
240     ) -> Self::FutureUnit {
241         self.finished(session)
242     }
243 
244     /// Called when a data packet is received. A response can be
245     /// written to the `response` argument.
246     #[allow(unused_variables)]
data(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit247     fn data(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
248         self.finished(session)
249     }
250 
251     /// Called when an extended data packet is received. Code 1 means
252     /// that this packet comes from stderr, other codes are not
253     /// defined (see
254     /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-5.2)).
255     #[allow(unused_variables)]
extended_data( self, channel: ChannelId, code: u32, data: &[u8], session: Session, ) -> Self::FutureUnit256     fn extended_data(
257         self,
258         channel: ChannelId,
259         code: u32,
260         data: &[u8],
261         session: Session,
262     ) -> Self::FutureUnit {
263         self.finished(session)
264     }
265 
266     /// Called when the network window is adjusted, meaning that we
267     /// can send more bytes.
268     #[allow(unused_variables)]
window_adjusted( self, channel: ChannelId, new_window_size: usize, mut session: Session, ) -> Self::FutureUnit269     fn window_adjusted(
270         self,
271         channel: ChannelId,
272         new_window_size: usize,
273         mut session: Session,
274     ) -> Self::FutureUnit {
275         if let Some(ref mut enc) = session.common.encrypted {
276             enc.flush_pending(channel);
277         }
278         self.finished(session)
279     }
280 
281     /// Called when this server adjusts the network window. Return the
282     /// next target window.
283     #[allow(unused_variables)]
adjust_window(&mut self, channel: ChannelId, current: u32) -> u32284     fn adjust_window(&mut self, channel: ChannelId, current: u32) -> u32 {
285         current
286     }
287 
288     /// The client requests a pseudo-terminal with the given
289     /// specifications.
290     #[allow(unused_variables)]
pty_request( self, channel: ChannelId, term: &str, col_width: u32, row_height: u32, pix_width: u32, pix_height: u32, modes: &[(Pty, u32)], session: Session, ) -> Self::FutureUnit291     fn pty_request(
292         self,
293         channel: ChannelId,
294         term: &str,
295         col_width: u32,
296         row_height: u32,
297         pix_width: u32,
298         pix_height: u32,
299         modes: &[(Pty, u32)],
300         session: Session,
301     ) -> Self::FutureUnit {
302         self.finished(session)
303     }
304 
305     /// The client requests an X11 connection.
306     #[allow(unused_variables)]
x11_request( self, channel: ChannelId, single_connection: bool, x11_auth_protocol: &str, x11_auth_cookie: &str, x11_screen_number: u32, session: Session, ) -> Self::FutureUnit307     fn x11_request(
308         self,
309         channel: ChannelId,
310         single_connection: bool,
311         x11_auth_protocol: &str,
312         x11_auth_cookie: &str,
313         x11_screen_number: u32,
314         session: Session,
315     ) -> Self::FutureUnit {
316         self.finished(session)
317     }
318 
319     /// The client wants to set the given environment variable. Check
320     /// these carefully, as it is dangerous to allow any variable
321     /// environment to be set.
322     #[allow(unused_variables)]
env_request( self, channel: ChannelId, variable_name: &str, variable_value: &str, session: Session, ) -> Self::FutureUnit323     fn env_request(
324         self,
325         channel: ChannelId,
326         variable_name: &str,
327         variable_value: &str,
328         session: Session,
329     ) -> Self::FutureUnit {
330         self.finished(session)
331     }
332 
333     /// The client requests a shell.
334     #[allow(unused_variables)]
shell_request(self, channel: ChannelId, session: Session) -> Self::FutureUnit335     fn shell_request(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
336         self.finished(session)
337     }
338 
339     /// The client sends a command to execute, to be passed to a
340     /// shell. Make sure to check the command before doing so.
341     #[allow(unused_variables)]
exec_request(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit342     fn exec_request(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
343         self.finished(session)
344     }
345 
346     /// The client asks to start the subsystem with the given name
347     /// (such as sftp).
348     #[allow(unused_variables)]
subsystem_request( self, channel: ChannelId, name: &str, session: Session, ) -> Self::FutureUnit349     fn subsystem_request(
350         self,
351         channel: ChannelId,
352         name: &str,
353         session: Session,
354     ) -> Self::FutureUnit {
355         self.finished(session)
356     }
357 
358     /// The client's pseudo-terminal window size has changed.
359     #[allow(unused_variables)]
window_change_request( self, channel: ChannelId, col_width: u32, row_height: u32, pix_width: u32, pix_height: u32, session: Session, ) -> Self::FutureUnit360     fn window_change_request(
361         self,
362         channel: ChannelId,
363         col_width: u32,
364         row_height: u32,
365         pix_width: u32,
366         pix_height: u32,
367         session: Session,
368     ) -> Self::FutureUnit {
369         self.finished(session)
370     }
371 
372     /// The client is sending a signal (usually to pass to the
373     /// currently running process).
374     #[allow(unused_variables)]
signal(self, channel: ChannelId, signal_name: Sig, session: Session) -> Self::FutureUnit375     fn signal(self, channel: ChannelId, signal_name: Sig, session: Session) -> Self::FutureUnit {
376         self.finished(session)
377     }
378 
379     /// Used for reverse-forwarding ports, see
380     /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
381     #[allow(unused_variables)]
tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool382     fn tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
383         self.finished_bool(false, session)
384     }
385     /// Used to stop the reverse-forwarding of a port, see
386     /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
387     #[allow(unused_variables)]
cancel_tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool388     fn cancel_tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
389         self.finished_bool(false, session)
390     }
391 }
392 
393 /// Trait used to create new handlers when clients connect.
394 pub trait Server {
395     /// The type of handlers.
396     type Handler: Handler + Send;
397     /// Called when a new client connects.
new(&mut self, peer_addr: Option<std::net::SocketAddr>) -> Self::Handler398     fn new(&mut self, peer_addr: Option<std::net::SocketAddr>) -> Self::Handler;
399 }
400 
401 /// Run a server.
402 /// Create a new `Connection` from the server's configuration, a
403 /// stream and a [`Handler`](trait.Handler.html).
run<H: Server + Send + 'static>( config: Arc<Config>, addr: &str, mut server: H, ) -> Result<(), std::io::Error>404 pub async fn run<H: Server + Send + 'static>(
405     config: Arc<Config>,
406     addr: &str,
407     mut server: H,
408 ) -> Result<(), std::io::Error> {
409     let addr = addr.to_socket_addrs().unwrap().next().unwrap();
410     let socket = TcpListener::bind(&addr).await?;
411     if config.maximum_packet_size > 65535 {
412         error!(
413             "Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
414             config.maximum_packet_size
415         );
416     }
417     while let Ok((socket, _)) = socket.accept().await {
418         let config = config.clone();
419         let server = server.new(socket.peer_addr().ok());
420         tokio::spawn(run_stream(config, socket, server));
421     }
422     Ok(())
423 }
424 
425 use std::cell::RefCell;
426 thread_local! {
427     static B1: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
428     static B2: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
429 }
430 
timeout(delay: Option<std::time::Duration>)431 pub async fn timeout(delay: Option<std::time::Duration>) {
432     if let Some(delay) = delay {
433         tokio::time::sleep(delay).await
434     } else {
435         futures::future::pending().await
436     };
437 }
438 
start_reading<R: AsyncRead + Unpin>( mut stream_read: R, mut buffer: SSHBuffer, cipher: Arc<crate::cipher::CipherPair>, ) -> Result<(usize, R, SSHBuffer), Error>439 async fn start_reading<R: AsyncRead + Unpin>(
440     mut stream_read: R,
441     mut buffer: SSHBuffer,
442     cipher: Arc<crate::cipher::CipherPair>,
443 ) -> Result<(usize, R, SSHBuffer), Error> {
444     buffer.buffer.clear();
445     let n = cipher::read(&mut stream_read, &mut buffer, &cipher).await?;
446     Ok((n, stream_read, buffer))
447 }
448 
run_stream<H: Handler, R>( config: Arc<Config>, mut stream: R, handler: H, ) -> Result<H, H::Error> where R: AsyncRead + AsyncWrite + Unpin,449 pub async fn run_stream<H: Handler, R>(
450     config: Arc<Config>,
451     mut stream: R,
452     handler: H,
453 ) -> Result<H, H::Error>
454 where
455     R: AsyncRead + AsyncWrite + Unpin,
456 {
457     let mut handler = Some(handler);
458     let delay = config.connection_timeout;
459     // Writing SSH id.
460     let mut decomp = CryptoVec::new();
461     let mut write_buffer = SSHBuffer::new();
462     write_buffer.send_ssh_id(config.as_ref().server_id.as_bytes());
463     stream
464         .write_all(&write_buffer.buffer[..])
465         .await
466         .map_err(crate::Error::from)?;
467 
468     // Reading SSH id and allocating a session.
469     let mut stream = SshRead::new(&mut stream);
470     let common = read_ssh_id(config, &mut stream).await?;
471     let (sender, receiver) = tokio::sync::mpsc::channel(10);
472     let mut session = Session {
473         target_window_size: common.config.window_size,
474         common,
475         receiver,
476         sender: server::session::Handle { sender },
477         pending_reads: Vec::new(),
478         pending_len: 0,
479     };
480     session.flush()?;
481     stream
482         .write_all(&session.common.write_buffer.buffer)
483         .await
484         .map_err(crate::Error::from)?;
485     session.common.write_buffer.buffer.clear();
486 
487     let (stream_read, mut stream_write) = stream.split();
488     let buffer = SSHBuffer::new();
489     let reading = start_reading(stream_read, buffer, session.common.cipher.clone());
490     pin!(reading);
491     let mut is_reading = None;
492 
493     while !session.common.disconnected {
494         tokio::select! {
495             r = &mut reading => {
496                 let (stream_read, buffer) = match r {
497                     Ok((_, stream_read, buffer)) => (stream_read, buffer),
498                     Err(e) => return Err(e.into())
499                 };
500                 if buffer.buffer.len() < 5 {
501                     is_reading = Some((stream_read, buffer));
502                     break
503                 }
504                 let buf = if let Some(ref mut enc) = session.common.encrypted {
505                     let d = enc.decompress.decompress(
506                         &buffer.buffer[5..],
507                         &mut decomp,
508                     );
509                     if let Ok(buf) = d {
510                         buf
511                     } else {
512                         debug!("err = {:?}", d);
513                         is_reading = Some((stream_read, buffer));
514                         break
515                     }
516                 } else {
517                     &buffer.buffer[5..]
518                 };
519                 if !buf.is_empty() {
520                     if buf[0] == crate::msg::DISCONNECT {
521                         debug!("break");
522                         is_reading = Some((stream_read, buffer));
523                         break;
524                     } else if buf[0] > 4 {
525                         match reply(session, &mut handler, &buf[..]).await {
526                             Ok(s) => session = s,
527                             Err(e) => return Err(e),
528                         }
529                     }
530                 }
531                 reading.set(start_reading(stream_read, buffer, session.common.cipher.clone()));
532             }
533             _ = timeout(delay) => {
534                 debug!("timeout");
535                 break
536             },
537             msg = session.receiver.recv(), if !session.is_rekeying() => {
538                 match msg {
539                     Some((id, ChannelMsg::Data { data })) => {
540                         session.data(id, data);
541                     }
542                     Some((id, ChannelMsg::ExtendedData { ext, data })) => {
543                         session.extended_data(id, ext, data);
544                     }
545                     Some((id, ChannelMsg::Eof)) => {
546                         session.eof(id);
547                     }
548                     Some((id, ChannelMsg::Close)) => {
549                         session.close(id);
550                     }
551                     Some((id, ChannelMsg::XonXoff { client_can_do })) => {
552                         session.xon_xoff_request(id, client_can_do);
553                     }
554                     Some((id, ChannelMsg::ExitStatus { exit_status })) => {
555                         session.exit_status_request(id, exit_status);
556                     }
557                     Some((id, ChannelMsg::ExitSignal { signal_name, core_dumped, error_message, lang_tag })) => {
558                         session.exit_signal_request(id, signal_name, core_dumped, &error_message, &lang_tag);
559                     }
560                     Some((id, ChannelMsg::WindowAdjusted { new_size })) => {
561                         debug!("window adjusted to {:?} for channel {:?}", new_size, id);
562                     }
563                     Some((id, ChannelMsg::Success)) => {
564                         debug!("channel success {:?}", id);
565                     }
566                     None => {
567                         debug!("session.receiver: received None");
568                     }
569                 }
570             }
571         }
572         session.flush()?;
573         stream_write
574             .write_all(&session.common.write_buffer.buffer)
575             .await
576             .map_err(crate::Error::from)?;
577         session.common.write_buffer.buffer.clear();
578     }
579     debug!("disconnected");
580     // Shutdown
581     stream_write.shutdown().await.map_err(crate::Error::from)?;
582     loop {
583         if let Some((stream_read, buffer)) = is_reading.take() {
584             reading.set(start_reading(
585                 stream_read,
586                 buffer,
587                 session.common.cipher.clone(),
588             ));
589         }
590         let (n, r, b) = (&mut reading).await?;
591         is_reading = Some((r, b));
592         if n == 0 {
593             break;
594         }
595     }
596     Ok(handler.unwrap())
597 }
598 
read_ssh_id<R: AsyncRead + Unpin>( config: Arc<Config>, read: &mut SshRead<R>, ) -> Result<CommonSession<Arc<Config>>, Error>599 async fn read_ssh_id<R: AsyncRead + Unpin>(
600     config: Arc<Config>,
601     read: &mut SshRead<R>,
602 ) -> Result<CommonSession<Arc<Config>>, Error> {
603     let sshid = if let Some(t) = config.connection_timeout {
604         tokio::time::timeout(t, read.read_ssh_id()).await??
605     } else {
606         read.read_ssh_id().await?
607     };
608     let mut exchange = Exchange::new();
609     exchange.client_id.extend(sshid);
610     // Preparing the response
611     exchange
612         .server_id
613         .extend(config.as_ref().server_id.as_bytes());
614     let mut kexinit = KexInit {
615         exchange: exchange,
616         algo: None,
617         sent: false,
618         session_id: None,
619     };
620     let cipher = Arc::new(cipher::CLEAR_PAIR);
621     let mut write_buffer = SSHBuffer::new();
622     kexinit.server_write(config.as_ref(), cipher.as_ref(), &mut write_buffer)?;
623     Ok(CommonSession {
624         write_buffer,
625         kex: Some(Kex::KexInit(kexinit)),
626         auth_user: String::new(),
627         auth_method: None, // Client only.
628         cipher,
629         encrypted: None,
630         config: config,
631         wants_reply: false,
632         disconnected: false,
633         buffer: CryptoVec::new(),
634     })
635 }
636 
reply<H: Handler>( mut session: Session, handler: &mut Option<H>, buf: &[u8], ) -> Result<Session, H::Error>637 async fn reply<H: Handler>(
638     mut session: Session,
639     handler: &mut Option<H>,
640     buf: &[u8],
641 ) -> Result<Session, H::Error> {
642     // Handle key exchange/re-exchange.
643     if session.common.encrypted.is_none() {
644         match session.common.kex.take() {
645             Some(Kex::KexInit(kexinit)) => {
646                 if kexinit.algo.is_some() || buf[0] == msg::KEXINIT {
647                     session.common.kex = Some(kexinit.server_parse(
648                         session.common.config.as_ref(),
649                         &session.common.cipher,
650                         &buf,
651                         &mut session.common.write_buffer,
652                     )?);
653                     return Ok(session);
654                 } else {
655                     // Else, i.e. if the other side has not started
656                     // the key exchange, process its packets by simple
657                     // not returning.
658                     session.common.kex = Some(Kex::KexInit(kexinit))
659                 }
660             }
661             Some(Kex::KexDh(kexdh)) => {
662                 session.common.kex = Some(kexdh.parse(
663                     session.common.config.as_ref(),
664                     &session.common.cipher,
665                     buf,
666                     &mut session.common.write_buffer,
667                 )?);
668                 return Ok(session);
669             }
670             Some(Kex::NewKeys(newkeys)) => {
671                 if buf[0] != msg::NEWKEYS {
672                     return Err(Error::Kex.into());
673                 }
674                 // Ok, NEWKEYS received, now encrypted.
675                 session.common.encrypted(
676                     EncryptedState::WaitingServiceRequest {
677                         sent: false,
678                         accepted: false,
679                     },
680                     newkeys,
681                 );
682                 return Ok(session);
683             }
684             Some(kex) => {
685                 session.common.kex = Some(kex);
686                 return Ok(session);
687             }
688             None => {}
689         }
690         Ok(session)
691     } else {
692         Ok(session.server_read_encrypted(handler, buf).await?)
693     }
694 }
695