1 use crate::cache::*;
2 use crate::dns::{self, *};
3 use crate::errors::*;
4 use crate::globals::*;
5 use crate::ClientCtx;
6 
7 use byteorder::{BigEndian, ByteOrder};
8 use rand::prelude::*;
9 use siphasher::sip128::Hasher128;
10 use std::cmp;
11 use std::hash::Hasher;
12 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
13 use tokio::io::{AsyncReadExt, AsyncWriteExt};
14 use tokio::net::{TcpSocket, UdpSocket};
15 
resolve_udp( globals: &Globals, mut packet: &mut Vec<u8>, packet_qname: &[u8], tid: u16, has_cached_response: bool, ) -> Result<Vec<u8>, Error>16 pub async fn resolve_udp(
17     globals: &Globals,
18     mut packet: &mut Vec<u8>,
19     packet_qname: &[u8],
20     tid: u16,
21     has_cached_response: bool,
22 ) -> Result<Vec<u8>, Error> {
23     let ext_socket = match globals.external_addr {
24         Some(x) => UdpSocket::bind(x).await?,
25         None => match globals.upstream_addr {
26             SocketAddr::V4(_) => {
27                 UdpSocket::bind(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
28                     .await?
29             }
30             SocketAddr::V6(s) => {
31                 UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
32                     Ipv6Addr::UNSPECIFIED,
33                     0,
34                     s.flowinfo(),
35                     s.scope_id(),
36                 )))
37                 .await?
38             }
39         },
40     };
41     ext_socket.connect(globals.upstream_addr).await?;
42     dns::set_edns_max_payload_size(&mut packet, DNS_MAX_PACKET_SIZE as u16)?;
43     let mut response;
44     let timeout = if has_cached_response {
45         globals.udp_timeout / 2
46     } else {
47         globals.udp_timeout
48     };
49     loop {
50         ext_socket.send(packet).await?;
51         response = vec![0u8; DNS_MAX_PACKET_SIZE];
52         dns::set_rcode_servfail(&mut response);
53         let fut = tokio::time::timeout(timeout, ext_socket.recv_from(&mut response[..]));
54         match fut.await {
55             Ok(Ok((response_len, response_addr))) => {
56                 response.truncate(response_len);
57                 if response_addr == globals.upstream_addr
58                     && response_len >= DNS_HEADER_SIZE
59                     && dns::tid(&response) == tid
60                     && packet_qname.eq_ignore_ascii_case(dns::qname(&response)?.as_slice())
61                 {
62                     break;
63                 }
64             }
65             _ => {
66                 if has_cached_response {
67                     trace!("Timeout, but cached response is present");
68                     break;
69                 }
70                 trace!("Timeout, no cached response");
71             }
72         }
73     }
74     Ok(response)
75 }
76 
resolve_tcp( globals: &Globals, packet: &mut Vec<u8>, packet_qname: &[u8], tid: u16, ) -> Result<Vec<u8>, Error>77 pub async fn resolve_tcp(
78     globals: &Globals,
79     packet: &mut Vec<u8>,
80     packet_qname: &[u8],
81     tid: u16,
82 ) -> Result<Vec<u8>, Error> {
83     let socket = match globals.external_addr {
84         Some(x @ SocketAddr::V4(_)) => {
85             let socket = TcpSocket::new_v4()?;
86             socket.set_reuseaddr(true).ok();
87             socket.bind(x)?;
88             socket
89         }
90         Some(x @ SocketAddr::V6(_)) => {
91             let socket = TcpSocket::new_v6()?;
92             socket.set_reuseaddr(true).ok();
93             socket.bind(x)?;
94             socket
95         }
96         None => match globals.upstream_addr {
97             SocketAddr::V4(_) => TcpSocket::new_v4()?,
98             SocketAddr::V6(_) => TcpSocket::new_v6()?,
99         },
100     };
101     let mut ext_socket = socket.connect(globals.upstream_addr).await?;
102     ext_socket.set_nodelay(true)?;
103     let mut binlen = [0u8, 0];
104     BigEndian::write_u16(&mut binlen[..], packet.len() as u16);
105     ext_socket.write_all(&binlen).await?;
106     ext_socket.write_all(packet).await?;
107     ext_socket.flush().await?;
108     ext_socket.read_exact(&mut binlen).await?;
109     let response_len = BigEndian::read_u16(&binlen) as usize;
110     ensure!(
111         (DNS_HEADER_SIZE..=DNS_MAX_PACKET_SIZE).contains(&response_len),
112         "Unexpected response size"
113     );
114     let mut response = vec![0u8; response_len];
115     ext_socket.read_exact(&mut response).await?;
116     ensure!(dns::tid(&response) == tid, "Unexpected transaction ID");
117     ensure!(
118         packet_qname.eq_ignore_ascii_case(dns::qname(&response)?.as_slice()),
119         "Unexpected query name in the response"
120     );
121     Ok(response)
122 }
123 
resolve( globals: &Globals, mut packet: &mut Vec<u8>, packet_qname: Vec<u8>, cached_response: Option<CachedResponse>, packet_hash: u128, original_tid: u16, ) -> Result<Vec<u8>, Error>124 pub async fn resolve(
125     globals: &Globals,
126     mut packet: &mut Vec<u8>,
127     packet_qname: Vec<u8>,
128     cached_response: Option<CachedResponse>,
129     packet_hash: u128,
130     original_tid: u16,
131 ) -> Result<Vec<u8>, Error> {
132     #[cfg(feature = "metrics")]
133     globals.varz.upstream_sent.inc();
134     let tid = random();
135     dns::set_tid(&mut packet, tid);
136     let mut response = resolve_udp(
137         globals,
138         packet,
139         &packet_qname,
140         tid,
141         cached_response.is_some(),
142     )
143     .await?;
144     if dns::is_truncated(&response) {
145         response = resolve_tcp(globals, packet, &packet_qname, tid).await?;
146     }
147     #[cfg(feature = "metrics")]
148     {
149         globals.varz.upstream_received.inc();
150         if dns::rcode_nxdomain(&response) {
151             globals.varz.upstream_rcode_nxdomain.inc();
152         }
153     }
154     if dns::rcode_servfail(&response) || dns::rcode_refused(&response) {
155         trace!("SERVFAIL/REFUSED: {}", dns::rcode(&response));
156         if let Some(cached_response) = cached_response {
157             trace!("Serving stale");
158             #[cfg(feature = "metrics")]
159             {
160                 globals.varz.client_queries_offline.inc();
161                 globals.varz.client_queries_cached.inc();
162             }
163             return Ok(cached_response.into_response());
164         } else {
165             #[cfg(feature = "metrics")]
166             globals.varz.upstream_errors.inc();
167         }
168     } else {
169         trace!("Adding to cache");
170         let cached_response = CachedResponse::new(&globals.cache, response.clone());
171         globals.cache.lock().insert(packet_hash, cached_response);
172     }
173     dns::set_tid(&mut response, original_tid);
174     dns::recase_qname(&mut response, &packet_qname)?;
175     #[cfg(feature = "metrics")]
176     globals
177         .varz
178         .upstream_response_sizes
179         .observe(response.len() as f64);
180     Ok(response)
181 }
182 
get_cached_response_or_resolve( globals: &Globals, client_ctx: &ClientCtx, mut packet: &mut Vec<u8>, ) -> Result<Vec<u8>, Error>183 pub async fn get_cached_response_or_resolve(
184     globals: &Globals,
185     client_ctx: &ClientCtx,
186     mut packet: &mut Vec<u8>,
187 ) -> Result<Vec<u8>, Error> {
188     let packet_qname = dns::qname(packet)?;
189     if let Some(my_ip) = &globals.my_ip {
190         if &packet_qname.to_ascii_lowercase() == my_ip {
191             let client_ip = match client_ctx {
192                 ClientCtx::Udp(u) => u.client_addr,
193                 ClientCtx::Tcp(t) => t.client_connection.peer_addr()?,
194             }
195             .ip();
196             return serve_ip_response(packet.to_vec(), client_ip, 1);
197         }
198     }
199     if let Some(blacklist) = &globals.blacklist {
200         if blacklist.find(&packet_qname) {
201             #[cfg(feature = "metrics")]
202             globals.varz.client_queries_blocked.inc();
203             return dns::serve_blocked_response(packet.to_vec());
204         }
205     }
206     let tld = dns::qname_tld(&packet_qname);
207     let synthesize_nxdomain = {
208         if globals.ignore_unqualified_hostnames && tld.len() == packet_qname.len() {
209             let (qtype, qclass) = dns::qtype_qclass(packet)?;
210             qtype == dns::DNS_CLASS_INET
211                 && (qclass == dns::DNS_TYPE_A || qclass == dns::DNS_TYPE_AAAA)
212         } else if let Some(undelegated_list) = &globals.undelegated_list {
213             undelegated_list.find(tld)
214         } else {
215             false
216         }
217     };
218     if synthesize_nxdomain {
219         #[cfg(feature = "metrics")]
220         globals.varz.client_queries_rcode_nxdomain.inc();
221         return dns::serve_nxdomain_response(packet.to_vec());
222     }
223     let original_tid = dns::tid(packet);
224     dns::set_tid(&mut packet, 0);
225     dns::normalize_qname(&mut packet)?;
226     let mut hasher = globals.hasher;
227     hasher.write(packet);
228     let packet_hash = hasher.finish128().as_u128();
229     let cached_response = {
230         match globals.cache.lock().get(&packet_hash) {
231             None => None,
232             Some(response) => {
233                 let cached_response = (*response).clone();
234                 Some(cached_response)
235             }
236         }
237     };
238     let cached_response = match cached_response {
239         None => None,
240         Some(mut cached_response) => {
241             if !cached_response.has_expired() {
242                 trace!("Cached");
243                 #[cfg(feature = "metrics")]
244                 globals.varz.client_queries_cached.inc();
245                 cached_response.set_tid(original_tid);
246                 let original_ttl = cached_response.original_ttl();
247                 let mut ttl = cached_response.ttl();
248                 if ttl.saturating_add(globals.client_ttl_holdon) > original_ttl {
249                     ttl = original_ttl;
250                 }
251                 ttl = cmp::max(1, ttl);
252                 let mut response = cached_response.into_response();
253                 dns::set_ttl(&mut response, ttl)?;
254                 dns::recase_qname(&mut response, &packet_qname)?;
255                 return Ok(response);
256             }
257             trace!("Expired");
258             #[cfg(feature = "metrics")]
259             globals.varz.client_queries_expired.inc();
260             Some(cached_response)
261         }
262     };
263     resolve(
264         globals,
265         packet,
266         packet_qname,
267         cached_response,
268         packet_hash,
269         original_tid,
270     )
271     .await
272 }
273