1 //! Code to handle incoming cells on a circuit.
2 use super::streammap::{ShouldSendEnd, StreamEnt};
3 use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
4 use crate::circuit::unique_id::UniqId;
5 use crate::circuit::{
6     sendme, streammap, CircParameters, Create2Wrap, CreateFastWrap, CreateHandshakeWrap,
7 };
8 use crate::crypto::cell::{
9     ClientLayer, CryptInit, HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt,
10     OutboundClientLayer, RelayCellBody, Tor1RelayCrypto,
11 };
12 use crate::util::err::ReactorError;
13 use crate::{Error, Result};
14 use std::collections::VecDeque;
15 use std::marker::PhantomData;
16 use std::pin::Pin;
17 use tor_cell::chancell::msg::{ChanMsg, Relay};
18 use tor_cell::relaycell::msg::{End, RelayMsg, Sendme};
19 use tor_cell::relaycell::{RelayCell, RelayCmd, StreamId};
20 
21 use futures::channel::{mpsc, oneshot};
22 use futures::Sink;
23 use futures::Stream;
24 
25 use std::sync::atomic::{AtomicU8, Ordering};
26 use std::sync::Arc;
27 use std::task::{Context, Poll};
28 
29 use crate::channel::Channel;
30 #[cfg(test)]
31 use crate::circuit::sendme::CircTag;
32 use crate::circuit::sendme::StreamSendWindow;
33 use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
34 use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
35 use tor_cell::chancell;
36 use tor_cell::chancell::{ChanCell, CircId};
37 use tor_linkspec::LinkSpec;
38 use tor_llcrypto::pk;
39 use tracing::{debug, trace, warn};
40 
41 /// Initial value for outbound flow-control window on streams.
42 pub(super) const SEND_WINDOW_INIT: u16 = 500;
43 /// Initial value for inbound flow-control window on streams.
44 pub(super) const RECV_WINDOW_INIT: u16 = 500;
45 /// Size of the buffer used between the reactor and a `StreamReader`.
46 ///
47 /// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
48 ///             get sent more than the receive window anyway!). We might do due to things that
49 ///             don't count towards the window though.
50 pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
51 
52 /// The type of a oneshot channel used to inform reactor users of the result of an operation.
53 pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
54 
55 /// A handshake type, to be used when creating circuit hops.
56 #[derive(Clone, Debug)]
57 pub(super) enum CircuitHandshake {
58     /// Use the CREATE_FAST handshake.
59     CreateFast,
60     /// Use the ntor handshake.
61     Ntor {
62         /// The public key of the relay.
63         public_key: NtorPublicKey,
64         /// The first hop's Ed25519 identity, which is verified against
65         /// the identity held in the circuit's channel.
66         ed_identity: pk::ed25519::Ed25519Identity,
67     },
68 }
69 
70 /// A message telling the reactor to do something.
71 #[derive(Debug)]
72 pub(super) enum CtrlMsg {
73     /// Create the first hop of this circuit.
74     Create {
75         /// A oneshot channel on which we'll receive the creation response.
76         recv_created: oneshot::Receiver<CreateResponse>,
77         /// The handshake type to use for the first hop.
78         handshake: CircuitHandshake,
79         /// Whether the hop supports authenticated SENDME cells.
80         supports_authenticated_sendme: bool,
81         /// Other parameters relevant for circuit creation.
82         params: CircParameters,
83         /// Oneshot channel to notify on completion.
84         done: ReactorResultChannel<()>,
85     },
86     /// Extend a circuit by one hop, using the ntor handshake.
87     ExtendNtor {
88         /// The handshake type to use for this hop.
89         public_key: NtorPublicKey,
90         /// Information about how to connect to the relay we're extending to.
91         linkspecs: Vec<LinkSpec>,
92         /// Whether the hop supports authenticated SENDME cells.
93         supports_authenticated_sendme: bool,
94         /// Other parameters relevant for circuit extension.
95         params: CircParameters,
96         /// Oneshot channel to notify on completion.
97         done: ReactorResultChannel<()>,
98     },
99     /// Begin a stream with the provided hop in this circuit.
100     ///
101     /// Allocates a stream ID, and sends the provided message to that hop.
102     BeginStream {
103         /// The hop number to begin the stream with.
104         hop_num: HopNum,
105         /// The message to send.
106         message: RelayMsg,
107         /// A channel to send messages on this stream down.
108         ///
109         /// This sender shouldn't ever block, because we use congestion control and only send
110         /// SENDME cells once we've read enough out of the other end. If it *does* block, we
111         /// can assume someone is trying to send us more cells than they should, and abort
112         /// the stream.
113         sender: mpsc::Sender<RelayMsg>,
114         /// A channel to receive messages to send on this stream from.
115         rx: mpsc::Receiver<RelayMsg>,
116         /// Oneshot channel to notify on completion, with the allocated stream ID.
117         done: ReactorResultChannel<StreamId>,
118     },
119     /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
120     SendSendme {
121         /// The stream ID to send a SENDME for.
122         stream_id: StreamId,
123         /// The hop number the stream is on.
124         hop_num: HopNum,
125     },
126     /// Shut down the reactor.
127     Shutdown,
128     /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
129     #[cfg(test)]
130     AddFakeHop {
131         supports_flowctrl_1: bool,
132         fwd_lasthop: bool,
133         rev_lasthop: bool,
134         params: CircParameters,
135         done: ReactorResultChannel<()>,
136     },
137     /// (tests only) Get the send window and expected tags for a given hop.
138     #[cfg(test)]
139     QuerySendWindow {
140         hop: HopNum,
141         done: ReactorResultChannel<(u16, Vec<CircTag>)>,
142     },
143     /// (tests only) Send a raw relay cell with send_relay_cell().
144     #[cfg(test)]
145     SendRelayCell {
146         hop: HopNum,
147         early: bool,
148         cell: RelayCell,
149     },
150 }
151 /// Represents the reactor's view of a single hop.
152 pub(super) struct CircHop {
153     /// Map from stream IDs to streams.
154     ///
155     /// We store this with the reactor instead of the circuit, since the
156     /// reactor needs it for every incoming cell on a stream, whereas
157     /// the circuit only needs it when allocating new streams.
158     map: streammap::StreamMap,
159     /// Window used to say how many cells we can receive.
160     recvwindow: sendme::CircRecvWindow,
161     /// If true, this hop is using an older link protocol and we
162     /// shouldn't expect good authenticated SENDMEs from it.
163     auth_sendme_optional: bool,
164     /// Window used to say how many cells we can send.
165     sendwindow: sendme::CircSendWindow,
166     /// Buffer for messages we can't send to this hop yet due to congestion control.
167     ///
168     /// Contains the tag we should give to the send window, and the cell to send.
169     ///
170     /// This shouldn't grow unboundedly: we try and pop things off it first before
171     /// doing things that would result in it growing (and stop before growing it
172     /// if popping things off it can't be done).
173     outbound: VecDeque<([u8; 20], ChanCell)>,
174 }
175 
176 impl CircHop {
177     /// Create a new hop.
new(auth_sendme_optional: bool, initial_window: u16) -> Self178     pub(super) fn new(auth_sendme_optional: bool, initial_window: u16) -> Self {
179         CircHop {
180             map: streammap::StreamMap::new(),
181             recvwindow: sendme::CircRecvWindow::new(1000),
182             auth_sendme_optional,
183             sendwindow: sendme::CircSendWindow::new(initial_window),
184             outbound: VecDeque::new(),
185         }
186     }
187 }
188 
189 /// An object that's waiting for a meta cell (one not associated with a stream) in order to make
190 /// progress.
191 ///
192 /// # Background
193 ///
194 /// The `Reactor` can't have async functions that send and receive cells, because its job is to
195 /// send and receive cells: if one of its functions tried to do that, it would just hang forever.
196 ///
197 /// To get around this problem, the reactor can send some cells, and then make one of these
198 /// `MetaCellHandler` objects, which will be run when the reply arrives.
199 pub(super) trait MetaCellHandler: Send {
200     /// The hop we're expecting the message to come from. This is compared against the hop
201     /// from which we actually receive messages, and an error is thrown if the two don't match.
expected_hop(&self) -> HopNum202     fn expected_hop(&self) -> HopNum;
203     /// Called when the message we were waiting for arrives.
204     ///
205     /// Gets a copy of the `Reactor` in order to do anything it likes there.
finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>206     fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>;
207 }
208 
209 /// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait.
210 ///
211 /// Yes, I know having trait bounds on structs is bad, but in this case it's necessary
212 /// since we want to be able to use `H::KeyType`.
213 struct CircuitExtender<H, L, FWD, REV>
214 where
215     H: ClientHandshake,
216 {
217     /// Handshake state.
218     state: Option<H::StateType>,
219     /// Whether the hop supports authenticated SENDME cells.
220     supports_flowctrl_1: bool,
221     /// Parameters used for this extension.
222     params: CircParameters,
223     /// An identifier for logging about this reactor's circuit.
224     unique_id: UniqId,
225     /// The hop we're expecting the EXTENDED2 cell to come back from.
226     expected_hop: HopNum,
227     /// `PhantomData` used to make the other type parameters required for a circuit extension
228     /// part of the `struct`, instead of having them be provided during a function call.
229     ///
230     /// This is done this way so we can implement `MetaCellHandler` for this type, which
231     /// doesn't include any generic type parameters; we need them to be part of the type
232     /// so we know what they are for that `impl` block.
233     phantom: PhantomData<(L, FWD, REV)>,
234 }
235 impl<H, L, FWD, REV> CircuitExtender<H, L, FWD, REV>
236 where
237     H: ClientHandshake,
238     H::KeyGen: KeyGenerator,
239     L: CryptInit + ClientLayer<FWD, REV>,
240     FWD: OutboundClientLayer + 'static + Send,
241     REV: InboundClientLayer + 'static + Send,
242 {
243     /// Start extending a circuit, sending the necessary EXTEND cell and returning a
244     /// new `CircuitExtender` to be called when the reply arrives.
245     ///
246     /// The `handshake_id` is the numeric identifier for what kind of
247     /// handshake we're doing.  The `key` is the relay's onion key that
248     /// goes along with the handshake, and the `linkspecs` are the
249     /// link specifiers to include in the EXTEND cell to tell the
250     /// current last hop which relay to connect to.
begin( cx: &mut Context<'_>, handshake_id: u16, key: &H::KeyType, linkspecs: Vec<LinkSpec>, supports_flowctrl_1: bool, params: CircParameters, reactor: &mut Reactor, ) -> Result<Self>251     fn begin(
252         cx: &mut Context<'_>,
253         handshake_id: u16,
254         key: &H::KeyType,
255         linkspecs: Vec<LinkSpec>,
256         supports_flowctrl_1: bool,
257         params: CircParameters,
258         reactor: &mut Reactor,
259     ) -> Result<Self> {
260         let mut rng = rand::thread_rng();
261         let unique_id = reactor.unique_id;
262 
263         use tor_cell::relaycell::msg::{Body, Extend2};
264         // Perform the first part of the cryptographic handshake
265         let (state, msg) = H::client1(&mut rng, key)?;
266 
267         let n_hops = reactor.crypto_out.n_layers();
268         let hop = ((n_hops - 1) as u8).into();
269 
270         debug!(
271             "{}: Extending circuit to hop {} with {:?}",
272             unique_id,
273             n_hops + 1,
274             linkspecs
275         );
276 
277         let extend_msg = Extend2::new(linkspecs, handshake_id, msg);
278         let cell = RelayCell::new(0.into(), extend_msg.into_message());
279 
280         // Send the message to the last hop...
281         reactor.send_relay_cell(
282             cx, hop, true, // use a RELAY_EARLY cell
283             cell,
284         )?;
285         trace!("{}: waiting for EXTENDED2 cell", unique_id);
286         // ... and now we wait for a response.
287 
288         Ok(Self {
289             state: Some(state),
290             supports_flowctrl_1,
291             params,
292             unique_id,
293             expected_hop: hop,
294             phantom: Default::default(),
295         })
296     }
297 }
298 
299 impl<H, L, FWD, REV> MetaCellHandler for CircuitExtender<H, L, FWD, REV>
300 where
301     H: ClientHandshake,
302     H::StateType: Send,
303     H::KeyGen: KeyGenerator,
304     L: CryptInit + ClientLayer<FWD, REV> + Send,
305     FWD: OutboundClientLayer + 'static + Send,
306     REV: InboundClientLayer + 'static + Send,
307 {
expected_hop(&self) -> HopNum308     fn expected_hop(&self) -> HopNum {
309         self.expected_hop
310     }
finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>311     fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()> {
312         // XXXX If two EXTEND cells are of these are launched on the
313         // same circuit at once, could they collide in this part of
314         // the function?  I don't _think_ so, but it might be a good idea
315         // to have an "extending" bit that keeps two tasks from entering
316         // extend_impl at the same time.
317         //
318         // Also we could enforce that `hop` is still what we expect it
319         // to be at this point.
320 
321         // Did we get the right response?
322         if msg.cmd() != RelayCmd::EXTENDED2 {
323             return Err(Error::CircProto(format!(
324                 "wanted EXTENDED2; got {}",
325                 msg.cmd(),
326             )));
327         }
328 
329         // ???? Do we need to shutdown the circuit for the remaining error
330         // ???? cases in this function?
331 
332         let msg = match msg {
333             RelayMsg::Extended2(e) => e,
334             _ => return Err(Error::InternalError("Body didn't match cmd".into())),
335         };
336         let relay_handshake = msg.into_body();
337 
338         trace!(
339             "{}: Received EXTENDED2 cell; completing handshake.",
340             self.unique_id
341         );
342         // Now perform the second part of the handshake, and see if it
343         // succeeded.
344         let keygen = H::client2(
345             self.state
346                 .take()
347                 .expect("CircuitExtender::finish() called twice"),
348             relay_handshake,
349         )?;
350         let layer = L::construct(keygen)?;
351 
352         debug!("{}: Handshake complete; circuit extended.", self.unique_id);
353 
354         // If we get here, it succeeded.  Add a new hop to the circuit.
355         let (layer_fwd, layer_back) = layer.split();
356         reactor.add_hop(
357             self.supports_flowctrl_1,
358             Box::new(layer_fwd),
359             Box::new(layer_back),
360             &self.params,
361         );
362         Ok(())
363     }
364 }
365 
366 /// Object to handle incoming cells and background tasks on a circuit
367 ///
368 /// This type is returned when you finish a circuit; you need to spawn a
369 /// new task that calls `run()` on it.
370 #[must_use = "If you don't call run() on a reactor, the circuit won't work."]
371 pub struct Reactor {
372     /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
373     pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
374     /// Buffer for cells we can't send out the channel yet due to it being full.
375     ///
376     /// This should be used very very rarely: see `send_msg_direct`'s comments for more
377     /// information. (in fact, using it will generate a warning!)
378     pub(super) outbound: VecDeque<ChanCell>,
379     /// The channel this circuit is using to send cells through.
380     pub(super) channel: Channel,
381     /// Input stream, on which we receive ChanMsg objects from this circuit's
382     /// channel.
383     // TODO: could use a SPSC channel here instead.
384     pub(super) input: mpsc::Receiver<ClientCircChanMsg>,
385     /// The cryptographic state for this circuit for inbound cells.
386     /// This object is divided into multiple layers, each of which is
387     /// shared with one hop of the circuit.
388     pub(super) crypto_in: InboundClientCrypt,
389     /// The cryptographic state for this circuit for outbound cells.
390     pub(super) crypto_out: OutboundClientCrypt,
391     /// List of hops state objects used by the reactor
392     pub(super) hops: Vec<CircHop>,
393     /// Shared atomic for the number of hops this circuit has.
394     pub(super) num_hops: Arc<AtomicU8>,
395     /// An identifier for logging about this reactor's circuit.
396     pub(super) unique_id: UniqId,
397     /// This circuit's identifier on the upstream channel.
398     pub(super) channel_id: CircId,
399     /// A handler for a meta cell, together with a result channel to notify on completion.
400     pub(super) meta_handler: Option<(Box<dyn MetaCellHandler>, ReactorResultChannel<()>)>,
401 }
402 
403 impl Reactor {
404     /// Launch the reactor, and run until the circuit closes or we
405     /// encounter an error.
406     ///
407     /// Once this method returns, the circuit is dead and cannot be
408     /// used again.
run(mut self) -> Result<()>409     pub async fn run(mut self) -> Result<()> {
410         trace!("{}: Running circuit reactor", self.unique_id);
411         let result: Result<()> = loop {
412             match self.run_once().await {
413                 Ok(()) => (),
414                 Err(ReactorError::Shutdown) => break Ok(()),
415                 Err(ReactorError::Err(e)) => break Err(e),
416             }
417         };
418         debug!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
419         result
420     }
421 
422     /// Helper for run: doesn't mark the circuit closed on finish.  Only
423     /// processes one cell or control message.
run_once(&mut self) -> std::result::Result<(), ReactorError>424     pub(super) async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
425         #[allow(clippy::cognitive_complexity)]
426         let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> {
427             let mut create_message = None;
428             let mut did_things = false;
429 
430             // Check whether we've got a control message pending.
431             if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) {
432                 match ret {
433                     None => {
434                         trace!("{}: reactor shutdown due to control drop", self.unique_id);
435                         return Poll::Ready(Err(ReactorError::Shutdown));
436                     }
437                     Some(CtrlMsg::Shutdown) => {
438                         trace!(
439                             "{}: reactor shutdown due to explicit request",
440                             self.unique_id
441                         );
442                         return Poll::Ready(Err(ReactorError::Shutdown));
443                     }
444                     // This message requires actually blocking, so we can't handle it inside
445                     // this nonblocking poll_fn.
446                     Some(x @ CtrlMsg::Create { .. }) => create_message = Some(x),
447                     Some(msg) => {
448                         self.handle_control(cx, msg)?;
449                         did_things = true;
450                     }
451                 }
452             }
453 
454             // Check whether we've got an input message pending.
455             if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) {
456                 match ret {
457                     None => {
458                         trace!("{}: reactor shutdown due to input drop", self.unique_id);
459                         return Poll::Ready(Err(ReactorError::Shutdown));
460                     }
461                     Some(cell) => {
462                         if self.handle_cell(cx, cell)? {
463                             trace!("{}: reactor shutdown due to handled cell", self.unique_id);
464                             return Poll::Ready(Err(ReactorError::Shutdown));
465                         }
466                         did_things = true;
467                     }
468                 }
469             }
470 
471             // Now for the tricky part. We want to grab some relay cells from all of our streams
472             // and forward them on to the channel, but we need to pay attention to both whether
473             // the channel can accept cells right now, and whether congestion control allows us
474             // to send them.
475             //
476             // We also have to do somewhat cursed things and call start_send inside this poll_fn,
477             // since we need to check whether the channel can still receive cells after each one
478             // that we send.
479 
480             let mut streams_to_close = vec![];
481             let mut stream_relaycells = vec![];
482 
483             // Is the channel ready to receive anything at all?
484             if self.channel.poll_ready(cx)? {
485                 // (using this as a named block for early returns; not actually a loop)
486                 #[allow(clippy::never_loop)]
487                 'outer: loop {
488                     // First, drain our queue of things we tried to send earlier, but couldn't.
489                     while let Some(msg) = self.outbound.pop_front() {
490                         trace!("{}: sending from enqueued: {:?}", self.unique_id, msg);
491                         Pin::new(&mut self.channel).start_send(msg)?;
492 
493                         // `futures::Sink::start_send` dictates we need to call `poll_ready` before
494                         // each `start_send` call.
495                         if !self.channel.poll_ready(cx)? {
496                             break 'outer;
497                         }
498                     }
499 
500                     // Let's look at our hops, and streams for each hop.
501                     for (i, hop) in self.hops.iter_mut().enumerate() {
502                         let hop_num = HopNum::from(i as u8);
503                         // If we can, drain our queue of things we tried to send earlier, but
504                         // couldn't due to congestion control.
505                         if hop.sendwindow.window() > 0 {
506                             'hop: while let Some((tag, msg)) = hop.outbound.pop_front() {
507                                 trace!(
508                                     "{}: sending from hop-{}-enqueued: {:?}",
509                                     self.unique_id,
510                                     i,
511                                     msg
512                                 );
513                                 Pin::new(&mut self.channel).start_send(msg)?;
514                                 hop.sendwindow.take(&tag)?;
515                                 if !self.channel.poll_ready(cx)? {
516                                     break 'outer;
517                                 }
518                                 if hop.sendwindow.window() == 0 {
519                                     break 'hop;
520                                 }
521                             }
522                         }
523                         // Look at all of the streams on this hop.
524                         for (id, stream) in hop.map.inner().iter_mut() {
525                             if let StreamEnt::Open {
526                                 rx, send_window, ..
527                             } = stream
528                             {
529                                 // Do the stream and hop send windows allow us to obtain and
530                                 // send something?
531                                 //
532                                 // FIXME(eta): not everything counts toward congestion control!
533                                 if send_window.window() > 0 && hop.sendwindow.window() > 0 {
534                                     match Pin::new(rx).poll_next(cx) {
535                                         Poll::Ready(Some(m)) => {
536                                             stream_relaycells
537                                                 .push((hop_num, RelayCell::new(*id, m)));
538                                         }
539                                         Poll::Ready(None) => {
540                                             // Stream receiver was dropped; close the stream.
541                                             // We can't close it here though due to borrowck; that
542                                             // will happen later.
543                                             streams_to_close.push((hop_num, *id));
544                                         }
545                                         Poll::Pending => {}
546                                     }
547                                 }
548                             }
549                         }
550                     }
551 
552                     break;
553                 }
554             }
555 
556             // Close the streams we said we'd close.
557             for (hopn, id) in streams_to_close {
558                 self.close_stream(cx, hopn, id)?;
559                 did_things = true;
560             }
561             // Send messages we said we'd send.
562             for (hopn, rc) in stream_relaycells {
563                 self.send_relay_cell(cx, hopn, false, rc)?;
564                 did_things = true;
565             }
566 
567             let _ = Pin::new(&mut self.channel)
568                 .poll_flush(cx)
569                 .map_err(|_| Error::ChannelClosed)?;
570             if create_message.is_some() {
571                 Poll::Ready(Ok(create_message))
572             } else if did_things {
573                 Poll::Ready(Ok(None))
574             } else {
575                 Poll::Pending
576             }
577         });
578         let create_message = fut.await?;
579         if let Some(CtrlMsg::Create {
580             recv_created,
581             handshake,
582             supports_authenticated_sendme,
583             params,
584             done,
585         }) = create_message
586         {
587             let ret = match handshake {
588                 CircuitHandshake::CreateFast => {
589                     self.create_firsthop_fast(recv_created, &params).await
590                 }
591                 CircuitHandshake::Ntor {
592                     public_key,
593                     ed_identity,
594                 } => {
595                     self.create_firsthop_ntor(
596                         recv_created,
597                         ed_identity,
598                         public_key,
599                         supports_authenticated_sendme,
600                         &params,
601                     )
602                     .await
603                 }
604             };
605             let _ = done.send(ret); // don't care if sender goes away
606             futures::future::poll_fn(|cx| -> Poll<Result<()>> {
607                 let _ = Pin::new(&mut self.channel)
608                     .poll_flush(cx)
609                     .map_err(|_| Error::ChannelClosed)?;
610                 Poll::Ready(Ok(()))
611             })
612             .await?;
613         }
614         Ok(())
615     }
616 
617     /// Helper: create the first hop of a circuit.
618     ///
619     /// This is parameterized not just on the RNG, but a wrapper object to
620     /// build the right kind of create cell, a handshake object to perform
621     /// the cryptographic cryptographic handshake, and a layer type to
622     /// handle relay crypto after this hop is built.
create_impl<L, FWD, REV, H, W>( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, wrap: &W, key: &H::KeyType, supports_flowctrl_1: bool, params: &CircParameters, ) -> Result<()> where L: CryptInit + ClientLayer<FWD, REV> + 'static + Send, FWD: OutboundClientLayer + 'static + Send, REV: InboundClientLayer + 'static + Send, H: ClientHandshake, W: CreateHandshakeWrap, H::KeyGen: KeyGenerator,623     async fn create_impl<L, FWD, REV, H, W>(
624         &mut self,
625         recvcreated: oneshot::Receiver<CreateResponse>,
626         wrap: &W,
627         key: &H::KeyType,
628         supports_flowctrl_1: bool,
629         params: &CircParameters,
630     ) -> Result<()>
631     where
632         L: CryptInit + ClientLayer<FWD, REV> + 'static + Send, // need all this?XXXX
633         FWD: OutboundClientLayer + 'static + Send,
634         REV: InboundClientLayer + 'static + Send,
635         H: ClientHandshake,
636         W: CreateHandshakeWrap,
637         H::KeyGen: KeyGenerator,
638     {
639         // We don't need to shut down the circuit on failure here, since this
640         // function consumes the PendingClientCirc and only returns
641         // a ClientCirc on success.
642 
643         let (state, msg) = {
644             // done like this because holding the RNG across an await boundary makes the future
645             // non-Send
646             let mut rng = rand::thread_rng();
647             H::client1(&mut rng, key)?
648         };
649         let create_cell = wrap.to_chanmsg(msg);
650         debug!(
651             "{}: Extending to hop 1 with {}",
652             self.unique_id,
653             create_cell.cmd()
654         );
655         self.send_msg(create_cell).await?;
656 
657         let reply = recvcreated
658             .await
659             .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
660 
661         let relay_handshake = wrap.from_chanmsg(reply)?;
662         let keygen = H::client2(state, relay_handshake)?;
663 
664         let layer = L::construct(keygen)?;
665 
666         debug!("{}: Handshake complete; circuit created.", self.unique_id);
667 
668         let (layer_fwd, layer_back) = layer.split();
669         self.add_hop(
670             supports_flowctrl_1,
671             Box::new(layer_fwd),
672             Box::new(layer_back),
673             params,
674         );
675         Ok(())
676     }
677 
678     /// Use the (questionable!) CREATE_FAST handshake to connect to the
679     /// first hop of this circuit.
680     ///
681     /// There's no authentication in CREATE_FAST,
682     /// so we don't need to know whom we're connecting to: we're just
683     /// connecting to whichever relay the channel is for.
create_firsthop_fast( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, params: &CircParameters, ) -> Result<()>684     async fn create_firsthop_fast(
685         &mut self,
686         recvcreated: oneshot::Receiver<CreateResponse>,
687         params: &CircParameters,
688     ) -> Result<()> {
689         use crate::crypto::handshake::fast::CreateFastClient;
690         let wrap = CreateFastWrap;
691         self.create_impl::<Tor1RelayCrypto, _, _, CreateFastClient, _>(
692             recvcreated,
693             &wrap,
694             &(),
695             false,
696             params,
697         )
698         .await
699     }
700 
701     /// Use the ntor handshake to connect to the first hop of this circuit.
702     ///
703     /// Note that the provided 'target' must match the channel's target,
704     /// or the handshake will fail.
create_firsthop_ntor( &mut self, recvcreated: oneshot::Receiver<CreateResponse>, ed_identity: pk::ed25519::Ed25519Identity, pubkey: NtorPublicKey, supports_flowctrl_1: bool, params: &CircParameters, ) -> Result<()>705     async fn create_firsthop_ntor(
706         &mut self,
707         recvcreated: oneshot::Receiver<CreateResponse>,
708         ed_identity: pk::ed25519::Ed25519Identity,
709         pubkey: NtorPublicKey,
710         supports_flowctrl_1: bool,
711         params: &CircParameters,
712     ) -> Result<()> {
713         // Exit now if we have an Ed25519 or RSA identity mismatch.
714         // FIXME(eta): this is copypasta from Channel::check_match!
715         if self.channel.peer_rsa_id() != &pubkey.id {
716             return Err(Error::ChanMismatch(format!(
717                 "Identity {} does not match target {}",
718                 self.channel.peer_rsa_id(),
719                 pubkey.id,
720             )));
721         }
722         if self.channel.peer_ed25519_id() != &ed_identity {
723             return Err(Error::ChanMismatch(format!(
724                 "Identity {} does not match target {}",
725                 self.channel.peer_ed25519_id(),
726                 ed_identity
727             )));
728         }
729 
730         let wrap = Create2Wrap {
731             handshake_type: 0x0002, // ntor
732         };
733         self.create_impl::<Tor1RelayCrypto, _, _, NtorClient, _>(
734             recvcreated,
735             &wrap,
736             &pubkey,
737             supports_flowctrl_1,
738             params,
739         )
740         .await
741     }
742 
743     /// Add a hop to the end of this circuit.
add_hop( &mut self, supports_flowctrl_1: bool, fwd: Box<dyn OutboundClientLayer + 'static + Send>, rev: Box<dyn InboundClientLayer + 'static + Send>, params: &CircParameters, )744     fn add_hop(
745         &mut self,
746         supports_flowctrl_1: bool,
747         fwd: Box<dyn OutboundClientLayer + 'static + Send>,
748         rev: Box<dyn InboundClientLayer + 'static + Send>,
749         params: &CircParameters,
750     ) {
751         let hop = crate::circuit::reactor::CircHop::new(
752             supports_flowctrl_1,
753             params.initial_send_window(),
754         );
755         self.hops.push(hop);
756         self.crypto_in.add_layer(rev);
757         self.crypto_out.add_layer(fwd);
758         self.num_hops.fetch_add(1, Ordering::SeqCst);
759     }
760 
761     /// Handle a RELAY cell on this circuit with stream ID 0.
handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()>762     fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()> {
763         // SENDME cells and TRUNCATED get handled internally by the circuit.
764         if let RelayMsg::Sendme(s) = msg {
765             return self.handle_sendme(hopnum, s);
766         }
767         if let RelayMsg::Truncated(_) = msg {
768             // XXXX need to handle Truncated cells. This isn't the right
769             // way, but at least it's safe.
770             // TODO: If we ever do handle Truncate cells more
771             // correctly, we will need to audit all our use of HopNum
772             // to identify a layer.  Otherwise we could confuse a
773             // message from the previous hop N with a message from the
774             // new hop N.
775             return Err(Error::CircuitClosed);
776         }
777 
778         trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
779 
780         // For all other command types, we'll only get them in response
781         // to another command, which should have registered a responder.
782         //
783         // TODO: that means that service-introduction circuits will need
784         // a different implementation, but that should be okay. We'll work
785         // something out.
786         if let Some((mut handler, done)) = self.meta_handler.take() {
787             if handler.expected_hop() == hopnum {
788                 // Somebody was waiting for a message -- maybe this message
789                 let ret = handler.finish(msg, self);
790                 trace!(
791                     "{}: meta handler completed with result: {:?}",
792                     self.unique_id,
793                     ret
794                 );
795                 let _ = done.send(ret); // don't care if sender goes away
796                 Ok(())
797             } else {
798                 // Somebody wanted a message from a different hop!  Put this
799                 // one back.
800                 self.meta_handler = Some((handler, done));
801                 Err(Error::CircProto(format!(
802                     "Unexpected {} cell from hop {} on client circuit",
803                     msg.cmd(),
804                     hopnum,
805                 )))
806             }
807         } else {
808             // No need to call shutdown here, since this error will
809             // propagate to the reactor shut it down.
810             Err(Error::CircProto(format!(
811                 "Unexpected {} cell on client circuit",
812                 msg.cmd()
813             )))
814         }
815     }
816 
817     /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()>818     fn handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()> {
819         // No need to call "shutdown" on errors in this function;
820         // it's called from the reactor task and errors will propagate there.
821         let hop = self
822             .hop_mut(hopnum)
823             .ok_or_else(|| Error::CircProto(format!("Couldn't find {} hop", hopnum)))?;
824 
825         let auth: Option<[u8; 20]> = match msg.into_tag() {
826             Some(v) if v.len() == 20 => {
827                 // XXXX ugly code.
828                 let mut tag = [0_u8; 20];
829                 (&mut tag).copy_from_slice(&v[..]);
830                 Some(tag)
831             }
832             Some(_) => return Err(Error::CircProto("malformed tag on circuit sendme".into())),
833             None => {
834                 if !hop.auth_sendme_optional {
835                     return Err(Error::CircProto("missing tag on circuit sendme".into()));
836                 } else {
837                     None
838                 }
839             }
840         };
841         match hop.sendwindow.put(auth) {
842             Some(_) => Ok(()),
843             None => Err(Error::CircProto("bad auth tag on circuit sendme".into())),
844         }
845     }
846 
847     /// Send a message onto the circuit's channel (to be called with a `Context`)
848     ///
849     /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
850     /// will be enqueued for sending at a later iteration of the reactor loop.
851     ///
852     /// # Note
853     ///
854     /// Making use of the enqueuing capabilities of this function is discouraged! You should first
855     /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
856     /// ideally use this to implement backpressure (such that you do not read from other sources
857     /// that would send here while you know you're unable to forward the messages on).
send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()>858     fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> {
859         let cell = ChanCell::new(self.channel_id, msg);
860         if self.channel.poll_ready(cx)? {
861             Pin::new(&mut self.channel).start_send(cell)?;
862         } else {
863             // This case shouldn't actually happen that often, if ever. We generally check whether
864             // the channel can be sent to before calling this function (the one exception at the
865             // time of writing is in circuit creation).
866             //
867             // If this is suddenly getting hit and it wasn't before, maybe you added something that
868             // doesn't bother to check the channel (`self.channel.poll_ready(cx)`) before calling
869             // this function, and that's getting used a lot?
870             //
871             // We don't want to drop cells on the floor, though, so this is good to have.
872             warn!(
873                 "{}: having to enqueue cell due to backpressure: {:?}",
874                 self.unique_id, cell
875             );
876             self.outbound.push_back(cell);
877         }
878         Ok(())
879     }
880 
881     /// Wrapper around `send_msg_direct` that uses `futures::future::poll_fn` to get a `Context`.
send_msg(&mut self, msg: ChanMsg) -> Result<()>882     async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> {
883         // HACK(eta): technically the closure passed to `poll_fn` is a `FnMut` closure, since it
884         //            can be polled multiple times.
885         //            We're going to return Ready immediately since we're only using `poll_fn` to
886         //            get a `Context`, but the compiler doesn't know that, so use an `Option`
887         //            which we can `take()` in order to move out of it.
888         //            (if we do get polled again this'll panic, but that shouldn't happen!)
889         let mut msg = Some(msg);
890         futures::future::poll_fn(|cx| -> Poll<Result<()>> {
891             self.send_msg_direct(cx, msg.take().expect("poll_fn called twice?"))?;
892             Poll::Ready(Ok(()))
893         })
894         .await?;
895         Ok(())
896     }
897 
898     /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop.
899     ///
900     /// Does not check whether the cell is well-formed or reasonable.
send_relay_cell( &mut self, cx: &mut Context<'_>, hop: HopNum, early: bool, cell: RelayCell, ) -> Result<()>901     fn send_relay_cell(
902         &mut self,
903         cx: &mut Context<'_>,
904         hop: HopNum,
905         early: bool,
906         cell: RelayCell,
907     ) -> Result<()> {
908         let c_t_w = sendme::cell_counts_towards_windows(&cell);
909         let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into();
910         let tag = self.crypto_out.encrypt(&mut body, hop)?;
911         let msg = chancell::msg::Relay::from_raw(body.into());
912         let msg = if early {
913             ChanMsg::RelayEarly(msg)
914         } else {
915             ChanMsg::Relay(msg)
916         };
917         // If the cell counted towards our sendme window, decrement
918         // that window, and maybe remember the authentication tag.
919         if c_t_w {
920             let hop_num = Into::<usize>::into(hop);
921             let hop = &mut self.hops[hop_num];
922             if hop.sendwindow.window() == 0 {
923                 let cell = ChanCell::new(self.channel_id, msg);
924                 // Send window is empty! Push this cell onto the hop's outbound queue, and it'll
925                 // get sent later.
926                 trace!(
927                     "{}: having to use onto hop {} queue for cell: {:?}",
928                     self.unique_id,
929                     hop_num,
930                     cell
931                 );
932                 hop.outbound.push_back((*tag, cell));
933                 return Ok(());
934             }
935             hop.sendwindow.take(tag)?;
936         }
937         self.send_msg_direct(cx, msg)
938     }
939 
940     /// Handle a CtrlMsg other than Shutdown.
handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()>941     fn handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()> {
942         trace!("{}: reactor received {:?}", self.unique_id, msg);
943         match msg {
944             // This is handled earlier, since it requires blocking.
945             CtrlMsg::Create { .. } => panic!("got a CtrlMsg::Create in handle_control"),
946             // This is handled earlier, since it requires generating a ReactorError.
947             CtrlMsg::Shutdown => panic!("got a CtrlMsg::Shutdown in handle_control"),
948             CtrlMsg::ExtendNtor {
949                 public_key,
950                 linkspecs,
951                 supports_authenticated_sendme,
952                 params,
953                 done,
954             } => {
955                 match CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
956                     cx,
957                     0x02,
958                     &public_key,
959                     linkspecs,
960                     supports_authenticated_sendme,
961                     params,
962                     self,
963                 ) {
964                     Ok(e) => {
965                         self.meta_handler = Some((Box::new(e), done));
966                     }
967                     Err(e) => {
968                         let _ = done.send(Err(e));
969                     }
970                 };
971             }
972             CtrlMsg::BeginStream {
973                 hop_num,
974                 message,
975                 sender,
976                 rx,
977                 done,
978             } => {
979                 let ret = self.begin_stream(cx, hop_num, message, sender, rx);
980                 let _ = done.send(ret); // don't care if sender goes away
981             }
982             CtrlMsg::SendSendme { stream_id, hop_num } => {
983                 let sendme = Sendme::new_empty();
984                 let cell = RelayCell::new(stream_id, sendme.into());
985                 self.send_relay_cell(cx, hop_num, false, cell)?;
986             }
987             #[cfg(test)]
988             CtrlMsg::AddFakeHop {
989                 supports_flowctrl_1,
990                 fwd_lasthop,
991                 rev_lasthop,
992                 params,
993                 done,
994             } => {
995                 use crate::circuit::test::DummyCrypto;
996 
997                 let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
998                 let rev = Box::new(DummyCrypto::new(rev_lasthop));
999                 self.add_hop(supports_flowctrl_1, fwd, rev, &params);
1000                 let _ = done.send(Ok(()));
1001             }
1002             #[cfg(test)]
1003             CtrlMsg::QuerySendWindow { hop, done } => {
1004                 let _ = done.send(if let Some(hop) = self.hop_mut(hop) {
1005                     Ok(hop.sendwindow.window_and_expected_tags())
1006                 } else {
1007                     Err(Error::InternalError(
1008                         "received QuerySendWindow for unknown hop".into(),
1009                     ))
1010                 });
1011             }
1012             #[cfg(test)]
1013             CtrlMsg::SendRelayCell { hop, early, cell } => {
1014                 self.send_relay_cell(cx, hop, early, cell)?;
1015             }
1016         }
1017         Ok(())
1018     }
1019 
1020     /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
1021     /// `message` to the provided hop.
begin_stream( &mut self, cx: &mut Context<'_>, hopnum: HopNum, message: RelayMsg, sender: mpsc::Sender<RelayMsg>, rx: mpsc::Receiver<RelayMsg>, ) -> Result<StreamId>1022     fn begin_stream(
1023         &mut self,
1024         cx: &mut Context<'_>,
1025         hopnum: HopNum,
1026         message: RelayMsg,
1027         sender: mpsc::Sender<RelayMsg>,
1028         rx: mpsc::Receiver<RelayMsg>,
1029     ) -> Result<StreamId> {
1030         let hop = self
1031             .hop_mut(hopnum)
1032             .ok_or_else(|| Error::InternalError(format!("No such hop {:?}", hopnum)))?;
1033         let send_window = StreamSendWindow::new(SEND_WINDOW_INIT);
1034         let r = hop.map.add_ent(sender, rx, send_window)?;
1035         let cell = RelayCell::new(r, message);
1036         self.send_relay_cell(cx, hopnum, false, cell)?;
1037         Ok(r)
1038     }
1039 
1040     /// Close the stream associated with `id` because the stream was
1041     /// dropped.
1042     ///
1043     /// If we have not already received an END cell on this stream, send one.
close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()>1044     fn close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()> {
1045         // Mark the stream as closing.
1046         let hop = self.hop_mut(hopnum).ok_or_else(|| {
1047             Error::InternalError("Tried to close a stream on a hop that wasn't there?".into())
1048         })?;
1049 
1050         let should_send_end = hop.map.terminate(id)?;
1051         trace!(
1052             "{}: Ending stream {}; should_send_end={:?}",
1053             self.unique_id,
1054             id,
1055             should_send_end
1056         );
1057         // TODO: I am about 80% sure that we only send an END cell if
1058         // we didn't already get an END cell.  But I should double-check!
1059         if should_send_end == ShouldSendEnd::Send {
1060             let end_cell = RelayCell::new(id, End::new_misc().into());
1061             self.send_relay_cell(cx, hopnum, false, end_cell)?;
1062         }
1063         Ok(())
1064     }
1065 
1066     /// Helper: process a cell on a channel.  Most cells get ignored
1067     /// or rejected; a few get delivered to circuits.
1068     ///
1069     /// Return true if we should exit.
handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result<bool>1070     fn handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result<bool> {
1071         trace!("{}: handling cell: {:?}", self.unique_id, cell);
1072         use ClientCircChanMsg::*;
1073         match cell {
1074             Relay(r) => {
1075                 self.handle_relay_cell(cx, r)?;
1076                 Ok(false)
1077             }
1078             Destroy(_) => {
1079                 self.handle_destroy_cell()?;
1080                 Ok(true)
1081             }
1082         }
1083     }
1084 
1085     /// React to a Relay or RelayEarly cell.
handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<()>1086     fn handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<()> {
1087         let mut body = cell.into_relay_body().into();
1088 
1089         // Decrypt the cell. If it's recognized, then find the
1090         // corresponding hop.
1091         let (hopnum, tag) = self.crypto_in.decrypt(&mut body)?;
1092         // Make a copy of the authentication tag. TODO: I'd rather not
1093         // copy it, but I don't see a way around it right now.
1094         let tag = {
1095             let mut tag_copy = [0_u8; 20];
1096             // XXXX This could crash if the tag length changes.  We'll
1097             // have to refactor it then.
1098             (&mut tag_copy).copy_from_slice(tag);
1099             tag_copy
1100         };
1101         // Decode the cell.
1102         let msg = RelayCell::decode(body.into())?;
1103 
1104         let c_t_w = sendme::cell_counts_towards_windows(&msg);
1105 
1106         // Decrement the circuit sendme windows, and see if we need to
1107         // send a sendme cell.
1108         let send_circ_sendme = if c_t_w {
1109             let hop = self
1110                 .hop_mut(hopnum)
1111                 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?;
1112             hop.recvwindow.take()?
1113         } else {
1114             false
1115         };
1116         // If we do need to send a circuit-level SENDME cell, do so.
1117         if send_circ_sendme {
1118             let sendme = Sendme::new_tag(tag);
1119             let cell = RelayCell::new(0.into(), sendme.into());
1120             self.send_relay_cell(cx, hopnum, false, cell)?;
1121             self.hop_mut(hopnum)
1122                 .ok_or_else(|| {
1123                     Error::InternalError("Trying to send SENDME to nonexistent hop".to_string())
1124                 })?
1125                 .recvwindow
1126                 .put();
1127         }
1128 
1129         // Break the message apart into its streamID and message.
1130         let (streamid, msg) = msg.into_streamid_and_msg();
1131 
1132         // If this cell wants/refuses to have a Stream ID, does it
1133         // have/not have one?
1134         if !msg.cmd().accepts_streamid_val(streamid) {
1135             return Err(Error::CircProto(format!(
1136                 "Invalid stream ID {} for relay command {}",
1137                 streamid,
1138                 msg.cmd()
1139             )));
1140         }
1141 
1142         // If this has a reasonable streamID value of 0, it's a meta cell,
1143         // not meant for a particular stream.
1144         if streamid.is_zero() {
1145             return self.handle_meta_cell(hopnum, msg);
1146         }
1147 
1148         let hop = self
1149             .hop_mut(hopnum)
1150             .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
1151         match hop.map.get_mut(streamid) {
1152             Some(StreamEnt::Open {
1153                 sink,
1154                 send_window,
1155                 dropped,
1156                 ..
1157             }) => {
1158                 // The stream for this message exists, and is open.
1159 
1160                 if let RelayMsg::Sendme(_) = msg {
1161                     // We need to handle sendmes here, not in the stream's
1162                     // recv() method, or else we'd never notice them if the
1163                     // stream isn't reading.
1164                     // FIXME(eta): I think ignoring the must_use return value here is okay, since
1165                     //             the tag is () anyway? or something???
1166                     let _ = send_window.put(Some(()));
1167                     return Ok(());
1168                 }
1169 
1170                 // Remember whether this was an end cell: if so we should
1171                 // close the stream.
1172                 let is_end_cell = matches!(msg, RelayMsg::End(_));
1173 
1174                 // TODO: Add a wrapper type here to reject cells that should
1175                 // never go to a client, like BEGIN.
1176                 if let Err(e) = sink.try_send(msg) {
1177                     if e.is_full() {
1178                         // If we get here, we either have a logic bug (!), or an attacker
1179                         // is sending us more cells than we asked for via congestion control.
1180                         return Err(Error::CircProto(format!(
1181                             "Stream sink would block; received too many cells on stream ID {}",
1182                             streamid,
1183                         )));
1184                     }
1185                     if e.is_disconnected() && c_t_w {
1186                         // the other side of the stream has gone away; remember
1187                         // that we received a cell that we couldn't queue for it.
1188                         //
1189                         // Later this value will be recorded in a half-stream.
1190                         *dropped += 1;
1191                     }
1192                 }
1193                 if is_end_cell {
1194                     hop.map.end_received(streamid)?;
1195                 }
1196             }
1197             Some(StreamEnt::EndSent(halfstream)) => {
1198                 // We sent an end but maybe the other side hasn't heard.
1199 
1200                 if matches!(msg, RelayMsg::End(_)) {
1201                     hop.map.end_received(streamid)?;
1202                 } else {
1203                     halfstream.handle_msg(&msg)?;
1204                 }
1205             }
1206             _ => {
1207                 // No stream wants this message.
1208                 return Err(Error::CircProto(
1209                     "Cell received on nonexistent stream!?".into(),
1210                 ));
1211             }
1212         }
1213         Ok(())
1214     }
1215 
1216     /// Helper: process a destroy cell.
1217     #[allow(clippy::unnecessary_wraps)]
handle_destroy_cell(&mut self) -> Result<()>1218     fn handle_destroy_cell(&mut self) -> Result<()> {
1219         // I think there is nothing more to do here.
1220         Ok(())
1221     }
1222 
1223     /// Return the hop corresponding to `hopnum`, if there is one.
hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop>1224     fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1225         self.hops.get_mut(Into::<usize>::into(hopnum))
1226     }
1227 }
1228 
1229 impl Drop for Reactor {
drop(&mut self)1230     fn drop(&mut self) {
1231         let _ = self.channel.close_circuit(self.channel_id);
1232     }
1233 }
1234 
1235 #[cfg(test)]
1236 mod test {}
1237