1 /*
2  * This file is part of PowerDNS or dnsdist.
3  * Copyright -- PowerDNS.COM B.V. and its contributors
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of version 2 of the GNU General Public License as
7  * published by the Free Software Foundation.
8  *
9  * In addition, for the avoidance of any doubt, permission is granted to
10  * link this program with OpenSSL and to (re)distribute the binaries
11  * produced as the result of such linking.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21  */
22 
23 #include "dnsdist-healthchecks.hh"
24 #include "dnswriter.hh"
25 #include "dolog.hh"
26 
/* When set, the health check code in this file reports the reason for every
   failed or invalid check via infolog(); when unset those details are not logged. */
bool g_verboseHealthChecks = false;
28 
updateHealthCheckResult(const std::shared_ptr<DownstreamState> & dss,bool newState)29 void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool newState)
30 {
31   if (newState) {
32     /* check succeeded */
33     dss->currentCheckFailures = 0;
34 
35     if (!dss->upStatus) {
36       /* we were marked as down */
37       dss->consecutiveSuccessfulChecks++;
38       if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
39         /* if we need more than one successful check to rise
40            and we didn't reach the threshold yet,
41            let's stay down */
42         newState = false;
43       }
44     }
45   }
46   else {
47     /* check failed */
48     dss->consecutiveSuccessfulChecks = 0;
49 
50     if (dss->upStatus) {
51       /* we are currently up */
52       dss->currentCheckFailures++;
53       if (dss->currentCheckFailures < dss->maxCheckFailures) {
54         /* we need more than one failure to be marked as down,
55            and we did not reach the threshold yet, let's stay down */
56         newState = true;
57       }
58     }
59   }
60   if(newState != dss->upStatus) {
61     warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
62 
63     if (newState && (!dss->connected || dss->reconnectOnUp)) {
64       newState = dss->reconnect();
65 
66       if (dss->connected && !dss->threadStarted.test_and_set()) {
67         dss->tid = std::thread(responderThread, dss);
68       }
69     }
70 
71     dss->upStatus = newState;
72     dss->currentCheckFailures = 0;
73     dss->consecutiveSuccessfulChecks = 0;
74     if (g_snmpAgent && g_snmpTrapsEnabled) {
75       g_snmpAgent->sendBackendStatusChangeTrap(dss);
76     }
77   }
78 }
79 
handleResponse(std::shared_ptr<HealthCheckData> & data)80 static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
81 {
82   auto& ds = data->d_ds;
83   try {
84     string reply;
85     ComboAddress from;
86     data->d_sock.recvFrom(reply, from);
87 
88     /* we are using a connected socket but hey.. */
89     if (from != ds->remote) {
90       if (g_verboseHealthChecks) {
91         infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
92       }
93       return false;
94     }
95 
96     const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
97 
98     if (reply.size() < sizeof(*responseHeader)) {
99       if (g_verboseHealthChecks) {
100         infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
101       }
102       return false;
103     }
104 
105     if (responseHeader->id != data->d_queryID) {
106       if (g_verboseHealthChecks) {
107         infolog("Invalid health check response id %d from backend %s, expecting %d", data->d_queryID, ds->getNameWithAddr(), data->d_queryID);
108       }
109       return false;
110     }
111 
112     if (!responseHeader->qr) {
113       if (g_verboseHealthChecks) {
114         infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
115       }
116       return false;
117     }
118 
119     if (responseHeader->rcode == RCode::ServFail) {
120       if (g_verboseHealthChecks) {
121         infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
122       }
123       return false;
124     }
125 
126     if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
127       if (g_verboseHealthChecks) {
128         infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
129       }
130       return false;
131     }
132 
133     uint16_t receivedType;
134     uint16_t receivedClass;
135     DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
136 
137     if (receivedName != data->d_checkName || receivedType != data->d_checkType || receivedClass != data->d_checkClass) {
138       if (g_verboseHealthChecks) {
139         infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), data->d_checkName.toLogString(), QType(receivedType).getName(), QType(data->d_checkType).getName(), receivedClass, data->d_checkClass);
140       }
141       return false;
142     }
143   }
144   catch(const std::exception& e)
145   {
146     if (g_verboseHealthChecks) {
147       infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
148     }
149     return false;
150   }
151   catch(...)
152   {
153     if (g_verboseHealthChecks) {
154       infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
155     }
156     return false;
157   }
158 
159   return true;
160 }
161 
healthCheckCallback(int fd,FDMultiplexer::funcparam_t & param)162 static void healthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
163 {
164   auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
165   data->d_mplexer->removeReadFD(fd);
166   updateHealthCheckResult(data->d_ds, handleResponse(data));
167 }
168 
initialHealthCheckCallback(int fd,FDMultiplexer::funcparam_t & param)169 static void initialHealthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
170 {
171   auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
172   data->d_mplexer->removeReadFD(fd);
173   bool up = handleResponse(data);
174   warnlog("Marking downstream %s as '%s'", data->d_ds->getNameWithAddr(), up ? "up" : "down");
175   data->d_ds->upStatus = up;
176 }
177 
queueHealthCheck(std::shared_ptr<FDMultiplexer> & mplexer,const std::shared_ptr<DownstreamState> & ds,bool initialCheck)178 bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck)
179 {
180   try
181   {
182     uint16_t queryID = getRandomDNSID();
183     DNSName checkName = ds->checkName;
184     uint16_t checkType = ds->checkType.getCode();
185     uint16_t checkClass = ds->checkClass;
186     dnsheader checkHeader;
187     memset(&checkHeader, 0, sizeof(checkHeader));
188 
189     checkHeader.qdcount = htons(1);
190     checkHeader.id = queryID;
191 
192     checkHeader.rd = true;
193     if (ds->setCD) {
194       checkHeader.cd = true;
195     }
196 
197     if (ds->checkFunction) {
198       std::lock_guard<std::mutex> lock(g_luamutex);
199       auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
200       checkName = std::get<0>(ret);
201       checkType = std::get<1>(ret);
202       checkClass = std::get<2>(ret);
203     }
204 
205     PacketBuffer packet;
206     GenericDNSPacketWriter<PacketBuffer> dpw(packet, checkName, checkType, checkClass);
207     dnsheader * requestHeader = dpw.getHeader();
208     *requestHeader = checkHeader;
209 
210     if (ds->useProxyProtocol) {
211       auto payload = makeLocalProxyHeader();
212       packet.insert(packet.begin(), payload.begin(), payload.end());
213     }
214 
215     Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
216     sock.setNonBlocking();
217     if (!IsAnyAddress(ds->sourceAddr)) {
218       sock.setReuseAddr();
219       if (!ds->sourceItfName.empty()) {
220 #ifdef SO_BINDTODEVICE
221         int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
222         if (res != 0 && g_verboseHealthChecks) {
223           infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
224         }
225 #endif
226       }
227       sock.bind(ds->sourceAddr);
228     }
229     sock.connect(ds->remote);
230     ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), packet, true);
231     if (sent < 0) {
232       int ret = errno;
233       if (g_verboseHealthChecks)
234         infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
235       return false;
236     }
237 
238     auto data = std::make_shared<HealthCheckData>(mplexer, ds, std::move(sock), std::move(checkName), checkType, checkClass, queryID);
239     struct timeval ttd;
240     gettimeofday(&ttd, nullptr);
241     ttd.tv_sec += ds->checkTimeout / 1000; /* ms to seconds */
242     ttd.tv_usec += (ds->checkTimeout % 1000) * 1000; /* remaining ms to us */
243     if (ttd.tv_usec > 1000000) {
244       ++ttd.tv_sec;
245       ttd.tv_usec -= 1000000;
246     }
247     mplexer->addReadFD(data->d_sock.getHandle(), initialCheck ? &initialHealthCheckCallback : &healthCheckCallback, data, &ttd);
248 
249     return true;
250   }
251   catch(const std::exception& e)
252   {
253     if (g_verboseHealthChecks) {
254       infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
255     }
256     return false;
257   }
258   catch(...)
259   {
260     if (g_verboseHealthChecks) {
261       infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
262     }
263     return false;
264   }
265 }
266 
handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer> & mplexer,bool initial)267 void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial)
268 {
269   while (mplexer->getWatchedFDCount(false) > 0) {
270     struct timeval now;
271     int ret = mplexer->run(&now, 100);
272     if (ret == -1) {
273       if (g_verboseHealthChecks) {
274         infolog("Error while waiting for the health check response from backends: %d", ret);
275       }
276       break;
277     }
278     auto timeouts = mplexer->getTimeouts(now);
279     for (const auto& timeout : timeouts) {
280       mplexer->removeReadFD(timeout.first);
281       auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
282       if (g_verboseHealthChecks) {
283         infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
284       }
285       if (initial) {
286         warnlog("Marking downstream %s as 'down'", data->d_ds->getNameWithAddr());
287         data->d_ds->upStatus = false;
288       }
289       else {
290         updateHealthCheckResult(data->d_ds, false);
291       }
292     }
293   }
294 }
295