1 //! DNS high level transit implimentations.
2 //!
3 //! Primarily there are two types in this module of interest, the `DnsMultiplexer` type and the `DnsHandle` type. `DnsMultiplexer` can be thought of as the state machine responsible for sending and receiving DNS messages. `DnsHandle` is the type given to API users of the `trust-dns-proto` library to send messages into the `DnsMultiplexer` for delivery. Finally there is the `DnsRequest` type. This allows for customizations, through `DnsRequestOptions`, to the delivery of messages via a `DnsMultiplexer`.
4 //!
5 //! TODO: this module needs some serious refactoring and normalization.
6
7 use std::fmt::{Debug, Display};
8 use std::net::SocketAddr;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 use futures::channel::mpsc::{TrySendError, UnboundedSender};
13 use futures::channel::oneshot::{self, Receiver, Sender};
14 use futures::{ready, Future, Stream};
15 use log::{debug, warn};
16
17 use crate::error::*;
18 use crate::op::Message;
19
20 mod dns_exchange;
21 pub mod dns_handle;
22 pub mod dns_multiplexer;
23 pub mod dns_request;
24 pub mod dns_response;
25 pub mod retry_dns_handle;
26 #[cfg(feature = "dnssec")]
27 pub mod secure_dns_handle;
28 mod serial_message;
29
30 pub use self::dns_exchange::{DnsExchange, DnsExchangeConnect};
31 pub use self::dns_handle::{BasicDnsHandle, DnsHandle, DnsStreamHandle, StreamHandle};
32 pub use self::dns_multiplexer::{
33 DnsMultiplexer, DnsMultiplexerConnect, DnsMultiplexerSerialResponse,
34 };
35 pub use self::dns_request::{DnsRequest, DnsRequestOptions};
36 pub use self::dns_response::DnsResponse;
37 pub use self::retry_dns_handle::RetryDnsHandle;
38 #[cfg(feature = "dnssec")]
39 pub use self::secure_dns_handle::SecureDnsHandle;
40 pub use self::serial_message::SerialMessage;
41
42 /// Ignores the result of a send operation and logs and ignores errors
ignore_send<M, E: Debug>(result: Result<M, E>)43 fn ignore_send<M, E: Debug>(result: Result<M, E>) {
44 if let Err(error) = result {
45 warn!("error notifying wait, possible future leak: {:?}", error);
46 }
47 }
48
49 /// A non-multiplexed stream of Serialized DNS messages
50 pub trait DnsClientStream:
51 Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
52 {
53 /// The remote name server address
name_server_addr(&self) -> SocketAddr54 fn name_server_addr(&self) -> SocketAddr;
55 }
56
57 // TODO: change to Sink
58 /// A sender to which serialized DNS Messages can be sent
59 #[derive(Clone)]
60 pub struct BufStreamHandle {
61 sender: UnboundedSender<SerialMessage>,
62 }
63
64 impl BufStreamHandle {
65 /// Constructs a new BufStreamHandle with the associated ProtoError
new(sender: UnboundedSender<SerialMessage>) -> Self66 pub fn new(sender: UnboundedSender<SerialMessage>) -> Self {
67 BufStreamHandle { sender }
68 }
69
70 /// see [`futures::sync::mpsc::UnboundedSender`]
unbounded_send(&self, msg: SerialMessage) -> Result<(), TrySendError<SerialMessage>>71 pub fn unbounded_send(&self, msg: SerialMessage) -> Result<(), TrySendError<SerialMessage>> {
72 self.sender.unbounded_send(msg)
73 }
74 }
75
76 // TODO: change to Sink
77 /// A sender to which a Message can be sent
78 pub type MessageStreamHandle = UnboundedSender<Message>;
79
80 /// A buffering stream bound to a `SocketAddr`
81 pub struct BufDnsStreamHandle {
82 name_server: SocketAddr,
83 sender: BufStreamHandle,
84 }
85
86 impl BufDnsStreamHandle {
87 /// Constructs a new Buffered Stream Handle, used for sending data to the DNS peer.
88 ///
89 /// # Arguments
90 ///
91 /// * `name_server` - the address of the DNS server
92 /// * `sender` - the handle being used to send data to the server
new(name_server: SocketAddr, sender: BufStreamHandle) -> Self93 pub fn new(name_server: SocketAddr, sender: BufStreamHandle) -> Self {
94 BufDnsStreamHandle {
95 name_server,
96 sender,
97 }
98 }
99 }
100
101 impl DnsStreamHandle for BufDnsStreamHandle {
send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError>102 fn send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError> {
103 let name_server: SocketAddr = self.name_server;
104 let sender: &mut _ = &mut self.sender;
105 sender
106 .sender
107 .unbounded_send(SerialMessage::new(buffer.unwrap().0, name_server))
108 .map_err(|e| ProtoError::from(format!("mpsc::SendError {}", e)))
109 }
110 }
111
112 // TODO: expose the Sink trait for this?
113 /// A sender to which serialized DNS Messages can be sent
114 pub struct DnsRequestStreamHandle<F>
115 where
116 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
117 {
118 sender: UnboundedSender<OneshotDnsRequest<F>>,
119 }
120
121 impl<F> DnsRequestStreamHandle<F>
122 where
123 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
124 {
125 /// Constructs a new BufStreamHandle with the associated ProtoError
new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self126 pub fn new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self {
127 DnsRequestStreamHandle { sender }
128 }
129
130 /// see [`futures::sync::mpsc::UnboundedSender`]
unbounded_send( &self, msg: OneshotDnsRequest<F>, ) -> Result<(), TrySendError<OneshotDnsRequest<F>>>131 pub fn unbounded_send(
132 &self,
133 msg: OneshotDnsRequest<F>,
134 ) -> Result<(), TrySendError<OneshotDnsRequest<F>>> {
135 self.sender.unbounded_send(msg)
136 }
137 }
138
139 impl<F> Clone for DnsRequestStreamHandle<F>
140 where
141 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
142 {
clone(&self) -> Self143 fn clone(&self) -> Self {
144 DnsRequestStreamHandle {
145 sender: self.sender.clone(),
146 }
147 }
148 }
149
150 /// Types that implement this are capable of sending a serialized DNS message on a stream
151 ///
152 /// The underlying Stream implementation should yield `Some(())` whenever it is ready to send a message,
153 /// NotReady, if it is not ready to send a message, and `Err` or `None` in the case that the stream is
154 /// done, and should be shutdown.
155 pub trait DnsRequestSender:
156 Stream<Item = Result<(), ProtoError>> + 'static + Display + Send + Unpin
157 {
158 /// A future that resolves to a response serial message
159 type DnsResponseFuture: Future<Output = Result<DnsResponse, ProtoError>>
160 + 'static
161 + Send
162 + Unpin;
163
164 /// Send a message, and return a future of the response
165 ///
166 /// # Return
167 ///
168 /// A future which will resolve to a SerialMessage response
send_message(&mut self, message: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture169 fn send_message(&mut self, message: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture;
170
171 /// Constructs an error response
error_response(error: ProtoError) -> Self::DnsResponseFuture172 fn error_response(error: ProtoError) -> Self::DnsResponseFuture;
173
174 /// Allows the upstream user to inform the underling stream that it should shutdown.
175 ///
176 /// After this is called, the next time `poll` is called on the stream it would be correct to return `Poll::Ready(Ok(()))`. This is not required though, if there are say outstanding requests that are not yet complete, then it would be correct to first wait for those results.
shutdown(&mut self)177 fn shutdown(&mut self);
178
179 /// Returns true if the stream has been shutdown with `shutdown`
is_shutdown(&self) -> bool180 fn is_shutdown(&self) -> bool;
181 }
182
183 /// Used for associating a name_server to a DnsRequestStreamHandle
184 pub struct BufDnsRequestStreamHandle<F>
185 where
186 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
187 {
188 sender: DnsRequestStreamHandle<F>,
189 }
190
191 impl<F> BufDnsRequestStreamHandle<F>
192 where
193 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
194 {
195 /// Construct a new BufDnsRequestStreamHandle
new(sender: DnsRequestStreamHandle<F>) -> Self196 pub fn new(sender: DnsRequestStreamHandle<F>) -> Self {
197 BufDnsRequestStreamHandle { sender }
198 }
199 }
200
201 impl<F> Clone for BufDnsRequestStreamHandle<F>
202 where
203 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
204 {
clone(&self) -> Self205 fn clone(&self) -> Self {
206 BufDnsRequestStreamHandle {
207 sender: self.sender.clone(),
208 }
209 }
210 }
211
212 macro_rules! try_oneshot {
213 ($expr:expr) => {{
214 use std::result::Result;
215
216 match $expr {
217 Result::Ok(val) => val,
218 Result::Err(err) => {
219 return OneshotDnsResponseReceiver::Err(Some(ProtoError::from(err)))
220 }
221 }
222 }};
223 ($expr:expr,) => {
224 $expr?
225 };
226 }
227
228 impl<F> DnsHandle for BufDnsRequestStreamHandle<F>
229 where
230 F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin + 'static,
231 {
232 type Response = OneshotDnsResponseReceiver<F>;
233
send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response234 fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
235 let request: DnsRequest = request.into();
236 debug!("enqueueing message: {:?}", request.queries());
237
238 let (request, oneshot) = OneshotDnsRequest::oneshot(request);
239 try_oneshot!(self.sender.unbounded_send(request).map_err(|_| {
240 debug!("unable to enqueue message");
241 ProtoError::from("could not send request")
242 }));
243
244 OneshotDnsResponseReceiver::Receiver(oneshot)
245 }
246 }
247
248 // TODO: this future should return the origin message in the response on errors
249 /// A OneshotDnsRequest creates a channel for a response to message
250 pub struct OneshotDnsRequest<F>
251 where
252 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
253 {
254 dns_request: DnsRequest,
255 sender_for_response: Sender<F>,
256 }
257
258 impl<F> OneshotDnsRequest<F>
259 where
260 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
261 {
oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>)262 fn oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>) {
263 let (sender_for_response, receiver) = oneshot::channel();
264
265 (
266 OneshotDnsRequest {
267 dns_request,
268 sender_for_response,
269 },
270 receiver,
271 )
272 }
273
unwrap(self) -> (DnsRequest, OneshotDnsResponse<F>)274 fn unwrap(self) -> (DnsRequest, OneshotDnsResponse<F>) {
275 (
276 self.dns_request,
277 OneshotDnsResponse(self.sender_for_response),
278 )
279 }
280 }
281
282 struct OneshotDnsResponse<F>(oneshot::Sender<F>)
283 where
284 F: Future<Output = Result<DnsResponse, ProtoError>> + Send;
285
286 impl<F> OneshotDnsResponse<F>
287 where
288 F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
289 {
send_response(self, serial_response: F) -> Result<(), F>290 fn send_response(self, serial_response: F) -> Result<(), F> {
291 self.0.send(serial_response)
292 }
293 }
294
295 /// A Future that wraps a oneshot::Receiver and resolves to the final value
296 pub enum OneshotDnsResponseReceiver<F>
297 where
298 F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
299 {
300 /// The receiver
301 Receiver(Receiver<F>),
302 /// The future once received
303 Received(F),
304 /// Error during the send operation
305 Err(Option<ProtoError>),
306 }
307
308 impl<F> Future for OneshotDnsResponseReceiver<F>
309 where
310 F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
311 {
312 type Output = <F as Future>::Output;
313
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>314 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
315 loop {
316 *self = match *self.as_mut() {
317 OneshotDnsResponseReceiver::Receiver(ref mut receiver) => {
318 let receiver = Pin::new(receiver);
319 let future = ready!(receiver
320 .poll(cx)
321 .map_err(|_| ProtoError::from("receiver was canceled")))?;
322 OneshotDnsResponseReceiver::Received(future)
323 }
324 OneshotDnsResponseReceiver::Received(ref mut future) => {
325 let future = Pin::new(future);
326 return future.poll(cx);
327 }
328 OneshotDnsResponseReceiver::Err(ref mut err) => {
329 return Poll::Ready(Err(err
330 .take()
331 .expect("futures should not be polled after complete")))
332 }
333 };
334 }
335 }
336 }
337