1 use crate::cache::*;
2 use crate::dns::{self, *};
3 use crate::errors::*;
4 use crate::globals::*;
5 use crate::ClientCtx;
6
7 use byteorder::{BigEndian, ByteOrder};
8 use rand::prelude::*;
9 use siphasher::sip128::Hasher128;
10 use std::cmp;
11 use std::hash::Hasher;
12 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
13 use tokio::io::{AsyncReadExt, AsyncWriteExt};
14 use tokio::net::{TcpSocket, UdpSocket};
15
resolve_udp( globals: &Globals, mut packet: &mut Vec<u8>, packet_qname: &[u8], tid: u16, has_cached_response: bool, ) -> Result<Vec<u8>, Error>16 pub async fn resolve_udp(
17 globals: &Globals,
18 mut packet: &mut Vec<u8>,
19 packet_qname: &[u8],
20 tid: u16,
21 has_cached_response: bool,
22 ) -> Result<Vec<u8>, Error> {
23 let ext_socket = match globals.external_addr {
24 Some(x) => UdpSocket::bind(x).await?,
25 None => match globals.upstream_addr {
26 SocketAddr::V4(_) => {
27 UdpSocket::bind(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
28 .await?
29 }
30 SocketAddr::V6(s) => {
31 UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
32 Ipv6Addr::UNSPECIFIED,
33 0,
34 s.flowinfo(),
35 s.scope_id(),
36 )))
37 .await?
38 }
39 },
40 };
41 ext_socket.connect(globals.upstream_addr).await?;
42 dns::set_edns_max_payload_size(&mut packet, DNS_MAX_PACKET_SIZE as u16)?;
43 let mut response;
44 let timeout = if has_cached_response {
45 globals.udp_timeout / 2
46 } else {
47 globals.udp_timeout
48 };
49 loop {
50 ext_socket.send(packet).await?;
51 response = vec![0u8; DNS_MAX_PACKET_SIZE];
52 dns::set_rcode_servfail(&mut response);
53 let fut = tokio::time::timeout(timeout, ext_socket.recv_from(&mut response[..]));
54 match fut.await {
55 Ok(Ok((response_len, response_addr))) => {
56 response.truncate(response_len);
57 if response_addr == globals.upstream_addr
58 && response_len >= DNS_HEADER_SIZE
59 && dns::tid(&response) == tid
60 && packet_qname.eq_ignore_ascii_case(dns::qname(&response)?.as_slice())
61 {
62 break;
63 }
64 }
65 _ => {
66 if has_cached_response {
67 trace!("Timeout, but cached response is present");
68 break;
69 }
70 trace!("Timeout, no cached response");
71 }
72 }
73 }
74 Ok(response)
75 }
76
resolve_tcp( globals: &Globals, packet: &mut Vec<u8>, packet_qname: &[u8], tid: u16, ) -> Result<Vec<u8>, Error>77 pub async fn resolve_tcp(
78 globals: &Globals,
79 packet: &mut Vec<u8>,
80 packet_qname: &[u8],
81 tid: u16,
82 ) -> Result<Vec<u8>, Error> {
83 let socket = match globals.external_addr {
84 Some(x @ SocketAddr::V4(_)) => {
85 let socket = TcpSocket::new_v4()?;
86 socket.set_reuseaddr(true).ok();
87 socket.bind(x)?;
88 socket
89 }
90 Some(x @ SocketAddr::V6(_)) => {
91 let socket = TcpSocket::new_v6()?;
92 socket.set_reuseaddr(true).ok();
93 socket.bind(x)?;
94 socket
95 }
96 None => match globals.upstream_addr {
97 SocketAddr::V4(_) => TcpSocket::new_v4()?,
98 SocketAddr::V6(_) => TcpSocket::new_v6()?,
99 },
100 };
101 let mut ext_socket = socket.connect(globals.upstream_addr).await?;
102 ext_socket.set_nodelay(true)?;
103 let mut binlen = [0u8, 0];
104 BigEndian::write_u16(&mut binlen[..], packet.len() as u16);
105 ext_socket.write_all(&binlen).await?;
106 ext_socket.write_all(packet).await?;
107 ext_socket.flush().await?;
108 ext_socket.read_exact(&mut binlen).await?;
109 let response_len = BigEndian::read_u16(&binlen) as usize;
110 ensure!(
111 (DNS_HEADER_SIZE..=DNS_MAX_PACKET_SIZE).contains(&response_len),
112 "Unexpected response size"
113 );
114 let mut response = vec![0u8; response_len];
115 ext_socket.read_exact(&mut response).await?;
116 ensure!(dns::tid(&response) == tid, "Unexpected transaction ID");
117 ensure!(
118 packet_qname.eq_ignore_ascii_case(dns::qname(&response)?.as_slice()),
119 "Unexpected query name in the response"
120 );
121 Ok(response)
122 }
123
resolve( globals: &Globals, mut packet: &mut Vec<u8>, packet_qname: Vec<u8>, cached_response: Option<CachedResponse>, packet_hash: u128, original_tid: u16, ) -> Result<Vec<u8>, Error>124 pub async fn resolve(
125 globals: &Globals,
126 mut packet: &mut Vec<u8>,
127 packet_qname: Vec<u8>,
128 cached_response: Option<CachedResponse>,
129 packet_hash: u128,
130 original_tid: u16,
131 ) -> Result<Vec<u8>, Error> {
132 #[cfg(feature = "metrics")]
133 globals.varz.upstream_sent.inc();
134 let tid = random();
135 dns::set_tid(&mut packet, tid);
136 let mut response = resolve_udp(
137 globals,
138 packet,
139 &packet_qname,
140 tid,
141 cached_response.is_some(),
142 )
143 .await?;
144 if dns::is_truncated(&response) {
145 response = resolve_tcp(globals, packet, &packet_qname, tid).await?;
146 }
147 #[cfg(feature = "metrics")]
148 {
149 globals.varz.upstream_received.inc();
150 if dns::rcode_nxdomain(&response) {
151 globals.varz.upstream_rcode_nxdomain.inc();
152 }
153 }
154 if dns::rcode_servfail(&response) || dns::rcode_refused(&response) {
155 trace!("SERVFAIL/REFUSED: {}", dns::rcode(&response));
156 if let Some(cached_response) = cached_response {
157 trace!("Serving stale");
158 #[cfg(feature = "metrics")]
159 {
160 globals.varz.client_queries_offline.inc();
161 globals.varz.client_queries_cached.inc();
162 }
163 return Ok(cached_response.into_response());
164 } else {
165 #[cfg(feature = "metrics")]
166 globals.varz.upstream_errors.inc();
167 }
168 } else {
169 trace!("Adding to cache");
170 let cached_response = CachedResponse::new(&globals.cache, response.clone());
171 globals.cache.lock().insert(packet_hash, cached_response);
172 }
173 dns::set_tid(&mut response, original_tid);
174 dns::recase_qname(&mut response, &packet_qname)?;
175 #[cfg(feature = "metrics")]
176 globals
177 .varz
178 .upstream_response_sizes
179 .observe(response.len() as f64);
180 Ok(response)
181 }
182
get_cached_response_or_resolve( globals: &Globals, client_ctx: &ClientCtx, mut packet: &mut Vec<u8>, ) -> Result<Vec<u8>, Error>183 pub async fn get_cached_response_or_resolve(
184 globals: &Globals,
185 client_ctx: &ClientCtx,
186 mut packet: &mut Vec<u8>,
187 ) -> Result<Vec<u8>, Error> {
188 let packet_qname = dns::qname(packet)?;
189 if let Some(my_ip) = &globals.my_ip {
190 if &packet_qname.to_ascii_lowercase() == my_ip {
191 let client_ip = match client_ctx {
192 ClientCtx::Udp(u) => u.client_addr,
193 ClientCtx::Tcp(t) => t.client_connection.peer_addr()?,
194 }
195 .ip();
196 return serve_ip_response(packet.to_vec(), client_ip, 1);
197 }
198 }
199 if let Some(blacklist) = &globals.blacklist {
200 if blacklist.find(&packet_qname) {
201 #[cfg(feature = "metrics")]
202 globals.varz.client_queries_blocked.inc();
203 return dns::serve_blocked_response(packet.to_vec());
204 }
205 }
206 let tld = dns::qname_tld(&packet_qname);
207 let synthesize_nxdomain = {
208 if globals.ignore_unqualified_hostnames && tld.len() == packet_qname.len() {
209 let (qtype, qclass) = dns::qtype_qclass(packet)?;
210 qtype == dns::DNS_CLASS_INET
211 && (qclass == dns::DNS_TYPE_A || qclass == dns::DNS_TYPE_AAAA)
212 } else if let Some(undelegated_list) = &globals.undelegated_list {
213 undelegated_list.find(tld)
214 } else {
215 false
216 }
217 };
218 if synthesize_nxdomain {
219 #[cfg(feature = "metrics")]
220 globals.varz.client_queries_rcode_nxdomain.inc();
221 return dns::serve_nxdomain_response(packet.to_vec());
222 }
223 let original_tid = dns::tid(packet);
224 dns::set_tid(&mut packet, 0);
225 dns::normalize_qname(&mut packet)?;
226 let mut hasher = globals.hasher;
227 hasher.write(packet);
228 let packet_hash = hasher.finish128().as_u128();
229 let cached_response = {
230 match globals.cache.lock().get(&packet_hash) {
231 None => None,
232 Some(response) => {
233 let cached_response = (*response).clone();
234 Some(cached_response)
235 }
236 }
237 };
238 let cached_response = match cached_response {
239 None => None,
240 Some(mut cached_response) => {
241 if !cached_response.has_expired() {
242 trace!("Cached");
243 #[cfg(feature = "metrics")]
244 globals.varz.client_queries_cached.inc();
245 cached_response.set_tid(original_tid);
246 let original_ttl = cached_response.original_ttl();
247 let mut ttl = cached_response.ttl();
248 if ttl.saturating_add(globals.client_ttl_holdon) > original_ttl {
249 ttl = original_ttl;
250 }
251 ttl = cmp::max(1, ttl);
252 let mut response = cached_response.into_response();
253 dns::set_ttl(&mut response, ttl)?;
254 dns::recase_qname(&mut response, &packet_qname)?;
255 return Ok(response);
256 }
257 trace!("Expired");
258 #[cfg(feature = "metrics")]
259 globals.varz.client_queries_expired.inc();
260 Some(cached_response)
261 }
262 };
263 resolve(
264 globals,
265 packet,
266 packet_qname,
267 cached_response,
268 packet_hash,
269 original_tid,
270 )
271 .await
272 }
273