1 //! Code to handle incoming cells on a channel.
2 //!
//! The role of this code is to run in a separate asynchronous task,
//! and to route cells to the right circuits.
5 //!
6 //! TODO: I have zero confidence in the close-and-cleanup behavior here,
7 //! or in the error handling behavior.
8 
9 use super::circmap::{CircEnt, CircMap};
10 use super::UniqId;
11 use crate::circuit::halfcirc::HalfCirc;
12 use crate::util::err::ReactorError;
13 use crate::{Error, Result};
14 use tor_cell::chancell::msg::{Destroy, DestroyReason};
15 use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
16 
17 use futures::channel::{mpsc, oneshot};
18 
19 use futures::sink::SinkExt;
20 use futures::stream::Stream;
21 use futures::Sink;
22 
23 use std::convert::TryInto;
24 use std::pin::Pin;
25 use std::sync::atomic::{AtomicBool, Ordering};
26 use std::sync::Arc;
27 use std::task::Poll;
28 
29 use crate::channel::unique_id;
30 use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
31 use tracing::{debug, trace};
32 
/// A boxed trait object that can provide `ChanCell`s.
///
/// Each item yielded is either a `ChanCell` or the `tor_cell::Error`
/// encountered while producing it.
pub(super) type BoxedChannelStream =
    Box<dyn Stream<Item = std::result::Result<ChanCell, tor_cell::Error>> + Send + Unpin + 'static>;
/// A boxed trait object that can sink `ChanCell`s.
///
/// Write failures are reported as `tor_cell::Error`.
pub(super) type BoxedChannelSink =
    Box<dyn Sink<ChanCell, Error = tor_cell::Error> + Send + Unpin + 'static>;
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
///
/// The value sent is a `Result<T>`, using this crate's `Result` alias.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
41 
/// A message telling the channel reactor to do something.
///
/// These arrive on the reactor's `control` receiver.
#[derive(Debug)]
pub(super) enum CtrlMsg {
    /// Shut down the reactor.
    ///
    /// This is intercepted in `Reactor::run_once` and never reaches
    /// `Reactor::handle_control`.
    Shutdown,
    /// Tell the reactor that a given circuit has gone away.
    ///
    /// The reactor reacts by recording that a DESTROY was sent for the
    /// circuit and sending a DESTROY cell for it.
    CloseCircuit(CircId),
    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
    /// and registering senders for messages received for the circuit.
    AllocateCircuit {
        /// Channel to send the circuit's `CreateResponse` down.
        created_sender: oneshot::Sender<CreateResponse>,
        /// Channel to send other messages from this circuit down.
        sender: mpsc::Sender<ClientCircChanMsg>,
        /// Oneshot channel to send the new circuit's identifiers down.
        ///
        /// Receives the new `CircId` together with the circuit's logging
        /// identifier, or the error from the circuit map if allocation failed.
        tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>,
    },
}
60 
/// Object to handle incoming cells and background tasks on a channel.
///
/// This type is returned when you finish a channel; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
pub struct Reactor {
    /// A receiver for control messages from `Channel` objects.
    ///
    /// If this stream ends, the reactor shuts down.
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// A receiver for cells to be sent on this reactor's sink.
    ///
    /// `Channel` objects have a sender that can send cells here.
    /// If every sender is dropped, the reactor shuts down.
    pub(super) cells: mpsc::Receiver<ChanCell>,
    /// A Stream from which we can read `ChanCell`s.
    ///
    /// This should be backed by a TLS connection if you want it to be secure.
    /// If this stream ends, the reactor shuts down.
    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
    /// A Sink to which we can write `ChanCell`s.
    ///
    /// This should also be backed by a TLS connection if you want it to be secure.
    pub(super) output: BoxedChannelSink,
    /// A map from circuit ID to Sinks on which we can deliver cells.
    pub(super) circs: CircMap,
    /// Logging identifier for this channel
    pub(super) unique_id: UniqId,
    /// If true, this channel is closing.
    ///
    /// `run()` refuses to start when this is already set, and sets it once
    /// the reactor loop has stopped.
    // NOTE(review): presumably shared with the `Channel` handles via the `Arc` — confirm.
    pub(super) closed: Arc<AtomicBool>,
    /// Context for allocating unique circuit log identifiers.
    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
    /// What link protocol is the channel using?
    #[allow(dead_code)] // We don't support protocols where this would matter
    pub(super) link_protocol: u16,
}
93 
94 impl Reactor {
95     /// Launch the reactor, and run until the channel closes or we
96     /// encounter an error.
97     ///
98     /// Once this function returns, the channel is dead, and can't be
99     /// used again.
run(mut self) -> Result<()>100     pub async fn run(mut self) -> Result<()> {
101         if self.closed.load(Ordering::SeqCst) {
102             return Err(Error::ChannelClosed);
103         }
104         debug!("{}: Running reactor", self.unique_id);
105         let result: Result<()> = loop {
106             match self.run_once().await {
107                 Ok(()) => (),
108                 Err(ReactorError::Shutdown) => break Ok(()),
109                 Err(ReactorError::Err(e)) => break Err(e),
110             }
111         };
112         debug!("{}: Reactor stopped: {:?}", self.unique_id, result);
113         self.closed.store(true, Ordering::SeqCst);
114         result
115     }
116 
117     /// Helper for run(): handles only one action, and doesn't mark
118     /// the channel closed on finish.
run_once(&mut self) -> std::result::Result<(), ReactorError>119     async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
120         // This is written this way (manually calling poll) for a bunch of reasons:
121         //
122         // - We can only send things onto self.output if poll_ready has returned Ready, so
123         //   we need some custom logic to implement that.
124         // - We probably want to call poll_flush on every reactor iteration, to ensure it continues
125         //   to make progress flushing.
126         // - We also need to do the equivalent of select! between self.cells, self.control, and
127         //   self.input, but with the extra logic bits added above.
128         //
129         // In Rust 2021, it would theoretically be possible to do this with a hybrid mix of select!
130         // and manually implemented poll_fn, but we aren't using that yet. (also, arguably doing
131         // it this way is both less confusing and more flexible).
132         let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> {
133             // We've potentially got three types of thing to deal with in this reactor iteration:
134             let mut cell_to_send = None;
135             let mut control_message = None;
136             let mut input = None;
137 
138             // See if the output sink can have cells written to it yet.
139             if let Poll::Ready(ret) = Pin::new(&mut self.output).poll_ready(cx) {
140                 let _ = ret.map_err(Error::CellErr)?;
141                 // If it can, check whether we have any cells to send it from `Channel` senders.
142                 if let Poll::Ready(msg) = Pin::new(&mut self.cells).poll_next(cx) {
143                     match msg {
144                         x @ Some(..) => cell_to_send = x,
145                         None => {
146                             // cells sender dropped, shut down the reactor!
147                             return Poll::Ready(Err(ReactorError::Shutdown));
148                         }
149                     }
150                 }
151             }
152 
153             // Check whether we've got a control message pending.
154             if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) {
155                 match ret {
156                     None | Some(CtrlMsg::Shutdown) => {
157                         return Poll::Ready(Err(ReactorError::Shutdown))
158                     }
159                     x @ Some(..) => control_message = x,
160                 }
161             }
162 
163             // Check whether we've got any incoming cells.
164             if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) {
165                 match ret {
166                     None => return Poll::Ready(Err(ReactorError::Shutdown)),
167                     Some(r) => input = Some(r.map_err(Error::CellErr)?),
168                 }
169             }
170 
171             // Flush the output sink. We don't actually care about whether it's ready or not;
172             // we just want to keep flushing it (hence the _).
173             let _ = Pin::new(&mut self.output)
174                 .poll_flush(cx)
175                 .map_err(Error::CellErr)?;
176 
177             // If all three values aren't present, return Pending and wait to get polled again
178             // so that one of them is present.
179             if cell_to_send.is_none() && control_message.is_none() && input.is_none() {
180                 return Poll::Pending;
181             }
182             // Otherwise, return the three Options, one of which is going to be Some.
183             Poll::Ready(Ok((cell_to_send, control_message, input)))
184         });
185         let (cell_to_send, control_message, input) = fut.await?;
186         if let Some(ctrl) = control_message {
187             self.handle_control(ctrl).await?;
188         }
189         if let Some(item) = input {
190             crate::note_incoming_traffic();
191             self.handle_cell(item).await?;
192         }
193         if let Some(cts) = cell_to_send {
194             Pin::new(&mut self.output)
195                 .start_send(cts)
196                 .map_err(Error::CellErr)?;
197             // Give the sink a little flush, to make sure it actually starts doing things.
198             futures::future::poll_fn(|cx| Pin::new(&mut self.output).poll_flush(cx))
199                 .await
200                 .map_err(Error::CellErr)?;
201         }
202         Ok(()) // Run again.
203     }
204 
205     /// Handle a CtrlMsg other than Shutdown.
handle_control(&mut self, msg: CtrlMsg) -> Result<()>206     async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
207         trace!("{}: reactor received {:?}", self.unique_id, msg);
208         match msg {
209             CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
210             CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
211             CtrlMsg::AllocateCircuit {
212                 created_sender,
213                 sender,
214                 tx,
215             } => {
216                 let mut rng = rand::thread_rng();
217                 let my_unique_id = self.unique_id;
218                 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
219                 let ret: Result<_> = self
220                     .circs
221                     .add_ent(&mut rng, created_sender, sender)
222                     .map(|id| (id, circ_unique_id));
223                 let _ = tx.send(ret); // don't care about other side going away
224             }
225         }
226         Ok(())
227     }
228 
229     /// Helper: process a cell on a channel.  Most cell types get ignored
230     /// or rejected; a few get delivered to circuits.
handle_cell(&mut self, cell: ChanCell) -> Result<()>231     async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> {
232         let (circid, msg) = cell.into_circid_and_msg();
233         use ChanMsg::*;
234 
235         match msg {
236             Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
237             _ => trace!("{}: received {} for {}", self.unique_id, msg.cmd(), circid),
238         }
239 
240         match msg {
241             // These aren't allowed on clients.
242             Create(_) | CreateFast(_) | Create2(_) | RelayEarly(_) | PaddingNegotiate(_) => Err(
243                 Error::ChanProto(format!("{} cell on client channel", msg.cmd())),
244             ),
245 
246             // In theory this is allowed in clients, but we should never get
247             // one, since we don't use TAP.
248             Created(_) => Err(Error::ChanProto(format!(
249                 "{} cell received, but we never send CREATEs",
250                 msg.cmd()
251             ))),
252 
253             // These aren't allowed after handshaking is done.
254             Versions(_) | Certs(_) | Authorize(_) | Authenticate(_) | AuthChallenge(_)
255             | Netinfo(_) => Err(Error::ChanProto(format!(
256                 "{} cell after handshake is done",
257                 msg.cmd()
258             ))),
259 
260             // These are allowed, and need to be handled.
261             Relay(_) => self.deliver_relay(circid, msg).await,
262 
263             Destroy(_) => self.deliver_destroy(circid, msg).await,
264 
265             CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg).await,
266 
267             // These are always ignored.
268             Padding(_) | VPadding(_) => Ok(()),
269 
270             // Unrecognized cell types should be safe to allow _on channels_,
271             // since they can't propagate.
272             Unrecognized(_) => Ok(()),
273 
274             // tor_cells knows about this type, but we don't.
275             _ => Ok(()),
276         }
277     }
278 
279     /// Give the RELAY cell `msg` to the appropriate circuit.
deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>280     async fn deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
281         match self.circs.get_mut(circid) {
282             Some(CircEnt::Open(s)) => {
283                 // There's an open circuit; we can give it the RELAY cell.
284                 if s.send(msg.try_into()?).await.is_err() {
285                     // The circuit's receiver went away, so we should destroy the circuit.
286                     self.outbound_destroy_circ(circid).await?;
287                 }
288                 Ok(())
289             }
290             Some(CircEnt::Opening(_, _)) => Err(Error::ChanProto(
291                 "Relay cell on pending circuit before CREATED* received".into(),
292             )),
293             Some(CircEnt::DestroySent(hs)) => hs.receive_cell(),
294             None => Err(Error::ChanProto("Relay cell on nonexistent circuit".into())),
295         }
296     }
297 
298     /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
299     /// circuit, if that circuit is waiting for one.
deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>300     async fn deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
301         let target = self.circs.advance_from_opening(circid)?;
302         let created = msg.try_into()?;
303         // XXXX I think that this one actually means the other side
304         // is closed
305         target.send(created).map_err(|_| {
306             Error::InternalError(
307                 "Circuit queue rejected created message. Is it closing? XXX".into(),
308             )
309         })
310     }
311 
312     /// Handle a DESTROY cell by removing the corresponding circuit
313     /// from the map, and passing the destroy cell onward to the circuit.
deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()>314     async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
315         // Remove the circuit from the map: nothing more can be done with it.
316         let entry = self.circs.remove(circid);
317         match entry {
318             // If the circuit is waiting for CREATED, tell it that it
319             // won't get one.
320             Some(CircEnt::Opening(oneshot, _)) => {
321                 trace!(
322                     "{}: Passing destroy to pending circuit {}",
323                     self.unique_id,
324                     circid
325                 );
326                 oneshot
327                     .send(msg.try_into()?)
328                     // XXXX I think that this one actually means the other side
329                     // is closed
330                     .map_err(|_| {
331                         Error::InternalError(
332                             "pending circuit wasn't interested in Destroy cell?".into(),
333                         )
334                     })
335             }
336             // It's an open circuit: tell it that it got a DESTROY cell.
337             Some(CircEnt::Open(mut sink)) => {
338                 trace!(
339                     "{}: Passing destroy to open circuit {}",
340                     self.unique_id,
341                     circid
342                 );
343                 sink.send(msg.try_into()?)
344                     .await
345                     // XXXX I think that this one actually means the other side
346                     // is closed
347                     .map_err(|_| {
348                         Error::InternalError("circuit wasn't interested in destroy cell?".into())
349                     })
350             }
351             // We've sent a destroy; we can leave this circuit removed.
352             Some(CircEnt::DestroySent(_)) => Ok(()),
353             // Got a DESTROY cell for a circuit we don't have.
354             None => {
355                 trace!(
356                     "{}: Destroy for nonexistent circuit {}",
357                     self.unique_id,
358                     circid
359                 );
360                 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
361             }
362         }
363     }
364 
365     /// Helper: send a cell on the outbound sink.
send_cell(&mut self, cell: ChanCell) -> Result<()>366     async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
367         self.output.send(cell).await?;
368         Ok(())
369     }
370 
371     /// Called when a circuit goes away: sends a DESTROY cell and removes
372     /// the circuit.
outbound_destroy_circ(&mut self, id: CircId) -> Result<()>373     async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
374         trace!(
375             "{}: Circuit {} is gone; sending DESTROY",
376             self.unique_id,
377             id
378         );
379         // Remove the circuit's entry from the map: nothing more
380         // can be done with it.
381         // TODO: It would be great to have a tighter upper bound for
382         // the number of relay cells we'll receive.
383         self.circs.destroy_sent(id, HalfCirc::new(3000));
384         let destroy = Destroy::new(DestroyReason::NONE).into();
385         let cell = ChanCell::new(id, destroy);
386         self.send_cell(cell).await?;
387 
388         Ok(())
389     }
390 }
391 
392 #[cfg(test)]
393 pub(crate) mod test {
394     #![allow(clippy::unwrap_used)]
395     use super::*;
396     use crate::circuit::CircParameters;
397     use futures::sink::SinkExt;
398     use futures::stream::StreamExt;
399     use futures::task::SpawnExt;
400 
401     type CodecResult = std::result::Result<ChanCell, tor_cell::Error>;
402 
new_reactor() -> ( crate::channel::Channel, Reactor, mpsc::Receiver<ChanCell>, mpsc::Sender<CodecResult>, )403     pub(crate) fn new_reactor() -> (
404         crate::channel::Channel,
405         Reactor,
406         mpsc::Receiver<ChanCell>,
407         mpsc::Sender<CodecResult>,
408     ) {
409         let link_protocol = 4;
410         let (send1, recv1) = mpsc::channel(32);
411         let (send2, recv2) = mpsc::channel(32);
412         let unique_id = UniqId::new();
413         let ed_id = [6; 32].into();
414         let rsa_id = [10; 20].into();
415         let send1 = send1.sink_map_err(|e| {
416             trace!("got sink error: {}", e);
417             tor_cell::Error::ChanProto("dummy message".into())
418         });
419         let (chan, reactor) = crate::channel::Channel::new(
420             link_protocol,
421             Box::new(send1),
422             Box::new(recv2),
423             unique_id,
424             ed_id,
425             rsa_id,
426         );
427         (chan, reactor, recv1, send2)
428     }
429 
430     // Try shutdown from inside run_once..
431     #[test]
shutdown()432     fn shutdown() {
433         tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
434             let (chan, mut reactor, _output, _input) = new_reactor();
435 
436             chan.terminate();
437             let r = reactor.run_once().await;
438             assert!(matches!(r, Err(ReactorError::Shutdown)));
439         });
440     }
441 
    // Try shutdown while reactor is running.
    //
    // The reactor's `run()` future and a future that terminates the channel
    // are joined; `run()` must then finish with `Ok`.
    #[test]
    fn shutdown2() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // TODO: Ask a rust person if this is how to do this.

            use futures::future::FutureExt;
            use futures::join;

            let (chan, reactor, _output, _input) = new_reactor();
            // Let's get the reactor running...
            // (`shared()` lets us both join on the future and peek at it.)
            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();

            let rr = run_reactor.clone();

            let exit_then_check = async {
                // The reactor must not have finished before we ask it to.
                assert!(rr.peek().is_none());
                // ... and terminate the channel while that's happening.
                chan.terminate();
            };

            let (rr_s, _) = join!(run_reactor, exit_then_check);

            // Now let's see. The reactor should not _still_ be running.
            // `rr_s` is true only if `run()` returned `Ok`.
            assert!(rr_s);
        });
    }
469 
    // Dropping a pending circuit makes the reactor mark it `DestroySent`
    // and send a DESTROY cell for it.
    #[test]
    fn new_circ_closed() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut reactor, mut output, _input) = new_reactor();

            // `new_circ()` needs the reactor to process one message in order
            // to complete, so drive both together.
            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
            let (pending, circr) = ret.unwrap();
            rt.spawn(async {
                let _ignore = circr.run().await;
            })
            .unwrap();
            assert!(reac.is_ok());

            let id = pending.peek_circid();

            // The circuit was registered, and is waiting for its CREATED* cell.
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
            // Now drop the circuit; this should tell the reactor to remove
            // the circuit from the map.
            drop(pending);

            reactor.run_once().await.unwrap();
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
            // The reactor must also have written a DESTROY for that circuit.
            let cell = output.next().await.unwrap();
            assert_eq!(cell.circid(), id);
            assert!(matches!(cell.msg(), ChanMsg::Destroy(_)));
        });
    }
499 
500     // Test proper delivery of a created cell that doesn't make a channel
501     #[test]
502     #[ignore] // See bug #244: re-enable this test once it passes reliably.
new_circ_create_failure()503     fn new_circ_create_failure() {
504         use std::time::Duration;
505         use tor_rtcompat::SleepProvider;
506 
507         tor_rtcompat::test_with_all_runtimes!(|rt| async move {
508             use tor_cell::chancell::msg;
509             let (chan, mut reactor, mut output, mut input) = new_reactor();
510 
511             let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
512             let (pending, circr) = ret.unwrap();
513             rt.spawn(async {
514                 let _ignore = circr.run().await;
515             })
516             .unwrap();
517             assert!(reac.is_ok());
518 
519             let circparams = CircParameters::default();
520 
521             let id = pending.peek_circid();
522 
523             let ent = reactor.circs.get_mut(id);
524             assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
525 
526             #[allow(clippy::clone_on_copy)]
527             let rtc = rt.clone();
528             let send_response = async {
529                 rtc.sleep(Duration::from_millis(100)).await;
530                 trace!("sending createdfast");
531                 // We'll get a bad handshake result from this createdfast cell.
532                 let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
533                 input.send(Ok(created_cell)).await.unwrap();
534                 reactor.run_once().await.unwrap();
535             };
536 
537             let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
538             // Make sure statuses are as expected.
539             assert!(matches!(circ.err().unwrap(), Error::BadHandshake));
540 
541             reactor.run_once().await.unwrap();
542 
543             // Make sure that the createfast cell got sent
544             let cell_sent = output.next().await.unwrap();
545             dbg!(cell_sent.msg());
546             assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));
547 
548             // But the next run if the reactor will make the circuit get closed.
549             let ent = reactor.circs.get_mut(id);
550             assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
551         });
552     }
553 
554     // Try incoming cells that shouldn't arrive on channels.
555     #[test]
bad_cells()556     fn bad_cells() {
557         tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
558             use tor_cell::chancell::msg;
559             let (_chan, mut reactor, _output, mut input) = new_reactor();
560 
561             // We shouldn't get create cells, ever.
562             let create_cell = msg::Create2::new(4, *b"hihi").into();
563             input
564                 .send(Ok(ChanCell::new(9.into(), create_cell)))
565                 .await
566                 .unwrap();
567 
568             // shouldn't get created2 cells for nonexistent circuits
569             let created2_cell = msg::Created2::new(*b"hihi").into();
570             input
571                 .send(Ok(ChanCell::new(7.into(), created2_cell)))
572                 .await
573                 .unwrap();
574 
575             let e = reactor.run_once().await.unwrap_err().unwrap_err();
576             assert_eq!(
577                 format!("{}", e),
578                 "channel protocol violation: CREATE2 cell on client channel"
579             );
580 
581             let e = reactor.run_once().await.unwrap_err().unwrap_err();
582             assert_eq!(
583                 format!("{}", e),
584                 "channel protocol violation: Unexpected CREATED* cell not on opening circuit"
585             );
586 
587             // Can't get a relay cell on a circuit we've never heard of.
588             let relay_cell = msg::Relay::new(b"abc").into();
589             input
590                 .send(Ok(ChanCell::new(4.into(), relay_cell)))
591                 .await
592                 .unwrap();
593             let e = reactor.run_once().await.unwrap_err().unwrap_err();
594             assert_eq!(
595                 format!("{}", e),
596                 "channel protocol violation: Relay cell on nonexistent circuit"
597             );
598 
599             // Can't get handshaking cells while channel is open.
600             let versions_cell = msg::Versions::new([3]).unwrap().into();
601             input
602                 .send(Ok(ChanCell::new(0.into(), versions_cell)))
603                 .await
604                 .unwrap();
605             let e = reactor.run_once().await.unwrap_err().unwrap_err();
606             assert_eq!(
607                 format!("{}", e),
608                 "channel protocol violation: VERSIONS cell after handshake is done"
609             );
610 
611             // We don't accept CREATED.
612             let created_cell = msg::Created::new(&b"xyzzy"[..]).into();
613             input
614                 .send(Ok(ChanCell::new(25.into(), created_cell)))
615                 .await
616                 .unwrap();
617             let e = reactor.run_once().await.unwrap_err().unwrap_err();
618             assert_eq!(
619                 format!("{}", e),
620                 "channel protocol violation: CREATED cell received, but we never send CREATEs"
621             );
622         });
623     }
624 
625     #[test]
deliver_relay()626     fn deliver_relay() {
627         tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
628             use crate::circuit::celltypes::ClientCircChanMsg;
629             use futures::channel::oneshot;
630             use tor_cell::chancell::msg;
631 
632             let (_chan, mut reactor, _output, mut input) = new_reactor();
633 
634             let (_circ_stream_7, mut circ_stream_13) = {
635                 let (snd1, _rcv1) = oneshot::channel();
636                 let (snd2, rcv2) = mpsc::channel(64);
637                 reactor
638                     .circs
639                     .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
640 
641                 let (snd3, rcv3) = mpsc::channel(64);
642                 reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
643 
644                 reactor
645                     .circs
646                     .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
647                 (rcv2, rcv3)
648             };
649 
650             // If a relay cell is sent on an open channel, the correct circuit
651             // should get it.
652             let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into();
653             input
654                 .send(Ok(ChanCell::new(13.into(), relaycell.clone())))
655                 .await
656                 .unwrap();
657             reactor.run_once().await.unwrap();
658             let got = circ_stream_13.next().await.unwrap();
659             assert!(matches!(got, ClientCircChanMsg::Relay(_)));
660 
661             // If a relay cell is sent on an opening channel, that's an error.
662             input
663                 .send(Ok(ChanCell::new(7.into(), relaycell.clone())))
664                 .await
665                 .unwrap();
666             let e = reactor.run_once().await.unwrap_err().unwrap_err();
667             assert_eq!(
668             format!("{}", e),
669             "channel protocol violation: Relay cell on pending circuit before CREATED* received"
670         );
671 
672             // If a relay cell is sent on a non-existent channel, that's an error.
673             input
674                 .send(Ok(ChanCell::new(101.into(), relaycell.clone())))
675                 .await
676                 .unwrap();
677             let e = reactor.run_once().await.unwrap_err().unwrap_err();
678             assert_eq!(
679                 format!("{}", e),
680                 "channel protocol violation: Relay cell on nonexistent circuit"
681             );
682 
683             // It's fine to get a relay cell on a DestroySent channel: that happens
684             // when the other side hasn't noticed the Destroy yet.
685 
686             // We can do this 25 more times according to our setup:
687             for _ in 0..25 {
688                 input
689                     .send(Ok(ChanCell::new(23.into(), relaycell.clone())))
690                     .await
691                     .unwrap();
692                 reactor.run_once().await.unwrap(); // should be fine.
693             }
694 
695             // This one will fail.
696             input
697                 .send(Ok(ChanCell::new(23.into(), relaycell.clone())))
698                 .await
699                 .unwrap();
700             let e = reactor.run_once().await.unwrap_err().unwrap_err();
701             assert_eq!(
702                 format!("{}", e),
703                 "channel protocol violation: Too many cells received on destroyed circuit"
704             );
705         });
706     }
707 
708     #[test]
deliver_destroy()709     fn deliver_destroy() {
710         tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
711             use crate::circuit::celltypes::*;
712             use futures::channel::oneshot;
713             use tor_cell::chancell::msg;
714 
715             let (_chan, mut reactor, _output, mut input) = new_reactor();
716 
717             let (circ_oneshot_7, mut circ_stream_13) = {
718                 let (snd1, rcv1) = oneshot::channel();
719                 let (snd2, _rcv2) = mpsc::channel(64);
720                 reactor
721                     .circs
722                     .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
723 
724                 let (snd3, rcv3) = mpsc::channel(64);
725                 reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
726 
727                 reactor
728                     .circs
729                     .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
730                 (rcv1, rcv3)
731             };
732 
733             // Destroying an opening circuit is fine.
734             let destroycell: ChanMsg = msg::Destroy::new(0.into()).into();
735             input
736                 .send(Ok(ChanCell::new(7.into(), destroycell.clone())))
737                 .await
738                 .unwrap();
739             reactor.run_once().await.unwrap();
740             let msg = circ_oneshot_7.await;
741             assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
742 
743             // Destroying an open circuit is fine.
744             input
745                 .send(Ok(ChanCell::new(13.into(), destroycell.clone())))
746                 .await
747                 .unwrap();
748             reactor.run_once().await.unwrap();
749             let msg = circ_stream_13.next().await.unwrap();
750             assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
751 
752             // Destroying a DestroySent circuit is fine.
753             input
754                 .send(Ok(ChanCell::new(23.into(), destroycell.clone())))
755                 .await
756                 .unwrap();
757             reactor.run_once().await.unwrap();
758 
759             // Destroying a nonexistent circuit is an error.
760             input
761                 .send(Ok(ChanCell::new(101.into(), destroycell.clone())))
762                 .await
763                 .unwrap();
764             let e = reactor.run_once().await.unwrap_err().unwrap_err();
765             assert_eq!(
766                 format!("{}", e),
767                 "channel protocol violation: Destroy for nonexistent circuit"
768             );
769         });
770     }
771 }
772