//! Code to handle incoming cells on a circuit.
use super::streammap::{ShouldSendEnd, StreamEnt};
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
use crate::circuit::unique_id::UniqId;
use crate::circuit::{
    sendme, streammap, CircParameters, Create2Wrap, CreateFastWrap, CreateHandshakeWrap,
};
use crate::crypto::cell::{
    ClientLayer, CryptInit, HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt,
    OutboundClientLayer, RelayCellBody, Tor1RelayCrypto,
};
use crate::util::err::ReactorError;
use crate::{Error, Result};
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use tor_cell::chancell::msg::{ChanMsg, Relay};
use tor_cell::relaycell::msg::{End, RelayMsg, Sendme};
use tor_cell::relaycell::{RelayCell, RelayCmd, StreamId};

use futures::channel::{mpsc, oneshot};
use futures::Sink;
use futures::Stream;

use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::channel::Channel;
#[cfg(test)]
use crate::circuit::sendme::CircTag;
use crate::circuit::sendme::StreamSendWindow;
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
use tor_cell::chancell;
use tor_cell::chancell::{ChanCell, CircId};
use tor_linkspec::LinkSpec;
use tor_llcrypto::pk;
use tracing::{debug, trace, warn};

/// Initial value for outbound flow-control window on streams.
pub(super) const SEND_WINDOW_INIT: u16 = 500;
/// Initial value for inbound flow-control window on streams.
pub(super) const RECV_WINDOW_INIT: u16 = 500;
/// Size of the buffer used between the reactor and a `StreamReader`.
///
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
///             get sent more than the receive window anyway!). We might still receive more
///             than that, though, because of cells that don't count towards the window.
pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;

/// The type of a oneshot channel used to inform reactor users of the result of an operation.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;

/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(super) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The first hop's Ed25519 identity, which is verified against
        /// the identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
}

/// A message telling the reactor to do something.
#[derive(Debug)]
pub(super) enum CtrlMsg {
    /// Create the first hop of this circuit.
    Create {
        /// A oneshot channel on which we'll receive the creation response.
        recv_created: oneshot::Receiver<CreateResponse>,
        /// The handshake type to use for the first hop.
        handshake: CircuitHandshake,
        /// Whether the hop supports authenticated SENDME cells.
        supports_authenticated_sendme: bool,
        /// Other parameters relevant for circuit creation.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntor handshake.
    ExtendNtor {
        /// The ntor public key of the relay we're extending to.
        public_key: NtorPublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<LinkSpec>,
        /// Whether the hop supports authenticated SENDME cells.
        supports_authenticated_sendme: bool,
        /// Other parameters relevant for circuit extension.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Allocates a stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The hop number to begin the stream with.
        hop_num: HopNum,
        /// The message to send.
        message: RelayMsg,
        /// A channel to send messages on this stream down.
        ///
        /// This sender shouldn't ever block, because we use congestion control and only send
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
        /// can assume someone is trying to send us more cells than they should, and abort
        /// the stream.
        sender: mpsc::Sender<RelayMsg>,
        /// A channel to receive messages to send on this stream from.
        rx: mpsc::Receiver<RelayMsg>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<StreamId>,
    },
    /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
    SendSendme {
        /// The stream ID to send a SENDME for.
        stream_id: StreamId,
        /// The hop number the stream is on.
        hop_num: HopNum,
    },
    /// Shut down the reactor.
    Shutdown,
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
    #[cfg(test)]
    AddFakeHop {
        // NOTE(review): the handler for this variant is not in this part of the file; the
        // field notes below are inferred from the field names and types -- confirm there.
        // Presumably has the same meaning as `supports_flowctrl_1` elsewhere in this file.
        supports_flowctrl_1: bool,
        // Presumably configures the dummy outbound crypto layer -- TODO confirm.
        fwd_lasthop: bool,
        // Presumably configures the dummy inbound crypto layer -- TODO confirm.
        rev_lasthop: bool,
        // Circuit parameters for the fake hop.
        params: CircParameters,
        // Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// (tests only) Get the send window and expected tags for a given hop.
    #[cfg(test)]
    QuerySendWindow {
        // The hop whose send window is queried.
        hop: HopNum,
        // Oneshot channel on which the window value and expected tags are returned.
        done: ReactorResultChannel<(u16, Vec<CircTag>)>,
    },
    /// (tests only) Send a raw relay cell with send_relay_cell().
    #[cfg(test)]
    SendRelayCell {
        // These fields mirror the `hop`, `early` and `cell` arguments of `send_relay_cell()`.
        hop: HopNum,
        early: bool,
        cell: RelayCell,
    },
}
/// Represents the reactor's view of a single hop.
pub(super) struct CircHop {
    /// Map from stream IDs to streams.
    ///
    /// We store this with the reactor instead of the circuit, since the
    /// reactor needs it for every incoming cell on a stream, whereas
    /// the circuit only needs it when allocating new streams.
    map: streammap::StreamMap,
    /// Window used to say how many cells we can receive.
    recvwindow: sendme::CircRecvWindow,
    /// If true, this hop is using an older link protocol and we
    /// shouldn't expect good authenticated SENDMEs from it.
    auth_sendme_optional: bool,
    /// Window used to say how many cells we can send.
    sendwindow: sendme::CircSendWindow,
    /// Buffer for messages we can't send to this hop yet due to congestion control.
    ///
    /// Contains the tag we should give to the send window, and the cell to send.
    ///
    /// This shouldn't grow unboundedly: we try and pop things off it first before
    /// doing things that would result in it growing (and stop before growing it
    /// if popping things off it can't be done).
    outbound: VecDeque<([u8; 20], ChanCell)>,
}

impl CircHop {
    /// Create a new hop.
    ///
    /// `auth_sendme_optional` is stored as-is in the hop; `initial_window` is the starting
    /// value of the hop's send window. The stream map and the outbound queue start out empty.
    pub(super) fn new(auth_sendme_optional: bool, initial_window: u16) -> Self {
        CircHop {
            map: streammap::StreamMap::new(),
            // The receive window is always constructed with 1000, independent of
            // `initial_window`.
            recvwindow: sendme::CircRecvWindow::new(1000),
            auth_sendme_optional,
            sendwindow: sendme::CircSendWindow::new(initial_window),
            outbound: VecDeque::new(),
        }
    }
}

/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
196 /// 197 /// To get around this problem, the reactor can send some cells, and then make one of these 198 /// `MetaCellHandler` objects, which will be run when the reply arrives. 199 pub(super) trait MetaCellHandler: Send { 200 /// The hop we're expecting the message to come from. This is compared against the hop 201 /// from which we actually receive messages, and an error is thrown if the two don't match. expected_hop(&self) -> HopNum202 fn expected_hop(&self) -> HopNum; 203 /// Called when the message we were waiting for arrives. 204 /// 205 /// Gets a copy of the `Reactor` in order to do anything it likes there. finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>206 fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>; 207 } 208 209 /// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait. 210 /// 211 /// Yes, I know having trait bounds on structs is bad, but in this case it's necessary 212 /// since we want to be able to use `H::KeyType`. 213 struct CircuitExtender<H, L, FWD, REV> 214 where 215 H: ClientHandshake, 216 { 217 /// Handshake state. 218 state: Option<H::StateType>, 219 /// Whether the hop supports authenticated SENDME cells. 220 supports_flowctrl_1: bool, 221 /// Parameters used for this extension. 222 params: CircParameters, 223 /// An identifier for logging about this reactor's circuit. 224 unique_id: UniqId, 225 /// The hop we're expecting the EXTENDED2 cell to come back from. 226 expected_hop: HopNum, 227 /// `PhantomData` used to make the other type parameters required for a circuit extension 228 /// part of the `struct`, instead of having them be provided during a function call. 229 /// 230 /// This is done this way so we can implement `MetaCellHandler` for this type, which 231 /// doesn't include any generic type parameters; we need them to be part of the type 232 /// so we know what they are for that `impl` block. 
233 phantom: PhantomData<(L, FWD, REV)>, 234 } 235 impl<H, L, FWD, REV> CircuitExtender<H, L, FWD, REV> 236 where 237 H: ClientHandshake, 238 H::KeyGen: KeyGenerator, 239 L: CryptInit + ClientLayer<FWD, REV>, 240 FWD: OutboundClientLayer + 'static + Send, 241 REV: InboundClientLayer + 'static + Send, 242 { 243 /// Start extending a circuit, sending the necessary EXTEND cell and returning a 244 /// new `CircuitExtender` to be called when the reply arrives. 245 /// 246 /// The `handshake_id` is the numeric identifier for what kind of 247 /// handshake we're doing. The `key` is the relay's onion key that 248 /// goes along with the handshake, and the `linkspecs` are the 249 /// link specifiers to include in the EXTEND cell to tell the 250 /// current last hop which relay to connect to. begin( cx: &mut Context<'_>, handshake_id: u16, key: &H::KeyType, linkspecs: Vec<LinkSpec>, supports_flowctrl_1: bool, params: CircParameters, reactor: &mut Reactor, ) -> Result<Self>251 fn begin( 252 cx: &mut Context<'_>, 253 handshake_id: u16, 254 key: &H::KeyType, 255 linkspecs: Vec<LinkSpec>, 256 supports_flowctrl_1: bool, 257 params: CircParameters, 258 reactor: &mut Reactor, 259 ) -> Result<Self> { 260 let mut rng = rand::thread_rng(); 261 let unique_id = reactor.unique_id; 262 263 use tor_cell::relaycell::msg::{Body, Extend2}; 264 // Perform the first part of the cryptographic handshake 265 let (state, msg) = H::client1(&mut rng, key)?; 266 267 let n_hops = reactor.crypto_out.n_layers(); 268 let hop = ((n_hops - 1) as u8).into(); 269 270 debug!( 271 "{}: Extending circuit to hop {} with {:?}", 272 unique_id, 273 n_hops + 1, 274 linkspecs 275 ); 276 277 let extend_msg = Extend2::new(linkspecs, handshake_id, msg); 278 let cell = RelayCell::new(0.into(), extend_msg.into_message()); 279 280 // Send the message to the last hop... 
281 reactor.send_relay_cell( 282 cx, hop, true, // use a RELAY_EARLY cell 283 cell, 284 )?; 285 trace!("{}: waiting for EXTENDED2 cell", unique_id); 286 // ... and now we wait for a response. 287 288 Ok(Self { 289 state: Some(state), 290 supports_flowctrl_1, 291 params, 292 unique_id, 293 expected_hop: hop, 294 phantom: Default::default(), 295 }) 296 } 297 } 298 299 impl<H, L, FWD, REV> MetaCellHandler for CircuitExtender<H, L, FWD, REV> 300 where 301 H: ClientHandshake, 302 H::StateType: Send, 303 H::KeyGen: KeyGenerator, 304 L: CryptInit + ClientLayer<FWD, REV> + Send, 305 FWD: OutboundClientLayer + 'static + Send, 306 REV: InboundClientLayer + 'static + Send, 307 { expected_hop(&self) -> HopNum308 fn expected_hop(&self) -> HopNum { 309 self.expected_hop 310 } finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>311 fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()> { 312 // XXXX If two EXTEND cells are of these are launched on the 313 // same circuit at once, could they collide in this part of 314 // the function? I don't _think_ so, but it might be a good idea 315 // to have an "extending" bit that keeps two tasks from entering 316 // extend_impl at the same time. 317 // 318 // Also we could enforce that `hop` is still what we expect it 319 // to be at this point. 320 321 // Did we get the right response? 322 if msg.cmd() != RelayCmd::EXTENDED2 { 323 return Err(Error::CircProto(format!( 324 "wanted EXTENDED2; got {}", 325 msg.cmd(), 326 ))); 327 } 328 329 // ???? Do we need to shutdown the circuit for the remaining error 330 // ???? cases in this function? 331 332 let msg = match msg { 333 RelayMsg::Extended2(e) => e, 334 _ => return Err(Error::InternalError("Body didn't match cmd".into())), 335 }; 336 let relay_handshake = msg.into_body(); 337 338 trace!( 339 "{}: Received EXTENDED2 cell; completing handshake.", 340 self.unique_id 341 ); 342 // Now perform the second part of the handshake, and see if it 343 // succeeded. 
344 let keygen = H::client2( 345 self.state 346 .take() 347 .expect("CircuitExtender::finish() called twice"), 348 relay_handshake, 349 )?; 350 let layer = L::construct(keygen)?; 351 352 debug!("{}: Handshake complete; circuit extended.", self.unique_id); 353 354 // If we get here, it succeeded. Add a new hop to the circuit. 355 let (layer_fwd, layer_back) = layer.split(); 356 reactor.add_hop( 357 self.supports_flowctrl_1, 358 Box::new(layer_fwd), 359 Box::new(layer_back), 360 &self.params, 361 ); 362 Ok(()) 363 } 364 } 365 366 /// Object to handle incoming cells and background tasks on a circuit 367 /// 368 /// This type is returned when you finish a circuit; you need to spawn a 369 /// new task that calls `run()` on it. 370 #[must_use = "If you don't call run() on a reactor, the circuit won't work."] 371 pub struct Reactor { 372 /// Receiver for control messages for this reactor, sent by `ClientCirc` objects. 373 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>, 374 /// Buffer for cells we can't send out the channel yet due to it being full. 375 /// 376 /// This should be used very very rarely: see `send_msg_direct`'s comments for more 377 /// information. (in fact, using it will generate a warning!) 378 pub(super) outbound: VecDeque<ChanCell>, 379 /// The channel this circuit is using to send cells through. 380 pub(super) channel: Channel, 381 /// Input stream, on which we receive ChanMsg objects from this circuit's 382 /// channel. 383 // TODO: could use a SPSC channel here instead. 384 pub(super) input: mpsc::Receiver<ClientCircChanMsg>, 385 /// The cryptographic state for this circuit for inbound cells. 386 /// This object is divided into multiple layers, each of which is 387 /// shared with one hop of the circuit. 388 pub(super) crypto_in: InboundClientCrypt, 389 /// The cryptographic state for this circuit for outbound cells. 
390 pub(super) crypto_out: OutboundClientCrypt, 391 /// List of hops state objects used by the reactor 392 pub(super) hops: Vec<CircHop>, 393 /// Shared atomic for the number of hops this circuit has. 394 pub(super) num_hops: Arc<AtomicU8>, 395 /// An identifier for logging about this reactor's circuit. 396 pub(super) unique_id: UniqId, 397 /// This circuit's identifier on the upstream channel. 398 pub(super) channel_id: CircId, 399 /// A handler for a meta cell, together with a result channel to notify on completion. 400 pub(super) meta_handler: Option<(Box<dyn MetaCellHandler>, ReactorResultChannel<()>)>, 401 } 402 403 impl Reactor { 404 /// Launch the reactor, and run until the circuit closes or we 405 /// encounter an error. 406 /// 407 /// Once this method returns, the circuit is dead and cannot be 408 /// used again. run(mut self) -> Result<()>409 pub async fn run(mut self) -> Result<()> { 410 trace!("{}: Running circuit reactor", self.unique_id); 411 let result: Result<()> = loop { 412 match self.run_once().await { 413 Ok(()) => (), 414 Err(ReactorError::Shutdown) => break Ok(()), 415 Err(ReactorError::Err(e)) => break Err(e), 416 } 417 }; 418 debug!("{}: Circuit reactor stopped: {:?}", self.unique_id, result); 419 result 420 } 421 422 /// Helper for run: doesn't mark the circuit closed on finish. Only 423 /// processes one cell or control message. run_once(&mut self) -> std::result::Result<(), ReactorError>424 pub(super) async fn run_once(&mut self) -> std::result::Result<(), ReactorError> { 425 #[allow(clippy::cognitive_complexity)] 426 let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> { 427 let mut create_message = None; 428 let mut did_things = false; 429 430 // Check whether we've got a control message pending. 
431 if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) { 432 match ret { 433 None => { 434 trace!("{}: reactor shutdown due to control drop", self.unique_id); 435 return Poll::Ready(Err(ReactorError::Shutdown)); 436 } 437 Some(CtrlMsg::Shutdown) => { 438 trace!( 439 "{}: reactor shutdown due to explicit request", 440 self.unique_id 441 ); 442 return Poll::Ready(Err(ReactorError::Shutdown)); 443 } 444 // This message requires actually blocking, so we can't handle it inside 445 // this nonblocking poll_fn. 446 Some(x @ CtrlMsg::Create { .. }) => create_message = Some(x), 447 Some(msg) => { 448 self.handle_control(cx, msg)?; 449 did_things = true; 450 } 451 } 452 } 453 454 // Check whether we've got an input message pending. 455 if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) { 456 match ret { 457 None => { 458 trace!("{}: reactor shutdown due to input drop", self.unique_id); 459 return Poll::Ready(Err(ReactorError::Shutdown)); 460 } 461 Some(cell) => { 462 if self.handle_cell(cx, cell)? { 463 trace!("{}: reactor shutdown due to handled cell", self.unique_id); 464 return Poll::Ready(Err(ReactorError::Shutdown)); 465 } 466 did_things = true; 467 } 468 } 469 } 470 471 // Now for the tricky part. We want to grab some relay cells from all of our streams 472 // and forward them on to the channel, but we need to pay attention to both whether 473 // the channel can accept cells right now, and whether congestion control allows us 474 // to send them. 475 // 476 // We also have to do somewhat cursed things and call start_send inside this poll_fn, 477 // since we need to check whether the channel can still receive cells after each one 478 // that we send. 479 480 let mut streams_to_close = vec![]; 481 let mut stream_relaycells = vec![]; 482 483 // Is the channel ready to receive anything at all? 484 if self.channel.poll_ready(cx)? 
{ 485 // (using this as a named block for early returns; not actually a loop) 486 #[allow(clippy::never_loop)] 487 'outer: loop { 488 // First, drain our queue of things we tried to send earlier, but couldn't. 489 while let Some(msg) = self.outbound.pop_front() { 490 trace!("{}: sending from enqueued: {:?}", self.unique_id, msg); 491 Pin::new(&mut self.channel).start_send(msg)?; 492 493 // `futures::Sink::start_send` dictates we need to call `poll_ready` before 494 // each `start_send` call. 495 if !self.channel.poll_ready(cx)? { 496 break 'outer; 497 } 498 } 499 500 // Let's look at our hops, and streams for each hop. 501 for (i, hop) in self.hops.iter_mut().enumerate() { 502 let hop_num = HopNum::from(i as u8); 503 // If we can, drain our queue of things we tried to send earlier, but 504 // couldn't due to congestion control. 505 if hop.sendwindow.window() > 0 { 506 'hop: while let Some((tag, msg)) = hop.outbound.pop_front() { 507 trace!( 508 "{}: sending from hop-{}-enqueued: {:?}", 509 self.unique_id, 510 i, 511 msg 512 ); 513 Pin::new(&mut self.channel).start_send(msg)?; 514 hop.sendwindow.take(&tag)?; 515 if !self.channel.poll_ready(cx)? { 516 break 'outer; 517 } 518 if hop.sendwindow.window() == 0 { 519 break 'hop; 520 } 521 } 522 } 523 // Look at all of the streams on this hop. 524 for (id, stream) in hop.map.inner().iter_mut() { 525 if let StreamEnt::Open { 526 rx, send_window, .. 527 } = stream 528 { 529 // Do the stream and hop send windows allow us to obtain and 530 // send something? 531 // 532 // FIXME(eta): not everything counts toward congestion control! 533 if send_window.window() > 0 && hop.sendwindow.window() > 0 { 534 match Pin::new(rx).poll_next(cx) { 535 Poll::Ready(Some(m)) => { 536 stream_relaycells 537 .push((hop_num, RelayCell::new(*id, m))); 538 } 539 Poll::Ready(None) => { 540 // Stream receiver was dropped; close the stream. 541 // We can't close it here though due to borrowck; that 542 // will happen later. 
543 streams_to_close.push((hop_num, *id)); 544 } 545 Poll::Pending => {} 546 } 547 } 548 } 549 } 550 } 551 552 break; 553 } 554 } 555 556 // Close the streams we said we'd close. 557 for (hopn, id) in streams_to_close { 558 self.close_stream(cx, hopn, id)?; 559 did_things = true; 560 } 561 // Send messages we said we'd send. 562 for (hopn, rc) in stream_relaycells { 563 self.send_relay_cell(cx, hopn, false, rc)?; 564 did_things = true; 565 } 566 567 let _ = Pin::new(&mut self.channel) 568 .poll_flush(cx) 569 .map_err(|_| Error::ChannelClosed)?; 570 if create_message.is_some() { 571 Poll::Ready(Ok(create_message)) 572 } else if did_things { 573 Poll::Ready(Ok(None)) 574 } else { 575 Poll::Pending 576 } 577 }); 578 let create_message = fut.await?; 579 if let Some(CtrlMsg::Create { 580 recv_created, 581 handshake, 582 supports_authenticated_sendme, 583 params, 584 done, 585 }) = create_message 586 { 587 let ret = match handshake { 588 CircuitHandshake::CreateFast => { 589 self.create_firsthop_fast(recv_created, ¶ms).await 590 } 591 CircuitHandshake::Ntor { 592 public_key, 593 ed_identity, 594 } => { 595 self.create_firsthop_ntor( 596 recv_created, 597 ed_identity, 598 public_key, 599 supports_authenticated_sendme, 600 ¶ms, 601 ) 602 .await 603 } 604 }; 605 let _ = done.send(ret); // don't care if sender goes away 606 futures::future::poll_fn(|cx| -> Poll<Result<()>> { 607 let _ = Pin::new(&mut self.channel) 608 .poll_flush(cx) 609 .map_err(|_| Error::ChannelClosed)?; 610 Poll::Ready(Ok(())) 611 }) 612 .await?; 613 } 614 Ok(()) 615 } 616 617 /// Helper: create the first hop of a circuit. 618 /// 619 /// This is parameterized not just on the RNG, but a wrapper object to 620 /// build the right kind of create cell, a handshake object to perform 621 /// the cryptographic cryptographic handshake, and a layer type to 622 /// handle relay crypto after this hop is built. 
create_impl<L, FWD, REV, H, W>( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, wrap: &W, key: &H::KeyType, supports_flowctrl_1: bool, params: &CircParameters, ) -> Result<()> where L: CryptInit + ClientLayer<FWD, REV> + 'static + Send, FWD: OutboundClientLayer + 'static + Send, REV: InboundClientLayer + 'static + Send, H: ClientHandshake, W: CreateHandshakeWrap, H::KeyGen: KeyGenerator,623 async fn create_impl<L, FWD, REV, H, W>( 624 &mut self, 625 recvcreated: oneshot::Receiver<CreateResponse>, 626 wrap: &W, 627 key: &H::KeyType, 628 supports_flowctrl_1: bool, 629 params: &CircParameters, 630 ) -> Result<()> 631 where 632 L: CryptInit + ClientLayer<FWD, REV> + 'static + Send, // need all this?XXXX 633 FWD: OutboundClientLayer + 'static + Send, 634 REV: InboundClientLayer + 'static + Send, 635 H: ClientHandshake, 636 W: CreateHandshakeWrap, 637 H::KeyGen: KeyGenerator, 638 { 639 // We don't need to shut down the circuit on failure here, since this 640 // function consumes the PendingClientCirc and only returns 641 // a ClientCirc on success. 642 643 let (state, msg) = { 644 // done like this because holding the RNG across an await boundary makes the future 645 // non-Send 646 let mut rng = rand::thread_rng(); 647 H::client1(&mut rng, key)? 
648 }; 649 let create_cell = wrap.to_chanmsg(msg); 650 debug!( 651 "{}: Extending to hop 1 with {}", 652 self.unique_id, 653 create_cell.cmd() 654 ); 655 self.send_msg(create_cell).await?; 656 657 let reply = recvcreated 658 .await 659 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?; 660 661 let relay_handshake = wrap.from_chanmsg(reply)?; 662 let keygen = H::client2(state, relay_handshake)?; 663 664 let layer = L::construct(keygen)?; 665 666 debug!("{}: Handshake complete; circuit created.", self.unique_id); 667 668 let (layer_fwd, layer_back) = layer.split(); 669 self.add_hop( 670 supports_flowctrl_1, 671 Box::new(layer_fwd), 672 Box::new(layer_back), 673 params, 674 ); 675 Ok(()) 676 } 677 678 /// Use the (questionable!) CREATE_FAST handshake to connect to the 679 /// first hop of this circuit. 680 /// 681 /// There's no authentication in CREATE_FAST, 682 /// so we don't need to know whom we're connecting to: we're just 683 /// connecting to whichever relay the channel is for. create_firsthop_fast( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, params: &CircParameters, ) -> Result<()>684 async fn create_firsthop_fast( 685 &mut self, 686 recvcreated: oneshot::Receiver<CreateResponse>, 687 params: &CircParameters, 688 ) -> Result<()> { 689 use crate::crypto::handshake::fast::CreateFastClient; 690 let wrap = CreateFastWrap; 691 self.create_impl::<Tor1RelayCrypto, _, _, CreateFastClient, _>( 692 recvcreated, 693 &wrap, 694 &(), 695 false, 696 params, 697 ) 698 .await 699 } 700 701 /// Use the ntor handshake to connect to the first hop of this circuit. 702 /// 703 /// Note that the provided 'target' must match the channel's target, 704 /// or the handshake will fail. 
create_firsthop_ntor( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, ed_identity: pk::ed25519::Ed25519Identity, pubkey: NtorPublicKey, supports_flowctrl_1: bool, params: &CircParameters, ) -> Result<()>705 async fn create_firsthop_ntor( 706 &mut self, 707 recvcreated: oneshot::Receiver<CreateResponse>, 708 ed_identity: pk::ed25519::Ed25519Identity, 709 pubkey: NtorPublicKey, 710 supports_flowctrl_1: bool, 711 params: &CircParameters, 712 ) -> Result<()> { 713 // Exit now if we have an Ed25519 or RSA identity mismatch. 714 // FIXME(eta): this is copypasta from Channel::check_match! 715 if self.channel.peer_rsa_id() != &pubkey.id { 716 return Err(Error::ChanMismatch(format!( 717 "Identity {} does not match target {}", 718 self.channel.peer_rsa_id(), 719 pubkey.id, 720 ))); 721 } 722 if self.channel.peer_ed25519_id() != &ed_identity { 723 return Err(Error::ChanMismatch(format!( 724 "Identity {} does not match target {}", 725 self.channel.peer_ed25519_id(), 726 ed_identity 727 ))); 728 } 729 730 let wrap = Create2Wrap { 731 handshake_type: 0x0002, // ntor 732 }; 733 self.create_impl::<Tor1RelayCrypto, _, _, NtorClient, _>( 734 recvcreated, 735 &wrap, 736 &pubkey, 737 supports_flowctrl_1, 738 params, 739 ) 740 .await 741 } 742 743 /// Add a hop to the end of this circuit. 
add_hop( &mut self, supports_flowctrl_1: bool, fwd: Box<dyn OutboundClientLayer + 'static + Send>, rev: Box<dyn InboundClientLayer + 'static + Send>, params: &CircParameters, )744 fn add_hop( 745 &mut self, 746 supports_flowctrl_1: bool, 747 fwd: Box<dyn OutboundClientLayer + 'static + Send>, 748 rev: Box<dyn InboundClientLayer + 'static + Send>, 749 params: &CircParameters, 750 ) { 751 let hop = crate::circuit::reactor::CircHop::new( 752 supports_flowctrl_1, 753 params.initial_send_window(), 754 ); 755 self.hops.push(hop); 756 self.crypto_in.add_layer(rev); 757 self.crypto_out.add_layer(fwd); 758 self.num_hops.fetch_add(1, Ordering::SeqCst); 759 } 760 761 /// Handle a RELAY cell on this circuit with stream ID 0. handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()>762 fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()> { 763 // SENDME cells and TRUNCATED get handled internally by the circuit. 764 if let RelayMsg::Sendme(s) = msg { 765 return self.handle_sendme(hopnum, s); 766 } 767 if let RelayMsg::Truncated(_) = msg { 768 // XXXX need to handle Truncated cells. This isn't the right 769 // way, but at least it's safe. 770 // TODO: If we ever do handle Truncate cells more 771 // correctly, we will need to audit all our use of HopNum 772 // to identify a layer. Otherwise we could confuse a 773 // message from the previous hop N with a message from the 774 // new hop N. 775 return Err(Error::CircuitClosed); 776 } 777 778 trace!("{}: Received meta-cell {:?}", self.unique_id, msg); 779 780 // For all other command types, we'll only get them in response 781 // to another command, which should have registered a responder. 782 // 783 // TODO: that means that service-introduction circuits will need 784 // a different implementation, but that should be okay. We'll work 785 // something out. 
786 if let Some((mut handler, done)) = self.meta_handler.take() { 787 if handler.expected_hop() == hopnum { 788 // Somebody was waiting for a message -- maybe this message 789 let ret = handler.finish(msg, self); 790 trace!( 791 "{}: meta handler completed with result: {:?}", 792 self.unique_id, 793 ret 794 ); 795 let _ = done.send(ret); // don't care if sender goes away 796 Ok(()) 797 } else { 798 // Somebody wanted a message from a different hop! Put this 799 // one back. 800 self.meta_handler = Some((handler, done)); 801 Err(Error::CircProto(format!( 802 "Unexpected {} cell from hop {} on client circuit", 803 msg.cmd(), 804 hopnum, 805 ))) 806 } 807 } else { 808 // No need to call shutdown here, since this error will 809 // propagate to the reactor shut it down. 810 Err(Error::CircProto(format!( 811 "Unexpected {} cell on client circuit", 812 msg.cmd() 813 ))) 814 } 815 } 816 817 /// Handle a RELAY_SENDME cell on this circuit with stream ID 0. handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()>818 fn handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()> { 819 // No need to call "shutdown" on errors in this function; 820 // it's called from the reactor task and errors will propagate there. 821 let hop = self 822 .hop_mut(hopnum) 823 .ok_or_else(|| Error::CircProto(format!("Couldn't find {} hop", hopnum)))?; 824 825 let auth: Option<[u8; 20]> = match msg.into_tag() { 826 Some(v) if v.len() == 20 => { 827 // XXXX ugly code. 
828 let mut tag = [0_u8; 20]; 829 (&mut tag).copy_from_slice(&v[..]); 830 Some(tag) 831 } 832 Some(_) => return Err(Error::CircProto("malformed tag on circuit sendme".into())), 833 None => { 834 if !hop.auth_sendme_optional { 835 return Err(Error::CircProto("missing tag on circuit sendme".into())); 836 } else { 837 None 838 } 839 } 840 }; 841 match hop.sendwindow.put(auth) { 842 Some(_) => Ok(()), 843 None => Err(Error::CircProto("bad auth tag on circuit sendme".into())), 844 } 845 } 846 847 /// Send a message onto the circuit's channel (to be called with a `Context`) 848 /// 849 /// If the channel is ready to accept messages, it will be sent immediately. If not, the message 850 /// will be enqueued for sending at a later iteration of the reactor loop. 851 /// 852 /// # Note 853 /// 854 /// Making use of the enqueuing capabilities of this function is discouraged! You should first 855 /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and 856 /// ideally use this to implement backpressure (such that you do not read from other sources 857 /// that would send here while you know you're unable to forward the messages on). send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()>858 fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> { 859 let cell = ChanCell::new(self.channel_id, msg); 860 if self.channel.poll_ready(cx)? { 861 Pin::new(&mut self.channel).start_send(cell)?; 862 } else { 863 // This case shouldn't actually happen that often, if ever. We generally check whether 864 // the channel can be sent to before calling this function (the one exception at the 865 // time of writing is in circuit creation). 866 // 867 // If this is suddenly getting hit and it wasn't before, maybe you added something that 868 // doesn't bother to check the channel (`self.channel.poll_ready(cx)`) before calling 869 // this function, and that's getting used a lot? 
870 // 871 // We don't want to drop cells on the floor, though, so this is good to have. 872 warn!( 873 "{}: having to enqueue cell due to backpressure: {:?}", 874 self.unique_id, cell 875 ); 876 self.outbound.push_back(cell); 877 } 878 Ok(()) 879 } 880 881 /// Wrapper around `send_msg_direct` that uses `futures::future::poll_fn` to get a `Context`. send_msg(&mut self, msg: ChanMsg) -> Result<()>882 async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> { 883 // HACK(eta): technically the closure passed to `poll_fn` is a `FnMut` closure, since it 884 // can be polled multiple times. 885 // We're going to return Ready immediately since we're only using `poll_fn` to 886 // get a `Context`, but the compiler doesn't know that, so use an `Option` 887 // which we can `take()` in order to move out of it. 888 // (if we do get polled again this'll panic, but that shouldn't happen!) 889 let mut msg = Some(msg); 890 futures::future::poll_fn(|cx| -> Poll<Result<()>> { 891 self.send_msg_direct(cx, msg.take().expect("poll_fn called twice?"))?; 892 Poll::Ready(Ok(())) 893 }) 894 .await?; 895 Ok(()) 896 } 897 898 /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop. 899 /// 900 /// Does not check whether the cell is well-formed or reasonable. 
send_relay_cell( &mut self, cx: &mut Context<'_>, hop: HopNum, early: bool, cell: RelayCell, ) -> Result<()>901 fn send_relay_cell( 902 &mut self, 903 cx: &mut Context<'_>, 904 hop: HopNum, 905 early: bool, 906 cell: RelayCell, 907 ) -> Result<()> { 908 let c_t_w = sendme::cell_counts_towards_windows(&cell); 909 let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into(); 910 let tag = self.crypto_out.encrypt(&mut body, hop)?; 911 let msg = chancell::msg::Relay::from_raw(body.into()); 912 let msg = if early { 913 ChanMsg::RelayEarly(msg) 914 } else { 915 ChanMsg::Relay(msg) 916 }; 917 // If the cell counted towards our sendme window, decrement 918 // that window, and maybe remember the authentication tag. 919 if c_t_w { 920 let hop_num = Into::<usize>::into(hop); 921 let hop = &mut self.hops[hop_num]; 922 if hop.sendwindow.window() == 0 { 923 let cell = ChanCell::new(self.channel_id, msg); 924 // Send window is empty! Push this cell onto the hop's outbound queue, and it'll 925 // get sent later. 926 trace!( 927 "{}: having to use onto hop {} queue for cell: {:?}", 928 self.unique_id, 929 hop_num, 930 cell 931 ); 932 hop.outbound.push_back((*tag, cell)); 933 return Ok(()); 934 } 935 hop.sendwindow.take(tag)?; 936 } 937 self.send_msg_direct(cx, msg) 938 } 939 940 /// Handle a CtrlMsg other than Shutdown. handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()>941 fn handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()> { 942 trace!("{}: reactor received {:?}", self.unique_id, msg); 943 match msg { 944 // This is handled earlier, since it requires blocking. 945 CtrlMsg::Create { .. } => panic!("got a CtrlMsg::Create in handle_control"), 946 // This is handled earlier, since it requires generating a ReactorError. 
947 CtrlMsg::Shutdown => panic!("got a CtrlMsg::Shutdown in handle_control"), 948 CtrlMsg::ExtendNtor { 949 public_key, 950 linkspecs, 951 supports_authenticated_sendme, 952 params, 953 done, 954 } => { 955 match CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin( 956 cx, 957 0x02, 958 &public_key, 959 linkspecs, 960 supports_authenticated_sendme, 961 params, 962 self, 963 ) { 964 Ok(e) => { 965 self.meta_handler = Some((Box::new(e), done)); 966 } 967 Err(e) => { 968 let _ = done.send(Err(e)); 969 } 970 }; 971 } 972 CtrlMsg::BeginStream { 973 hop_num, 974 message, 975 sender, 976 rx, 977 done, 978 } => { 979 let ret = self.begin_stream(cx, hop_num, message, sender, rx); 980 let _ = done.send(ret); // don't care if sender goes away 981 } 982 CtrlMsg::SendSendme { stream_id, hop_num } => { 983 let sendme = Sendme::new_empty(); 984 let cell = RelayCell::new(stream_id, sendme.into()); 985 self.send_relay_cell(cx, hop_num, false, cell)?; 986 } 987 #[cfg(test)] 988 CtrlMsg::AddFakeHop { 989 supports_flowctrl_1, 990 fwd_lasthop, 991 rev_lasthop, 992 params, 993 done, 994 } => { 995 use crate::circuit::test::DummyCrypto; 996 997 let fwd = Box::new(DummyCrypto::new(fwd_lasthop)); 998 let rev = Box::new(DummyCrypto::new(rev_lasthop)); 999 self.add_hop(supports_flowctrl_1, fwd, rev, ¶ms); 1000 let _ = done.send(Ok(())); 1001 } 1002 #[cfg(test)] 1003 CtrlMsg::QuerySendWindow { hop, done } => { 1004 let _ = done.send(if let Some(hop) = self.hop_mut(hop) { 1005 Ok(hop.sendwindow.window_and_expected_tags()) 1006 } else { 1007 Err(Error::InternalError( 1008 "received QuerySendWindow for unknown hop".into(), 1009 )) 1010 }); 1011 } 1012 #[cfg(test)] 1013 CtrlMsg::SendRelayCell { hop, early, cell } => { 1014 self.send_relay_cell(cx, hop, early, cell)?; 1015 } 1016 } 1017 Ok(()) 1018 } 1019 1020 /// Start a stream. Creates an entry in the stream map with the given channels, and sends the 1021 /// `message` to the provided hop. 
begin_stream( &mut self, cx: &mut Context<'_>, hopnum: HopNum, message: RelayMsg, sender: mpsc::Sender<RelayMsg>, rx: mpsc::Receiver<RelayMsg>, ) -> Result<StreamId>1022 fn begin_stream( 1023 &mut self, 1024 cx: &mut Context<'_>, 1025 hopnum: HopNum, 1026 message: RelayMsg, 1027 sender: mpsc::Sender<RelayMsg>, 1028 rx: mpsc::Receiver<RelayMsg>, 1029 ) -> Result<StreamId> { 1030 let hop = self 1031 .hop_mut(hopnum) 1032 .ok_or_else(|| Error::InternalError(format!("No such hop {:?}", hopnum)))?; 1033 let send_window = StreamSendWindow::new(SEND_WINDOW_INIT); 1034 let r = hop.map.add_ent(sender, rx, send_window)?; 1035 let cell = RelayCell::new(r, message); 1036 self.send_relay_cell(cx, hopnum, false, cell)?; 1037 Ok(r) 1038 } 1039 1040 /// Close the stream associated with `id` because the stream was 1041 /// dropped. 1042 /// 1043 /// If we have not already received an END cell on this stream, send one. close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()>1044 fn close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()> { 1045 // Mark the stream as closing. 1046 let hop = self.hop_mut(hopnum).ok_or_else(|| { 1047 Error::InternalError("Tried to close a stream on a hop that wasn't there?".into()) 1048 })?; 1049 1050 let should_send_end = hop.map.terminate(id)?; 1051 trace!( 1052 "{}: Ending stream {}; should_send_end={:?}", 1053 self.unique_id, 1054 id, 1055 should_send_end 1056 ); 1057 // TODO: I am about 80% sure that we only send an END cell if 1058 // we didn't already get an END cell. But I should double-check! 1059 if should_send_end == ShouldSendEnd::Send { 1060 let end_cell = RelayCell::new(id, End::new_misc().into()); 1061 self.send_relay_cell(cx, hopnum, false, end_cell)?; 1062 } 1063 Ok(()) 1064 } 1065 1066 /// Helper: process a cell on a channel. Most cells get ignored 1067 /// or rejected; a few get delivered to circuits. 1068 /// 1069 /// Return true if we should exit. 
handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result<bool>1070 fn handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result<bool> { 1071 trace!("{}: handling cell: {:?}", self.unique_id, cell); 1072 use ClientCircChanMsg::*; 1073 match cell { 1074 Relay(r) => { 1075 self.handle_relay_cell(cx, r)?; 1076 Ok(false) 1077 } 1078 Destroy(_) => { 1079 self.handle_destroy_cell()?; 1080 Ok(true) 1081 } 1082 } 1083 } 1084 1085 /// React to a Relay or RelayEarly cell. handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<()>1086 fn handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<()> { 1087 let mut body = cell.into_relay_body().into(); 1088 1089 // Decrypt the cell. If it's recognized, then find the 1090 // corresponding hop. 1091 let (hopnum, tag) = self.crypto_in.decrypt(&mut body)?; 1092 // Make a copy of the authentication tag. TODO: I'd rather not 1093 // copy it, but I don't see a way around it right now. 1094 let tag = { 1095 let mut tag_copy = [0_u8; 20]; 1096 // XXXX This could crash if the tag length changes. We'll 1097 // have to refactor it then. 1098 (&mut tag_copy).copy_from_slice(tag); 1099 tag_copy 1100 }; 1101 // Decode the cell. 1102 let msg = RelayCell::decode(body.into())?; 1103 1104 let c_t_w = sendme::cell_counts_towards_windows(&msg); 1105 1106 // Decrement the circuit sendme windows, and see if we need to 1107 // send a sendme cell. 1108 let send_circ_sendme = if c_t_w { 1109 let hop = self 1110 .hop_mut(hopnum) 1111 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?; 1112 hop.recvwindow.take()? 1113 } else { 1114 false 1115 }; 1116 // If we do need to send a circuit-level SENDME cell, do so. 
1117 if send_circ_sendme { 1118 let sendme = Sendme::new_tag(tag); 1119 let cell = RelayCell::new(0.into(), sendme.into()); 1120 self.send_relay_cell(cx, hopnum, false, cell)?; 1121 self.hop_mut(hopnum) 1122 .ok_or_else(|| { 1123 Error::InternalError("Trying to send SENDME to nonexistent hop".to_string()) 1124 })? 1125 .recvwindow 1126 .put(); 1127 } 1128 1129 // Break the message apart into its streamID and message. 1130 let (streamid, msg) = msg.into_streamid_and_msg(); 1131 1132 // If this cell wants/refuses to have a Stream ID, does it 1133 // have/not have one? 1134 if !msg.cmd().accepts_streamid_val(streamid) { 1135 return Err(Error::CircProto(format!( 1136 "Invalid stream ID {} for relay command {}", 1137 streamid, 1138 msg.cmd() 1139 ))); 1140 } 1141 1142 // If this has a reasonable streamID value of 0, it's a meta cell, 1143 // not meant for a particular stream. 1144 if streamid.is_zero() { 1145 return self.handle_meta_cell(hopnum, msg); 1146 } 1147 1148 let hop = self 1149 .hop_mut(hopnum) 1150 .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?; 1151 match hop.map.get_mut(streamid) { 1152 Some(StreamEnt::Open { 1153 sink, 1154 send_window, 1155 dropped, 1156 .. 1157 }) => { 1158 // The stream for this message exists, and is open. 1159 1160 if let RelayMsg::Sendme(_) = msg { 1161 // We need to handle sendmes here, not in the stream's 1162 // recv() method, or else we'd never notice them if the 1163 // stream isn't reading. 1164 // FIXME(eta): I think ignoring the must_use return value here is okay, since 1165 // the tag is () anyway? or something??? 1166 let _ = send_window.put(Some(())); 1167 return Ok(()); 1168 } 1169 1170 // Remember whether this was an end cell: if so we should 1171 // close the stream. 1172 let is_end_cell = matches!(msg, RelayMsg::End(_)); 1173 1174 // TODO: Add a wrapper type here to reject cells that should 1175 // never go to a client, like BEGIN. 
1176 if let Err(e) = sink.try_send(msg) { 1177 if e.is_full() { 1178 // If we get here, we either have a logic bug (!), or an attacker 1179 // is sending us more cells than we asked for via congestion control. 1180 return Err(Error::CircProto(format!( 1181 "Stream sink would block; received too many cells on stream ID {}", 1182 streamid, 1183 ))); 1184 } 1185 if e.is_disconnected() && c_t_w { 1186 // the other side of the stream has gone away; remember 1187 // that we received a cell that we couldn't queue for it. 1188 // 1189 // Later this value will be recorded in a half-stream. 1190 *dropped += 1; 1191 } 1192 } 1193 if is_end_cell { 1194 hop.map.end_received(streamid)?; 1195 } 1196 } 1197 Some(StreamEnt::EndSent(halfstream)) => { 1198 // We sent an end but maybe the other side hasn't heard. 1199 1200 if matches!(msg, RelayMsg::End(_)) { 1201 hop.map.end_received(streamid)?; 1202 } else { 1203 halfstream.handle_msg(&msg)?; 1204 } 1205 } 1206 _ => { 1207 // No stream wants this message. 1208 return Err(Error::CircProto( 1209 "Cell received on nonexistent stream!?".into(), 1210 )); 1211 } 1212 } 1213 Ok(()) 1214 } 1215 1216 /// Helper: process a destroy cell. 1217 #[allow(clippy::unnecessary_wraps)] handle_destroy_cell(&mut self) -> Result<()>1218 fn handle_destroy_cell(&mut self) -> Result<()> { 1219 // I think there is nothing more to do here. 1220 Ok(()) 1221 } 1222 1223 /// Return the hop corresponding to `hopnum`, if there is one. hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop>1224 fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> { 1225 self.hops.get_mut(Into::<usize>::into(hopnum)) 1226 } 1227 } 1228 1229 impl Drop for Reactor { drop(&mut self)1230 fn drop(&mut self) { 1231 let _ = self.channel.close_circuit(self.channel_id); 1232 } 1233 } 1234 1235 #[cfg(test)] 1236 mod test {} 1237