//! DNS high level transit implementations.
2 //!
3 //! Primarily there are two types in this module of interest, the `DnsMultiplexer` type and the `DnsHandle` type. `DnsMultiplexer` can be thought of as the state machine responsible for sending and receiving DNS messages. `DnsHandle` is the type given to API users of the `trust-dns-proto` library to send messages into the `DnsMultiplexer` for delivery. Finally there is the `DnsRequest` type. This allows for customizations, through `DnsRequestOptions`, to the delivery of messages via a `DnsMultiplexer`.
4 //!
5 //! TODO: this module needs some serious refactoring and normalization.
6 
7 use std::fmt::{Debug, Display};
8 use std::net::SocketAddr;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11 
12 use futures::channel::mpsc::{TrySendError, UnboundedSender};
13 use futures::channel::oneshot::{self, Receiver, Sender};
14 use futures::{ready, Future, Stream};
15 use log::{debug, warn};
16 
17 use crate::error::*;
18 use crate::op::Message;
19 
20 mod dns_exchange;
21 pub mod dns_handle;
22 pub mod dns_multiplexer;
23 pub mod dns_request;
24 pub mod dns_response;
25 pub mod retry_dns_handle;
26 #[cfg(feature = "dnssec")]
27 pub mod secure_dns_handle;
28 mod serial_message;
29 
30 pub use self::dns_exchange::{DnsExchange, DnsExchangeConnect};
31 pub use self::dns_handle::{BasicDnsHandle, DnsHandle, DnsStreamHandle, StreamHandle};
32 pub use self::dns_multiplexer::{
33     DnsMultiplexer, DnsMultiplexerConnect, DnsMultiplexerSerialResponse,
34 };
35 pub use self::dns_request::{DnsRequest, DnsRequestOptions};
36 pub use self::dns_response::DnsResponse;
37 pub use self::retry_dns_handle::RetryDnsHandle;
38 #[cfg(feature = "dnssec")]
39 pub use self::secure_dns_handle::SecureDnsHandle;
40 pub use self::serial_message::SerialMessage;
41 
42 /// Ignores the result of a send operation and logs and ignores errors
ignore_send<M, E: Debug>(result: Result<M, E>)43 fn ignore_send<M, E: Debug>(result: Result<M, E>) {
44     if let Err(error) = result {
45         warn!("error notifying wait, possible future leak: {:?}", error);
46     }
47 }
48 
49 /// A non-multiplexed stream of Serialized DNS messages
50 pub trait DnsClientStream:
51     Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
52 {
53     /// The remote name server address
name_server_addr(&self) -> SocketAddr54     fn name_server_addr(&self) -> SocketAddr;
55 }
56 
57 // TODO: change to Sink
58 /// A sender to which serialized DNS Messages can be sent
59 #[derive(Clone)]
60 pub struct BufStreamHandle {
61     sender: UnboundedSender<SerialMessage>,
62 }
63 
64 impl BufStreamHandle {
65     /// Constructs a new BufStreamHandle with the associated ProtoError
new(sender: UnboundedSender<SerialMessage>) -> Self66     pub fn new(sender: UnboundedSender<SerialMessage>) -> Self {
67         BufStreamHandle { sender }
68     }
69 
70     /// see [`futures::sync::mpsc::UnboundedSender`]
unbounded_send(&self, msg: SerialMessage) -> Result<(), TrySendError<SerialMessage>>71     pub fn unbounded_send(&self, msg: SerialMessage) -> Result<(), TrySendError<SerialMessage>> {
72         self.sender.unbounded_send(msg)
73     }
74 }
75 
76 // TODO: change to Sink
77 /// A sender to which a Message can be sent
78 pub type MessageStreamHandle = UnboundedSender<Message>;
79 
80 /// A buffering stream bound to a `SocketAddr`
81 pub struct BufDnsStreamHandle {
82     name_server: SocketAddr,
83     sender: BufStreamHandle,
84 }
85 
86 impl BufDnsStreamHandle {
87     /// Constructs a new Buffered Stream Handle, used for sending data to the DNS peer.
88     ///
89     /// # Arguments
90     ///
91     /// * `name_server` - the address of the DNS server
92     /// * `sender` - the handle being used to send data to the server
new(name_server: SocketAddr, sender: BufStreamHandle) -> Self93     pub fn new(name_server: SocketAddr, sender: BufStreamHandle) -> Self {
94         BufDnsStreamHandle {
95             name_server,
96             sender,
97         }
98     }
99 }
100 
101 impl DnsStreamHandle for BufDnsStreamHandle {
send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError>102     fn send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError> {
103         let name_server: SocketAddr = self.name_server;
104         let sender: &mut _ = &mut self.sender;
105         sender
106             .sender
107             .unbounded_send(SerialMessage::new(buffer.unwrap().0, name_server))
108             .map_err(|e| ProtoError::from(format!("mpsc::SendError {}", e)))
109     }
110 }
111 
112 // TODO: expose the Sink trait for this?
113 /// A sender to which serialized DNS Messages can be sent
114 pub struct DnsRequestStreamHandle<F>
115 where
116     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
117 {
118     sender: UnboundedSender<OneshotDnsRequest<F>>,
119 }
120 
121 impl<F> DnsRequestStreamHandle<F>
122 where
123     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
124 {
125     /// Constructs a new BufStreamHandle with the associated ProtoError
new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self126     pub fn new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self {
127         DnsRequestStreamHandle { sender }
128     }
129 
130     /// see [`futures::sync::mpsc::UnboundedSender`]
unbounded_send( &self, msg: OneshotDnsRequest<F>, ) -> Result<(), TrySendError<OneshotDnsRequest<F>>>131     pub fn unbounded_send(
132         &self,
133         msg: OneshotDnsRequest<F>,
134     ) -> Result<(), TrySendError<OneshotDnsRequest<F>>> {
135         self.sender.unbounded_send(msg)
136     }
137 }
138 
139 impl<F> Clone for DnsRequestStreamHandle<F>
140 where
141     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
142 {
clone(&self) -> Self143     fn clone(&self) -> Self {
144         DnsRequestStreamHandle {
145             sender: self.sender.clone(),
146         }
147     }
148 }
149 
150 /// Types that implement this are capable of sending a serialized DNS message on a stream
151 ///
152 /// The underlying Stream implementation should yield `Some(())` whenever it is ready to send a message,
153 ///   NotReady, if it is not ready to send a message, and `Err` or `None` in the case that the stream is
154 ///   done, and should be shutdown.
155 pub trait DnsRequestSender:
156     Stream<Item = Result<(), ProtoError>> + 'static + Display + Send + Unpin
157 {
158     /// A future that resolves to a response serial message
159     type DnsResponseFuture: Future<Output = Result<DnsResponse, ProtoError>>
160         + 'static
161         + Send
162         + Unpin;
163 
164     /// Send a message, and return a future of the response
165     ///
166     /// # Return
167     ///
168     /// A future which will resolve to a SerialMessage response
send_message(&mut self, message: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture169     fn send_message(&mut self, message: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture;
170 
171     /// Constructs an error response
error_response(error: ProtoError) -> Self::DnsResponseFuture172     fn error_response(error: ProtoError) -> Self::DnsResponseFuture;
173 
174     /// Allows the upstream user to inform the underling stream that it should shutdown.
175     ///
176     /// After this is called, the next time `poll` is called on the stream it would be correct to return `Poll::Ready(Ok(()))`. This is not required though, if there are say outstanding requests that are not yet complete, then it would be correct to first wait for those results.
shutdown(&mut self)177     fn shutdown(&mut self);
178 
179     /// Returns true if the stream has been shutdown with `shutdown`
is_shutdown(&self) -> bool180     fn is_shutdown(&self) -> bool;
181 }
182 
183 /// Used for associating a name_server to a DnsRequestStreamHandle
184 pub struct BufDnsRequestStreamHandle<F>
185 where
186     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
187 {
188     sender: DnsRequestStreamHandle<F>,
189 }
190 
191 impl<F> BufDnsRequestStreamHandle<F>
192 where
193     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
194 {
195     /// Construct a new BufDnsRequestStreamHandle
new(sender: DnsRequestStreamHandle<F>) -> Self196     pub fn new(sender: DnsRequestStreamHandle<F>) -> Self {
197         BufDnsRequestStreamHandle { sender }
198     }
199 }
200 
201 impl<F> Clone for BufDnsRequestStreamHandle<F>
202 where
203     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
204 {
clone(&self) -> Self205     fn clone(&self) -> Self {
206         BufDnsRequestStreamHandle {
207             sender: self.sender.clone(),
208         }
209     }
210 }
211 
/// Unwraps a `Result` inside a function that returns `OneshotDnsResponseReceiver`.
///
/// On `Ok(val)` the macro evaluates to `val`. On `Err(err)` it returns early from the enclosing
/// function with `OneshotDnsResponseReceiver::Err`, converting `err` via `ProtoError::from`.
///
/// A single trailing comma after the expression is accepted and behaves identically.
macro_rules! try_oneshot {
    ($expr:expr) => {{
        use std::result::Result;

        match $expr {
            Result::Ok(val) => val,
            Result::Err(err) => {
                return OneshotDnsResponseReceiver::Err(Some(ProtoError::from(err)))
            }
        }
    }};
    // The trailing-comma form must behave exactly like the plain form, so it delegates to it
    // rather than using `?` (which needs the enclosing function to return a `Try` type).
    ($expr:expr,) => {
        try_oneshot!($expr)
    };
}
227 
228 impl<F> DnsHandle for BufDnsRequestStreamHandle<F>
229 where
230     F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin + 'static,
231 {
232     type Response = OneshotDnsResponseReceiver<F>;
233 
send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response234     fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
235         let request: DnsRequest = request.into();
236         debug!("enqueueing message: {:?}", request.queries());
237 
238         let (request, oneshot) = OneshotDnsRequest::oneshot(request);
239         try_oneshot!(self.sender.unbounded_send(request).map_err(|_| {
240             debug!("unable to enqueue message");
241             ProtoError::from("could not send request")
242         }));
243 
244         OneshotDnsResponseReceiver::Receiver(oneshot)
245     }
246 }
247 
// TODO: this future should return the original message in the response on errors
249 /// A OneshotDnsRequest creates a channel for a response to message
250 pub struct OneshotDnsRequest<F>
251 where
252     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
253 {
254     dns_request: DnsRequest,
255     sender_for_response: Sender<F>,
256 }
257 
258 impl<F> OneshotDnsRequest<F>
259 where
260     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
261 {
oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>)262     fn oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>) {
263         let (sender_for_response, receiver) = oneshot::channel();
264 
265         (
266             OneshotDnsRequest {
267                 dns_request,
268                 sender_for_response,
269             },
270             receiver,
271         )
272     }
273 
unwrap(self) -> (DnsRequest, OneshotDnsResponse<F>)274     fn unwrap(self) -> (DnsRequest, OneshotDnsResponse<F>) {
275         (
276             self.dns_request,
277             OneshotDnsResponse(self.sender_for_response),
278         )
279     }
280 }
281 
282 struct OneshotDnsResponse<F>(oneshot::Sender<F>)
283 where
284     F: Future<Output = Result<DnsResponse, ProtoError>> + Send;
285 
286 impl<F> OneshotDnsResponse<F>
287 where
288     F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
289 {
send_response(self, serial_response: F) -> Result<(), F>290     fn send_response(self, serial_response: F) -> Result<(), F> {
291         self.0.send(serial_response)
292     }
293 }
294 
295 /// A Future that wraps a oneshot::Receiver and resolves to the final value
296 pub enum OneshotDnsResponseReceiver<F>
297 where
298     F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
299 {
300     /// The receiver
301     Receiver(Receiver<F>),
302     /// The future once received
303     Received(F),
304     /// Error during the send operation
305     Err(Option<ProtoError>),
306 }
307 
308 impl<F> Future for OneshotDnsResponseReceiver<F>
309 where
310     F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
311 {
312     type Output = <F as Future>::Output;
313 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>314     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
315         loop {
316             *self = match *self.as_mut() {
317                 OneshotDnsResponseReceiver::Receiver(ref mut receiver) => {
318                     let receiver = Pin::new(receiver);
319                     let future = ready!(receiver
320                         .poll(cx)
321                         .map_err(|_| ProtoError::from("receiver was canceled")))?;
322                     OneshotDnsResponseReceiver::Received(future)
323                 }
324                 OneshotDnsResponseReceiver::Received(ref mut future) => {
325                     let future = Pin::new(future);
326                     return future.poll(cx);
327                 }
328                 OneshotDnsResponseReceiver::Err(ref mut err) => {
329                     return Poll::Ready(Err(err
330                         .take()
331                         .expect("futures should not be polled after complete")))
332                 }
333             };
334         }
335     }
336 }
337