1 //! Code to handle incoming cells on a channel. 2 //! 3 //! The role of this code is to run in a separate asynchronous task, 4 //! and routes cells to the right circuits. 5 //! 6 //! TODO: I have zero confidence in the close-and-cleanup behavior here, 7 //! or in the error handling behavior. 8 9 use super::circmap::{CircEnt, CircMap}; 10 use super::UniqId; 11 use crate::circuit::halfcirc::HalfCirc; 12 use crate::util::err::ReactorError; 13 use crate::{Error, Result}; 14 use tor_cell::chancell::msg::{Destroy, DestroyReason}; 15 use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId}; 16 17 use futures::channel::{mpsc, oneshot}; 18 19 use futures::sink::SinkExt; 20 use futures::stream::Stream; 21 use futures::Sink; 22 23 use std::convert::TryInto; 24 use std::pin::Pin; 25 use std::sync::atomic::{AtomicBool, Ordering}; 26 use std::sync::Arc; 27 use std::task::Poll; 28 29 use crate::channel::unique_id; 30 use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse}; 31 use tracing::{debug, trace}; 32 33 /// A boxed trait object that can provide `ChanCell`s. 34 pub(super) type BoxedChannelStream = 35 Box<dyn Stream<Item = std::result::Result<ChanCell, tor_cell::Error>> + Send + Unpin + 'static>; 36 /// A boxed trait object that can sink `ChanCell`s. 37 pub(super) type BoxedChannelSink = 38 Box<dyn Sink<ChanCell, Error = tor_cell::Error> + Send + Unpin + 'static>; 39 /// The type of a oneshot channel used to inform reactor users of the result of an operation. 40 pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>; 41 42 /// A message telling the channel reactor to do something. 43 #[derive(Debug)] 44 pub(super) enum CtrlMsg { 45 /// Shut down the reactor. 46 Shutdown, 47 /// Tell the reactor that a given circuit has gone away. 48 CloseCircuit(CircId), 49 /// Allocate a new circuit in this channel's circuit map, generating an ID for it 50 /// and registering senders for messages received for the circuit. 51 AllocateCircuit { 52 /// Channel to send the circuit's `CreateResponse` down. 53 created_sender: oneshot::Sender<CreateResponse>, 54 /// Channel to send other messages from this circuit down. 55 sender: mpsc::Sender<ClientCircChanMsg>, 56 /// Oneshot channel to send the new circuit's identifiers down. 57 tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>, 58 }, 59 } 60 61 /// Object to handle incoming cells and background tasks on a channel. 62 /// 63 /// This type is returned when you finish a channel; you need to spawn a 64 /// new task that calls `run()` on it. 65 #[must_use = "If you don't call run() on a reactor, the channel won't work."] 66 pub struct Reactor { 67 /// A receiver for control messages from `Channel` objects. 68 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>, 69 /// A receiver for cells to be sent on this reactor's sink. 70 /// 71 /// `Channel` objects have a sender that can send cells here. 72 pub(super) cells: mpsc::Receiver<ChanCell>, 73 /// A Stream from which we can read `ChanCell`s. 74 /// 75 /// This should be backed by a TLS connection if you want it to be secure. 76 pub(super) input: futures::stream::Fuse<BoxedChannelStream>, 77 /// A Sink to which we can write `ChanCell`s. 78 /// 79 /// This should also be backed by a TLS connection if you want it to be secure. 80 pub(super) output: BoxedChannelSink, 81 /// A map from circuit ID to Sinks on which we can deliver cells. 82 pub(super) circs: CircMap, 83 /// Logging identifier for this channel 84 pub(super) unique_id: UniqId, 85 /// If true, this channel is closing. 86 pub(super) closed: Arc<AtomicBool>, 87 /// Context for allocating unique circuit log identifiers. 88 pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext, 89 /// What link protocol is the channel using? 90 #[allow(dead_code)] // We don't support protocols where this would matter 91 pub(super) link_protocol: u16, 92 } 93 94 impl Reactor { 95 /// Launch the reactor, and run until the channel closes or we 96 /// encounter an error. 97 /// 98 /// Once this function returns, the channel is dead, and can't be 99 /// used again. run(mut self) -> Result<()>100 pub async fn run(mut self) -> Result<()> { 101 if self.closed.load(Ordering::SeqCst) { 102 return Err(Error::ChannelClosed); 103 } 104 debug!("{}: Running reactor", self.unique_id); 105 let result: Result<()> = loop { 106 match self.run_once().await { 107 Ok(()) => (), 108 Err(ReactorError::Shutdown) => break Ok(()), 109 Err(ReactorError::Err(e)) => break Err(e), 110 } 111 }; 112 debug!("{}: Reactor stopped: {:?}", self.unique_id, result); 113 self.closed.store(true, Ordering::SeqCst); 114 result 115 } 116 117 /// Helper for run(): handles only one action, and doesn't mark 118 /// the channel closed on finish. run_once(&mut self) -> std::result::Result<(), ReactorError>119 async fn run_once(&mut self) -> std::result::Result<(), ReactorError> { 120 // This is written this way (manually calling poll) for a bunch of reasons: 121 // 122 // - We can only send things onto self.output if poll_ready has returned Ready, so 123 // we need some custom logic to implement that. 124 // - We probably want to call poll_flush on every reactor iteration, to ensure it continues 125 // to make progress flushing. 126 // - We also need to do the equivalent of select! between self.cells, self.control, and 127 // self.input, but with the extra logic bits added above. 128 // 129 // In Rust 2021, it would theoretically be possible to do this with a hybrid mix of select! 130 // and manually implemented poll_fn, but we aren't using that yet. (also, arguably doing 131 // it this way is both less confusing and more flexible). 132 let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> { 133 // We've potentially got three types of thing to deal with in this reactor iteration: 134 let mut cell_to_send = None; 135 let mut control_message = None; 136 let mut input = None; 137 138 // See if the output sink can have cells written to it yet. 139 if let Poll::Ready(ret) = Pin::new(&mut self.output).poll_ready(cx) { 140 let _ = ret.map_err(Error::CellErr)?; 141 // If it can, check whether we have any cells to send it from `Channel` senders. 142 if let Poll::Ready(msg) = Pin::new(&mut self.cells).poll_next(cx) { 143 match msg { 144 x @ Some(..) => cell_to_send = x, 145 None => { 146 // cells sender dropped, shut down the reactor! 147 return Poll::Ready(Err(ReactorError::Shutdown)); 148 } 149 } 150 } 151 } 152 153 // Check whether we've got a control message pending. 154 if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) { 155 match ret { 156 None | Some(CtrlMsg::Shutdown) => { 157 return Poll::Ready(Err(ReactorError::Shutdown)) 158 } 159 x @ Some(..) => control_message = x, 160 } 161 } 162 163 // Check whether we've got any incoming cells. 164 if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) { 165 match ret { 166 None => return Poll::Ready(Err(ReactorError::Shutdown)), 167 Some(r) => input = Some(r.map_err(Error::CellErr)?), 168 } 169 } 170 171 // Flush the output sink. We don't actually care about whether it's ready or not; 172 // we just want to keep flushing it (hence the _). 173 let _ = Pin::new(&mut self.output) 174 .poll_flush(cx) 175 .map_err(Error::CellErr)?; 176 177 // If all three values aren't present, return Pending and wait to get polled again 178 // so that one of them is present. 179 if cell_to_send.is_none() && control_message.is_none() && input.is_none() { 180 return Poll::Pending; 181 } 182 // Otherwise, return the three Options, one of which is going to be Some. 183 Poll::Ready(Ok((cell_to_send, control_message, input))) 184 }); 185 let (cell_to_send, control_message, input) = fut.await?; 186 if let Some(ctrl) = control_message { 187 self.handle_control(ctrl).await?; 188 } 189 if let Some(item) = input { 190 crate::note_incoming_traffic(); 191 self.handle_cell(item).await?; 192 } 193 if let Some(cts) = cell_to_send { 194 Pin::new(&mut self.output) 195 .start_send(cts) 196 .map_err(Error::CellErr)?; 197 // Give the sink a little flush, to make sure it actually starts doing things. 198 futures::future::poll_fn(|cx| Pin::new(&mut self.output).poll_flush(cx)) 199 .await 200 .map_err(Error::CellErr)?; 201 } 202 Ok(()) // Run again. 203 } 204 205 /// Handle a CtrlMsg other than Shutdown. handle_control(&mut self, msg: CtrlMsg) -> Result<()>206 async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> { 207 trace!("{}: reactor received {:?}", self.unique_id, msg); 208 match msg { 209 CtrlMsg::Shutdown => panic!(), // was handled in reactor loop. 210 CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?, 211 CtrlMsg::AllocateCircuit { 212 created_sender, 213 sender, 214 tx, 215 } => { 216 let mut rng = rand::thread_rng(); 217 let my_unique_id = self.unique_id; 218 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id); 219 let ret: Result<_> = self 220 .circs 221 .add_ent(&mut rng, created_sender, sender) 222 .map(|id| (id, circ_unique_id)); 223 let _ = tx.send(ret); // don't care about other side going away 224 } 225 } 226 Ok(()) 227 } 228 229 /// Helper: process a cell on a channel. Most cell types get ignored 230 /// or rejected; a few get delivered to circuits. handle_cell(&mut self, cell: ChanCell) -> Result<()>231 async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> { 232 let (circid, msg) = cell.into_circid_and_msg(); 233 use ChanMsg::*; 234 235 match msg { 236 Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log. 237 _ => trace!("{}: received {} for {}", self.unique_id, msg.cmd(), circid), 238 } 239 240 match msg { 241 // These aren't allowed on clients. 242 Create(_) | CreateFast(_) | Create2(_) | RelayEarly(_) | PaddingNegotiate(_) => Err( 243 Error::ChanProto(format!("{} cell on client channel", msg.cmd())), 244 ), 245 246 // In theory this is allowed in clients, but we should never get 247 // one, since we don't use TAP. 248 Created(_) => Err(Error::ChanProto(format!( 249 "{} cell received, but we never send CREATEs", 250 msg.cmd() 251 ))), 252 253 // These aren't allowed after handshaking is done. 254 Versions(_) | Certs(_) | Authorize(_) | Authenticate(_) | AuthChallenge(_) 255 | Netinfo(_) => Err(Error::ChanProto(format!( 256 "{} cell after handshake is done", 257 msg.cmd() 258 ))), 259 260 // These are allowed, and need to be handled. 261 Relay(_) => self.deliver_relay(circid, msg).await, 262 263 Destroy(_) => self.deliver_destroy(circid, msg).await, 264 265 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg).await, 266 267 // These are always ignored. 268 Padding(_) | VPadding(_) => Ok(()), 269 270 // Unrecognized cell types should be safe to allow _on channels_, 271 // since they can't propagate. 272 Unrecognized(_) => Ok(()), 273 274 // tor_cells knows about this type, but we don't. 275 _ => Ok(()), 276 } 277 } 278 279 /// Give the RELAY cell `msg` to the appropriate circuit. deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>280 async fn deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> { 281 match self.circs.get_mut(circid) { 282 Some(CircEnt::Open(s)) => { 283 // There's an open circuit; we can give it the RELAY cell. 284 if s.send(msg.try_into()?).await.is_err() { 285 // The circuit's receiver went away, so we should destroy the circuit. 286 self.outbound_destroy_circ(circid).await?; 287 } 288 Ok(()) 289 } 290 Some(CircEnt::Opening(_, _)) => Err(Error::ChanProto( 291 "Relay cell on pending circuit before CREATED* received".into(), 292 )), 293 Some(CircEnt::DestroySent(hs)) => hs.receive_cell(), 294 None => Err(Error::ChanProto("Relay cell on nonexistent circuit".into())), 295 } 296 } 297 298 /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate 299 /// circuit, if that circuit is waiting for one. deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>300 async fn deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> { 301 let target = self.circs.advance_from_opening(circid)?; 302 let created = msg.try_into()?; 303 // XXXX I think that this one actually means the other side 304 // is closed 305 target.send(created).map_err(|_| { 306 Error::InternalError( 307 "Circuit queue rejected created message. Is it closing? XXX".into(), 308 ) 309 }) 310 } 311 312 /// Handle a DESTROY cell by removing the corresponding circuit 313 /// from the map, and passing the destroy cell onward to the circuit. deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>314 async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> { 315 // Remove the circuit from the map: nothing more can be done with it. 316 let entry = self.circs.remove(circid); 317 match entry { 318 // If the circuit is waiting for CREATED, tell it that it 319 // won't get one. 320 Some(CircEnt::Opening(oneshot, _)) => { 321 trace!( 322 "{}: Passing destroy to pending circuit {}", 323 self.unique_id, 324 circid 325 ); 326 oneshot 327 .send(msg.try_into()?) 328 // XXXX I think that this one actually means the other side 329 // is closed 330 .map_err(|_| { 331 Error::InternalError( 332 "pending circuit wasn't interested in Destroy cell?".into(), 333 ) 334 }) 335 } 336 // It's an open circuit: tell it that it got a DESTROY cell. 337 Some(CircEnt::Open(mut sink)) => { 338 trace!( 339 "{}: Passing destroy to open circuit {}", 340 self.unique_id, 341 circid 342 ); 343 sink.send(msg.try_into()?) 344 .await 345 // XXXX I think that this one actually means the other side 346 // is closed 347 .map_err(|_| { 348 Error::InternalError("circuit wasn't interested in destroy cell?".into()) 349 }) 350 } 351 // We've sent a destroy; we can leave this circuit removed. 352 Some(CircEnt::DestroySent(_)) => Ok(()), 353 // Got a DESTROY cell for a circuit we don't have. 354 None => { 355 trace!( 356 "{}: Destroy for nonexistent circuit {}", 357 self.unique_id, 358 circid 359 ); 360 Err(Error::ChanProto("Destroy for nonexistent circuit".into())) 361 } 362 } 363 } 364 365 /// Helper: send a cell on the outbound sink. send_cell(&mut self, cell: ChanCell) -> Result<()>366 async fn send_cell(&mut self, cell: ChanCell) -> Result<()> { 367 self.output.send(cell).await?; 368 Ok(()) 369 } 370 371 /// Called when a circuit goes away: sends a DESTROY cell and removes 372 /// the circuit. outbound_destroy_circ(&mut self, id: CircId) -> Result<()>373 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> { 374 trace!( 375 "{}: Circuit {} is gone; sending DESTROY", 376 self.unique_id, 377 id 378 ); 379 // Remove the circuit's entry from the map: nothing more 380 // can be done with it. 381 // TODO: It would be great to have a tighter upper bound for 382 // the number of relay cells we'll receive. 383 self.circs.destroy_sent(id, HalfCirc::new(3000)); 384 let destroy = Destroy::new(DestroyReason::NONE).into(); 385 let cell = ChanCell::new(id, destroy); 386 self.send_cell(cell).await?; 387 388 Ok(()) 389 } 390 } 391 392 #[cfg(test)] 393 pub(crate) mod test { 394 #![allow(clippy::unwrap_used)] 395 use super::*; 396 use crate::circuit::CircParameters; 397 use futures::sink::SinkExt; 398 use futures::stream::StreamExt; 399 use futures::task::SpawnExt; 400 401 type CodecResult = std::result::Result<ChanCell, tor_cell::Error>; 402 new_reactor() -> ( crate::channel::Channel, Reactor, mpsc::Receiver<ChanCell>, mpsc::Sender<CodecResult>, )403 pub(crate) fn new_reactor() -> ( 404 crate::channel::Channel, 405 Reactor, 406 mpsc::Receiver<ChanCell>, 407 mpsc::Sender<CodecResult>, 408 ) { 409 let link_protocol = 4; 410 let (send1, recv1) = mpsc::channel(32); 411 let (send2, recv2) = mpsc::channel(32); 412 let unique_id = UniqId::new(); 413 let ed_id = [6; 32].into(); 414 let rsa_id = [10; 20].into(); 415 let send1 = send1.sink_map_err(|e| { 416 trace!("got sink error: {}", e); 417 tor_cell::Error::ChanProto("dummy message".into()) 418 }); 419 let (chan, reactor) = crate::channel::Channel::new( 420 link_protocol, 421 Box::new(send1), 422 Box::new(recv2), 423 unique_id, 424 ed_id, 425 rsa_id, 426 ); 427 (chan, reactor, recv1, send2) 428 } 429 430 // Try shutdown from inside run_once.. 431 #[test] shutdown()432 fn shutdown() { 433 tor_rtcompat::test_with_all_runtimes!(|_rt| async move { 434 let (chan, mut reactor, _output, _input) = new_reactor(); 435 436 chan.terminate(); 437 let r = reactor.run_once().await; 438 assert!(matches!(r, Err(ReactorError::Shutdown))); 439 }); 440 } 441 442 // Try shutdown while reactor is running. 443 #[test] shutdown2()444 fn shutdown2() { 445 tor_rtcompat::test_with_all_runtimes!(|_rt| async move { 446 // TODO: Ask a rust person if this is how to do this. 447 448 use futures::future::FutureExt; 449 use futures::join; 450 451 let (chan, reactor, _output, _input) = new_reactor(); 452 // Let's get the reactor running... 453 let run_reactor = reactor.run().map(|x| x.is_ok()).shared(); 454 455 let rr = run_reactor.clone(); 456 457 let exit_then_check = async { 458 assert!(rr.peek().is_none()); 459 // ... and terminate the channel while that's happening. 460 chan.terminate(); 461 }; 462 463 let (rr_s, _) = join!(run_reactor, exit_then_check); 464 465 // Now let's see. The reactor should not _still_ be running. 466 assert!(rr_s); 467 }); 468 } 469 470 #[test] new_circ_closed()471 fn new_circ_closed() { 472 tor_rtcompat::test_with_all_runtimes!(|rt| async move { 473 let (chan, mut reactor, mut output, _input) = new_reactor(); 474 475 let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); 476 let (pending, circr) = ret.unwrap(); 477 rt.spawn(async { 478 let _ignore = circr.run().await; 479 }) 480 .unwrap(); 481 assert!(reac.is_ok()); 482 483 let id = pending.peek_circid(); 484 485 let ent = reactor.circs.get_mut(id); 486 assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); 487 // Now drop the circuit; this should tell the reactor to remove 488 // the circuit from the map. 489 drop(pending); 490 491 reactor.run_once().await.unwrap(); 492 let ent = reactor.circs.get_mut(id); 493 assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); 494 let cell = output.next().await.unwrap(); 495 assert_eq!(cell.circid(), id); 496 assert!(matches!(cell.msg(), ChanMsg::Destroy(_))); 497 }); 498 } 499 500 // Test proper delivery of a created cell that doesn't make a channel 501 #[test] 502 #[ignore] // See bug #244: re-enable this test once it passes reliably. new_circ_create_failure()503 fn new_circ_create_failure() { 504 use std::time::Duration; 505 use tor_rtcompat::SleepProvider; 506 507 tor_rtcompat::test_with_all_runtimes!(|rt| async move { 508 use tor_cell::chancell::msg; 509 let (chan, mut reactor, mut output, mut input) = new_reactor(); 510 511 let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); 512 let (pending, circr) = ret.unwrap(); 513 rt.spawn(async { 514 let _ignore = circr.run().await; 515 }) 516 .unwrap(); 517 assert!(reac.is_ok()); 518 519 let circparams = CircParameters::default(); 520 521 let id = pending.peek_circid(); 522 523 let ent = reactor.circs.get_mut(id); 524 assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); 525 526 #[allow(clippy::clone_on_copy)] 527 let rtc = rt.clone(); 528 let send_response = async { 529 rtc.sleep(Duration::from_millis(100)).await; 530 trace!("sending createdfast"); 531 // We'll get a bad handshake result from this createdfast cell. 532 let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into()); 533 input.send(Ok(created_cell)).await.unwrap(); 534 reactor.run_once().await.unwrap(); 535 }; 536 537 let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response); 538 // Make sure statuses are as expected. 539 assert!(matches!(circ.err().unwrap(), Error::BadHandshake)); 540 541 reactor.run_once().await.unwrap(); 542 543 // Make sure that the createfast cell got sent 544 let cell_sent = output.next().await.unwrap(); 545 dbg!(cell_sent.msg()); 546 assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_))); 547 548 // But the next run if the reactor will make the circuit get closed. 549 let ent = reactor.circs.get_mut(id); 550 assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); 551 }); 552 } 553 554 // Try incoming cells that shouldn't arrive on channels. 555 #[test] bad_cells()556 fn bad_cells() { 557 tor_rtcompat::test_with_all_runtimes!(|_rt| async move { 558 use tor_cell::chancell::msg; 559 let (_chan, mut reactor, _output, mut input) = new_reactor(); 560 561 // We shouldn't get create cells, ever. 562 let create_cell = msg::Create2::new(4, *b"hihi").into(); 563 input 564 .send(Ok(ChanCell::new(9.into(), create_cell))) 565 .await 566 .unwrap(); 567 568 // shouldn't get created2 cells for nonexistent circuits 569 let created2_cell = msg::Created2::new(*b"hihi").into(); 570 input 571 .send(Ok(ChanCell::new(7.into(), created2_cell))) 572 .await 573 .unwrap(); 574 575 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 576 assert_eq!( 577 format!("{}", e), 578 "channel protocol violation: CREATE2 cell on client channel" 579 ); 580 581 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 582 assert_eq!( 583 format!("{}", e), 584 "channel protocol violation: Unexpected CREATED* cell not on opening circuit" 585 ); 586 587 // Can't get a relay cell on a circuit we've never heard of. 588 let relay_cell = msg::Relay::new(b"abc").into(); 589 input 590 .send(Ok(ChanCell::new(4.into(), relay_cell))) 591 .await 592 .unwrap(); 593 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 594 assert_eq!( 595 format!("{}", e), 596 "channel protocol violation: Relay cell on nonexistent circuit" 597 ); 598 599 // Can't get handshaking cells while channel is open. 600 let versions_cell = msg::Versions::new([3]).unwrap().into(); 601 input 602 .send(Ok(ChanCell::new(0.into(), versions_cell))) 603 .await 604 .unwrap(); 605 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 606 assert_eq!( 607 format!("{}", e), 608 "channel protocol violation: VERSIONS cell after handshake is done" 609 ); 610 611 // We don't accept CREATED. 612 let created_cell = msg::Created::new(&b"xyzzy"[..]).into(); 613 input 614 .send(Ok(ChanCell::new(25.into(), created_cell))) 615 .await 616 .unwrap(); 617 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 618 assert_eq!( 619 format!("{}", e), 620 "channel protocol violation: CREATED cell received, but we never send CREATEs" 621 ); 622 }); 623 } 624 625 #[test] deliver_relay()626 fn deliver_relay() { 627 tor_rtcompat::test_with_all_runtimes!(|_rt| async move { 628 use crate::circuit::celltypes::ClientCircChanMsg; 629 use futures::channel::oneshot; 630 use tor_cell::chancell::msg; 631 632 let (_chan, mut reactor, _output, mut input) = new_reactor(); 633 634 let (_circ_stream_7, mut circ_stream_13) = { 635 let (snd1, _rcv1) = oneshot::channel(); 636 let (snd2, rcv2) = mpsc::channel(64); 637 reactor 638 .circs 639 .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); 640 641 let (snd3, rcv3) = mpsc::channel(64); 642 reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); 643 644 reactor 645 .circs 646 .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); 647 (rcv2, rcv3) 648 }; 649 650 // If a relay cell is sent on an open channel, the correct circuit 651 // should get it. 652 let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into(); 653 input 654 .send(Ok(ChanCell::new(13.into(), relaycell.clone()))) 655 .await 656 .unwrap(); 657 reactor.run_once().await.unwrap(); 658 let got = circ_stream_13.next().await.unwrap(); 659 assert!(matches!(got, ClientCircChanMsg::Relay(_))); 660 661 // If a relay cell is sent on an opening channel, that's an error. 662 input 663 .send(Ok(ChanCell::new(7.into(), relaycell.clone()))) 664 .await 665 .unwrap(); 666 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 667 assert_eq!( 668 format!("{}", e), 669 "channel protocol violation: Relay cell on pending circuit before CREATED* received" 670 ); 671 672 // If a relay cell is sent on a non-existent channel, that's an error. 673 input 674 .send(Ok(ChanCell::new(101.into(), relaycell.clone()))) 675 .await 676 .unwrap(); 677 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 678 assert_eq!( 679 format!("{}", e), 680 "channel protocol violation: Relay cell on nonexistent circuit" 681 ); 682 683 // It's fine to get a relay cell on a DestroySent channel: that happens 684 // when the other side hasn't noticed the Destroy yet. 685 686 // We can do this 25 more times according to our setup: 687 for _ in 0..25 { 688 input 689 .send(Ok(ChanCell::new(23.into(), relaycell.clone()))) 690 .await 691 .unwrap(); 692 reactor.run_once().await.unwrap(); // should be fine. 693 } 694 695 // This one will fail. 696 input 697 .send(Ok(ChanCell::new(23.into(), relaycell.clone()))) 698 .await 699 .unwrap(); 700 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 701 assert_eq!( 702 format!("{}", e), 703 "channel protocol violation: Too many cells received on destroyed circuit" 704 ); 705 }); 706 } 707 708 #[test] deliver_destroy()709 fn deliver_destroy() { 710 tor_rtcompat::test_with_all_runtimes!(|_rt| async move { 711 use crate::circuit::celltypes::*; 712 use futures::channel::oneshot; 713 use tor_cell::chancell::msg; 714 715 let (_chan, mut reactor, _output, mut input) = new_reactor(); 716 717 let (circ_oneshot_7, mut circ_stream_13) = { 718 let (snd1, rcv1) = oneshot::channel(); 719 let (snd2, _rcv2) = mpsc::channel(64); 720 reactor 721 .circs 722 .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); 723 724 let (snd3, rcv3) = mpsc::channel(64); 725 reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); 726 727 reactor 728 .circs 729 .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); 730 (rcv1, rcv3) 731 }; 732 733 // Destroying an opening circuit is fine. 734 let destroycell: ChanMsg = msg::Destroy::new(0.into()).into(); 735 input 736 .send(Ok(ChanCell::new(7.into(), destroycell.clone()))) 737 .await 738 .unwrap(); 739 reactor.run_once().await.unwrap(); 740 let msg = circ_oneshot_7.await; 741 assert!(matches!(msg, Ok(CreateResponse::Destroy(_)))); 742 743 // Destroying an open circuit is fine. 744 input 745 .send(Ok(ChanCell::new(13.into(), destroycell.clone()))) 746 .await 747 .unwrap(); 748 reactor.run_once().await.unwrap(); 749 let msg = circ_stream_13.next().await.unwrap(); 750 assert!(matches!(msg, ClientCircChanMsg::Destroy(_))); 751 752 // Destroying a DestroySent circuit is fine. 753 input 754 .send(Ok(ChanCell::new(23.into(), destroycell.clone()))) 755 .await 756 .unwrap(); 757 reactor.run_once().await.unwrap(); 758 759 // Destroying a nonexistent circuit is an error. 760 input 761 .send(Ok(ChanCell::new(101.into(), destroycell.clone()))) 762 .await 763 .unwrap(); 764 let e = reactor.run_once().await.unwrap_err().unwrap_err(); 765 assert_eq!( 766 format!("{}", e), 767 "channel protocol violation: Destroy for nonexistent circuit" 768 ); 769 }); 770 } 771 } 772