1 // Copyright 2016 Pierre-Étienne Meunier
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 use std;
17 use std::net::ToSocketAddrs;
18 use std::sync::Arc;
19
20 use futures::future::Future;
21 use thrussh_keys::key;
22 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
23 use tokio::net::TcpListener;
24 use tokio::pin;
25
26 use crate::session::*;
27 use crate::ssh_read::*;
28 use crate::sshbuffer::*;
29 use crate::*;
30
31 mod kex;
32 mod session;
33 pub use self::kex::*;
34 pub use self::session::*;
35 mod encrypted;
36
37 #[derive(Debug)]
38 /// Configuration of a server.
39 pub struct Config {
40 /// The server ID string sent at the beginning of the protocol.
41 pub server_id: String,
42 /// Authentication methods proposed to the client.
43 pub methods: auth::MethodSet,
44 /// The authentication banner, usually a warning message shown to the client.
45 pub auth_banner: Option<&'static str>,
46 /// Authentication rejections must happen in constant time for
47 /// security reasons. Thrussh does not handle this by default.
48 pub auth_rejection_time: std::time::Duration,
49 /// The server's keys. The first key pair in the client's preference order will be chosen.
50 pub keys: Vec<key::KeyPair>,
51 /// The bytes and time limits before key re-exchange.
52 pub limits: Limits,
53 /// The initial size of a channel (used for flow control).
54 pub window_size: u32,
55 /// The maximal size of a single packet.
56 pub maximum_packet_size: u32,
57 /// Lists of preferred algorithms.
58 pub preferred: Preferred,
59 /// Maximal number of allowed authentication attempts.
60 pub max_auth_attempts: usize,
61 /// Time after which the connection is garbage-collected.
62 pub connection_timeout: Option<std::time::Duration>,
63 }
64
65 impl Default for Config {
default() -> Config66 fn default() -> Config {
67 Config {
68 server_id: format!(
69 "SSH-2.0-{}_{}",
70 env!("CARGO_PKG_NAME"),
71 env!("CARGO_PKG_VERSION")
72 ),
73 methods: auth::MethodSet::all(),
74 auth_banner: None,
75 auth_rejection_time: std::time::Duration::from_secs(1),
76 keys: Vec::new(),
77 window_size: 2097152,
78 maximum_packet_size: 32768,
79 limits: Limits::default(),
80 preferred: Default::default(),
81 max_auth_attempts: 10,
82 connection_timeout: Some(std::time::Duration::from_secs(600)),
83 }
84 }
85 }
86
87 /// A client's response in a challenge-response authentication.
88 #[derive(Debug)]
89 pub struct Response<'a> {
90 pos: thrussh_keys::encoding::Position<'a>,
91 n: u32,
92 }
93
94 impl<'a> Iterator for Response<'a> {
95 type Item = &'a [u8];
next(&mut self) -> Option<Self::Item>96 fn next(&mut self) -> Option<Self::Item> {
97 if self.n == 0 {
98 None
99 } else {
100 self.n -= 1;
101 self.pos.read_string().ok()
102 }
103 }
104 }
105
106 use std::borrow::Cow;
/// An authentication result, in a challenge-response authentication.
///
/// This is the verdict carried by [`Handler::FutureAuth`]; the default
/// `auth_*` callbacks of [`Handler`] all answer [`Auth::Reject`].
#[derive(Debug, PartialEq, Eq)]
pub enum Auth {
    /// Reject the authentication request.
    Reject,
    /// Accept the authentication request.
    Accept,

    /// Method was not accepted, but no other check was performed.
    UnsupportedMethod,

    /// Partially accept the challenge-response authentication
    /// request, providing more instructions for the client to follow.
    Partial {
        /// Name of this challenge.
        name: Cow<'static, str>,
        /// Instructions for this challenge.
        instructions: Cow<'static, str>,
        /// A number of prompts to the user. Each prompt has a `bool`
        /// indicating whether the terminal must echo the characters
        /// typed by the user.
        prompts: Cow<'static, [(Cow<'static, str>, bool)]>,
    },
}
131
/// Server handler. Each client will have their own handler.
///
/// Every callback except `adjust_window` consumes the handler (`self`)
/// and returns a future that hands the handler back, together with the
/// `Session` for the callbacks that received one. All callbacks other
/// than the three `finished*` constructors have a default
/// implementation.
pub trait Handler: Sized {
    /// Error type produced by the handler's futures. It must be
    /// constructible from `crate::Error` so that errors raised by the
    /// library itself can be returned through the same channel.
    type Error: From<crate::Error> + Send;
    /// The type of authentications: a future ultimately resolving to
    /// the handler and the [`Auth`] verdict.
    type FutureAuth: Future<Output = Result<(Self, Auth), Self::Error>> + Send;

    /// The type of units returned by some parts of this handler: a
    /// future resolving to the handler and the session.
    type FutureUnit: Future<Output = Result<(Self, Session), Self::Error>> + Send;

    /// The type of future bools returned by some parts of this
    /// handler: a future resolving to the handler, the session and a
    /// `bool`.
    type FutureBool: Future<Output = Result<(Self, Session, bool), Self::Error>> + Send;

    /// Convert an `Auth` to `Self::FutureAuth`. This is used to
    /// produce the default handlers.
    fn finished_auth(self, auth: Auth) -> Self::FutureAuth;

    /// Convert a `bool` to `Self::FutureBool`. This is used to
    /// produce the default handlers.
    fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool;

    /// Produce a `Self::FutureUnit`. This is used to produce the
    /// default handlers.
    fn finished(self, session: Session) -> Self::FutureUnit;

    /// Check authentication using the "none" method. Thrussh makes
    /// sure rejection happens in time `config.auth_rejection_time`,
    /// except if this method takes more than that.
    ///
    /// The default implementation rejects the request.
    #[allow(unused_variables)]
    fn auth_none(self, user: &str) -> Self::FutureAuth {
        self.finished_auth(Auth::Reject)
    }

    /// Check authentication using the "password" method. Thrussh
    /// makes sure rejection happens in time
    /// `config.auth_rejection_time`, except if this method takes more
    /// than that.
    ///
    /// The default implementation rejects the request.
    #[allow(unused_variables)]
    fn auth_password(self, user: &str, password: &str) -> Self::FutureAuth {
        self.finished_auth(Auth::Reject)
    }

    /// Check authentication using the "publickey" method. This method
    /// should just check whether the public key matches the
    /// authorized ones. Thrussh then checks the signature. If the key
    /// is unknown, or the signature is invalid, Thrussh guarantees
    /// that rejection happens in constant time
    /// `config.auth_rejection_time`, except if this method takes more
    /// time than that.
    ///
    /// The default implementation rejects the request.
    #[allow(unused_variables)]
    fn auth_publickey(self, user: &str, public_key: &key::PublicKey) -> Self::FutureAuth {
        self.finished_auth(Auth::Reject)
    }

    /// Check authentication using the "keyboard-interactive"
    /// method. Thrussh makes sure rejection happens in time
    /// `config.auth_rejection_time`, except if this method takes more
    /// than that.
    ///
    /// `response`, when present, iterates over the answers sent by the
    /// client. The default implementation rejects the request.
    #[allow(unused_variables)]
    fn auth_keyboard_interactive(
        self,
        user: &str,
        submethods: &str,
        response: Option<Response>,
    ) -> Self::FutureAuth {
        self.finished_auth(Auth::Reject)
    }

    /// Called when the client closes a channel.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn channel_close(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when the client sends EOF to a channel.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn channel_eof(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when a new session channel is created.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn channel_open_session(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when a new X11 channel is created.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn channel_open_x11(
        self,
        channel: ChannelId,
        originator_address: &str,
        originator_port: u32,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when a new direct TCP/IP channel is created.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn channel_open_direct_tcpip(
        self,
        channel: ChannelId,
        host_to_connect: &str,
        port_to_connect: u32,
        originator_address: &str,
        originator_port: u32,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when a data packet is received. There is no separate
    /// response argument: the handler receives the `session` and
    /// hands it back in the returned future.
    ///
    /// The default implementation ignores the data.
    #[allow(unused_variables)]
    fn data(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when an extended data packet is received. Code 1 means
    /// that this packet comes from stderr, other codes are not
    /// defined (see
    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-5.2)).
    ///
    /// The default implementation ignores the data.
    #[allow(unused_variables)]
    fn extended_data(
        self,
        channel: ChannelId,
        code: u32,
        data: &[u8],
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Called when the network window is adjusted, meaning that we
    /// can send more bytes.
    ///
    /// The default implementation calls `flush_pending(channel)` on
    /// the session's encrypted state, if there is one.
    #[allow(unused_variables)]
    fn window_adjusted(
        self,
        channel: ChannelId,
        new_window_size: usize,
        mut session: Session,
    ) -> Self::FutureUnit {
        if let Some(ref mut enc) = session.common.encrypted {
            enc.flush_pending(channel);
        }
        self.finished(session)
    }

    /// Called when this server adjusts the network window. Return the
    /// next target window.
    ///
    /// The default implementation returns `current` unchanged.
    #[allow(unused_variables)]
    fn adjust_window(&mut self, channel: ChannelId, current: u32) -> u32 {
        current
    }

    /// The client requests a pseudo-terminal with the given
    /// specifications.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn pty_request(
        self,
        channel: ChannelId,
        term: &str,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        modes: &[(Pty, u32)],
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client requests an X11 connection.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn x11_request(
        self,
        channel: ChannelId,
        single_connection: bool,
        x11_auth_protocol: &str,
        x11_auth_cookie: &str,
        x11_screen_number: u32,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client wants to set the given environment variable. Check
    /// these carefully, as it is dangerous to allow any environment
    /// variable to be set.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn env_request(
        self,
        channel: ChannelId,
        variable_name: &str,
        variable_value: &str,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client requests a shell.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn shell_request(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client sends a command to execute, to be passed to a
    /// shell. Make sure to check the command before doing so.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn exec_request(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client asks to start the subsystem with the given name
    /// (such as sftp).
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn subsystem_request(
        self,
        channel: ChannelId,
        name: &str,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client's pseudo-terminal window size has changed.
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn window_change_request(
        self,
        channel: ChannelId,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        session: Session,
    ) -> Self::FutureUnit {
        self.finished(session)
    }

    /// The client is sending a signal (usually to pass to the
    /// currently running process).
    ///
    /// The default implementation does nothing.
    #[allow(unused_variables)]
    fn signal(self, channel: ChannelId, signal_name: Sig, session: Session) -> Self::FutureUnit {
        self.finished(session)
    }

    /// Used for reverse-forwarding ports, see
    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
    ///
    /// The default implementation resolves to `false`.
    // NOTE(review): `false` presumably means the request is refused — confirm
    // against the code consuming `FutureBool`.
    #[allow(unused_variables)]
    fn tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
        self.finished_bool(false, session)
    }
    /// Used to stop the reverse-forwarding of a port, see
    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
    ///
    /// The default implementation resolves to `false`.
    #[allow(unused_variables)]
    fn cancel_tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
        self.finished_bool(false, session)
    }
}
392
/// Trait used to create new handlers when clients connect.
///
/// `run` calls [`Server::new`] once for every accepted connection and
/// hands the resulting handler to `run_stream`.
pub trait Server {
    /// The type of handlers.
    type Handler: Handler + Send;
    /// Called when a new client connects.
    ///
    /// `peer_addr` is the remote address of the connection, or `None`
    /// when it could not be obtained.
    fn new(&mut self, peer_addr: Option<std::net::SocketAddr>) -> Self::Handler;
}
400
401 /// Run a server.
402 /// Create a new `Connection` from the server's configuration, a
403 /// stream and a [`Handler`](trait.Handler.html).
run<H: Server + Send + 'static>( config: Arc<Config>, addr: &str, mut server: H, ) -> Result<(), std::io::Error>404 pub async fn run<H: Server + Send + 'static>(
405 config: Arc<Config>,
406 addr: &str,
407 mut server: H,
408 ) -> Result<(), std::io::Error> {
409 let addr = addr.to_socket_addrs().unwrap().next().unwrap();
410 let socket = TcpListener::bind(&addr).await?;
411 if config.maximum_packet_size > 65535 {
412 error!(
413 "Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
414 config.maximum_packet_size
415 );
416 }
417 while let Ok((socket, _)) = socket.accept().await {
418 let config = config.clone();
419 let server = server.new(socket.peer_addr().ok());
420 tokio::spawn(run_stream(config, socket, server));
421 }
422 Ok(())
423 }
424
425 use std::cell::RefCell;
// Two per-thread `CryptoVec` buffers, each behind a `RefCell`.
// NOTE(review): neither is referenced in the visible part of this file;
// presumably they are scratch buffers used by the submodules — confirm, or
// remove them if they are unused.
thread_local! {
    static B1: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
    static B2: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
}
430
timeout(delay: Option<std::time::Duration>)431 pub async fn timeout(delay: Option<std::time::Duration>) {
432 if let Some(delay) = delay {
433 tokio::time::sleep(delay).await
434 } else {
435 futures::future::pending().await
436 };
437 }
438
start_reading<R: AsyncRead + Unpin>( mut stream_read: R, mut buffer: SSHBuffer, cipher: Arc<crate::cipher::CipherPair>, ) -> Result<(usize, R, SSHBuffer), Error>439 async fn start_reading<R: AsyncRead + Unpin>(
440 mut stream_read: R,
441 mut buffer: SSHBuffer,
442 cipher: Arc<crate::cipher::CipherPair>,
443 ) -> Result<(usize, R, SSHBuffer), Error> {
444 buffer.buffer.clear();
445 let n = cipher::read(&mut stream_read, &mut buffer, &cipher).await?;
446 Ok((n, stream_read, buffer))
447 }
448
/// Drive one SSH connection over `stream` until it is disconnected,
/// times out, or fails.
///
/// The function sends the server identification, reads the client's,
/// builds a `Session`, and then loops over three event sources:
/// incoming packets, the connection timeout, and messages queued on
/// the session's channel. After every handled event the session is
/// flushed and its write buffer is written to the stream. Once the
/// loop ends, the write half is shut down and the read half is drained
/// until a read reports zero bytes.
///
/// On success the handler is handed back to the caller.
///
/// # Errors
///
/// Any I/O or protocol error is converted into `H::Error`; errors
/// produced by the handler are returned unchanged.
///
/// # Panics
///
/// Panics if the handler slot is empty when the connection ends
/// (`handler.unwrap()`).
// NOTE(review): the slot is lent to `reply` as `&mut Option<H>`; whether it
// can be left empty depends on `server_read_encrypted` — confirm.
pub async fn run_stream<H: Handler, R>(
    config: Arc<Config>,
    mut stream: R,
    handler: H,
) -> Result<H, H::Error>
where
    R: AsyncRead + AsyncWrite + Unpin,
{
    // The handler lives in an `Option` so that `reply` can receive it as
    // `&mut Option<H>`.
    let mut handler = Some(handler);
    let delay = config.connection_timeout;
    // Writing SSH id.
    // `decomp` is the output buffer handed to the decompressor below.
    let mut decomp = CryptoVec::new();
    let mut write_buffer = SSHBuffer::new();
    write_buffer.send_ssh_id(config.as_ref().server_id.as_bytes());
    stream
        .write_all(&write_buffer.buffer[..])
        .await
        .map_err(crate::Error::from)?;

    // Reading SSH id and allocating a session.
    let mut stream = SshRead::new(&mut stream);
    let common = read_ssh_id(config, &mut stream).await?;
    // Bounded channel (capacity 10) whose receiving end is polled in the
    // main loop below; the sending end is stored in the session's handle.
    let (sender, receiver) = tokio::sync::mpsc::channel(10);
    let mut session = Session {
        target_window_size: common.config.window_size,
        common,
        receiver,
        sender: server::session::Handle { sender },
        pending_reads: Vec::new(),
        pending_len: 0,
    };
    // Send whatever `read_ssh_id` queued in the write buffer.
    session.flush()?;
    stream
        .write_all(&session.common.write_buffer.buffer)
        .await
        .map_err(crate::Error::from)?;
    session.common.write_buffer.buffer.clear();

    let (stream_read, mut stream_write) = stream.split();
    let buffer = SSHBuffer::new();
    // `reading` is the in-flight read. It is pinned once and re-armed with
    // `reading.set(..)` each time a packet has been handled.
    let reading = start_reading(stream_read, buffer, session.common.cipher.clone());
    pin!(reading);
    // Holds the reader and buffer whenever the loop is left after `reading`
    // has completed, so that the shutdown phase can start a new read.
    let mut is_reading = None;

    while !session.common.disconnected {
        tokio::select! {
            r = &mut reading => {
                let (stream_read, buffer) = match r {
                    Ok((_, stream_read, buffer)) => (stream_read, buffer),
                    Err(e) => return Err(e.into())
                };
                // The payload is taken from offset 5 onwards, so anything
                // shorter ends the loop.
                // NOTE(review): the first 5 bytes are presumably the packet
                // length and padding length fields — confirm in `cipher::read`.
                if buffer.buffer.len() < 5 {
                    is_reading = Some((stream_read, buffer));
                    break
                }
                let buf = if let Some(ref mut enc) = session.common.encrypted {
                    let d = enc.decompress.decompress(
                        &buffer.buffer[5..],
                        &mut decomp,
                    );
                    if let Ok(buf) = d {
                        buf
                    } else {
                        // A decompression failure ends the connection loop.
                        debug!("err = {:?}", d);
                        is_reading = Some((stream_read, buffer));
                        break
                    }
                } else {
                    &buffer.buffer[5..]
                };
                if !buf.is_empty() {
                    if buf[0] == crate::msg::DISCONNECT {
                        debug!("break");
                        is_reading = Some((stream_read, buffer));
                        break;
                    } else if buf[0] > 4 {
                        // Message numbers up to 4 are not forwarded to
                        // `reply`; per RFC 4253 those are disconnect,
                        // ignore, unimplemented and debug.
                        match reply(session, &mut handler, &buf[..]).await {
                            Ok(s) => session = s,
                            Err(e) => return Err(e),
                        }
                    }
                }
                // Re-arm the read with the cipher currently installed in the session.
                reading.set(start_reading(stream_read, buffer, session.common.cipher.clone()));
            }
            // A new timeout future is created on every pass through the
            // loop, so the delay restarts after each handled event.
            _ = timeout(delay) => {
                debug!("timeout");
                break
            },
            // Queued messages are only taken while no re-keying is in progress.
            msg = session.receiver.recv(), if !session.is_rekeying() => {
                match msg {
                    Some((id, ChannelMsg::Data { data })) => {
                        session.data(id, data);
                    }
                    Some((id, ChannelMsg::ExtendedData { ext, data })) => {
                        session.extended_data(id, ext, data);
                    }
                    Some((id, ChannelMsg::Eof)) => {
                        session.eof(id);
                    }
                    Some((id, ChannelMsg::Close)) => {
                        session.close(id);
                    }
                    Some((id, ChannelMsg::XonXoff { client_can_do })) => {
                        session.xon_xoff_request(id, client_can_do);
                    }
                    Some((id, ChannelMsg::ExitStatus { exit_status })) => {
                        session.exit_status_request(id, exit_status);
                    }
                    Some((id, ChannelMsg::ExitSignal { signal_name, core_dumped, error_message, lang_tag })) => {
                        session.exit_signal_request(id, signal_name, core_dumped, &error_message, &lang_tag);
                    }
                    // The remaining cases are only logged.
                    Some((id, ChannelMsg::WindowAdjusted { new_size })) => {
                        debug!("window adjusted to {:?} for channel {:?}", new_size, id);
                    }
                    Some((id, ChannelMsg::Success)) => {
                        debug!("channel success {:?}", id);
                    }
                    None => {
                        debug!("session.receiver: received None");
                    }
                }
            }
        }
        // Write out whatever the handled event queued.
        session.flush()?;
        stream_write
            .write_all(&session.common.write_buffer.buffer)
            .await
            .map_err(crate::Error::from)?;
        session.common.write_buffer.buffer.clear();
    }
    debug!("disconnected");
    // Shutdown
    stream_write.shutdown().await.map_err(crate::Error::from)?;
    // Drain the read half until a read reports zero bytes. If the loop
    // above was left with a completed read, a new one is started first;
    // otherwise the still-pending read is awaited.
    loop {
        if let Some((stream_read, buffer)) = is_reading.take() {
            reading.set(start_reading(
                stream_read,
                buffer,
                session.common.cipher.clone(),
            ));
        }
        let (n, r, b) = (&mut reading).await?;
        is_reading = Some((r, b));
        if n == 0 {
            break;
        }
    }
    Ok(handler.unwrap())
}
598
read_ssh_id<R: AsyncRead + Unpin>( config: Arc<Config>, read: &mut SshRead<R>, ) -> Result<CommonSession<Arc<Config>>, Error>599 async fn read_ssh_id<R: AsyncRead + Unpin>(
600 config: Arc<Config>,
601 read: &mut SshRead<R>,
602 ) -> Result<CommonSession<Arc<Config>>, Error> {
603 let sshid = if let Some(t) = config.connection_timeout {
604 tokio::time::timeout(t, read.read_ssh_id()).await??
605 } else {
606 read.read_ssh_id().await?
607 };
608 let mut exchange = Exchange::new();
609 exchange.client_id.extend(sshid);
610 // Preparing the response
611 exchange
612 .server_id
613 .extend(config.as_ref().server_id.as_bytes());
614 let mut kexinit = KexInit {
615 exchange: exchange,
616 algo: None,
617 sent: false,
618 session_id: None,
619 };
620 let cipher = Arc::new(cipher::CLEAR_PAIR);
621 let mut write_buffer = SSHBuffer::new();
622 kexinit.server_write(config.as_ref(), cipher.as_ref(), &mut write_buffer)?;
623 Ok(CommonSession {
624 write_buffer,
625 kex: Some(Kex::KexInit(kexinit)),
626 auth_user: String::new(),
627 auth_method: None, // Client only.
628 cipher,
629 encrypted: None,
630 config: config,
631 wants_reply: false,
632 disconnected: false,
633 buffer: CryptoVec::new(),
634 })
635 }
636
reply<H: Handler>( mut session: Session, handler: &mut Option<H>, buf: &[u8], ) -> Result<Session, H::Error>637 async fn reply<H: Handler>(
638 mut session: Session,
639 handler: &mut Option<H>,
640 buf: &[u8],
641 ) -> Result<Session, H::Error> {
642 // Handle key exchange/re-exchange.
643 if session.common.encrypted.is_none() {
644 match session.common.kex.take() {
645 Some(Kex::KexInit(kexinit)) => {
646 if kexinit.algo.is_some() || buf[0] == msg::KEXINIT {
647 session.common.kex = Some(kexinit.server_parse(
648 session.common.config.as_ref(),
649 &session.common.cipher,
650 &buf,
651 &mut session.common.write_buffer,
652 )?);
653 return Ok(session);
654 } else {
655 // Else, i.e. if the other side has not started
656 // the key exchange, process its packets by simple
657 // not returning.
658 session.common.kex = Some(Kex::KexInit(kexinit))
659 }
660 }
661 Some(Kex::KexDh(kexdh)) => {
662 session.common.kex = Some(kexdh.parse(
663 session.common.config.as_ref(),
664 &session.common.cipher,
665 buf,
666 &mut session.common.write_buffer,
667 )?);
668 return Ok(session);
669 }
670 Some(Kex::NewKeys(newkeys)) => {
671 if buf[0] != msg::NEWKEYS {
672 return Err(Error::Kex.into());
673 }
674 // Ok, NEWKEYS received, now encrypted.
675 session.common.encrypted(
676 EncryptedState::WaitingServiceRequest {
677 sent: false,
678 accepted: false,
679 },
680 newkeys,
681 );
682 return Ok(session);
683 }
684 Some(kex) => {
685 session.common.kex = Some(kex);
686 return Ok(session);
687 }
688 None => {}
689 }
690 Ok(session)
691 } else {
692 Ok(session.server_read_encrypted(handler, buf).await?)
693 }
694 }
695