1 /*
2  * This file is part of PowerDNS or dnsdist.
3  * Copyright -- PowerDNS.COM B.V. and its contributors
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of version 2 of the GNU General Public License as
7  * published by the Free Software Foundation.
8  *
9  * In addition, for the avoidance of any doubt, permission is granted to
10  * link this program with OpenSSL and to (re)distribute the binaries
11  * produced as the result of such linking.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21  */
22 #pragma once
#include <pthread.h>
#include <unistd.h>

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "threadname.hh"
#include "logger.hh"
#include "dns.hh"
#include "dnsbackend.hh"
#include "pdnsexception.hh"
#include "arguments.hh"
#include "statbag.hh"
38 
39 extern StatBag S;
40 
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends, and if they throw an exception,
    it will cycle the backend but drop the query that was active during the exception.
*/
49 
/** Abstract interface implemented by SingleThreadDistributor and MultiThreadDistributor.

    \tparam Answer   type handed to the callback, wrapped in a std::unique_ptr
    \tparam Question type submitted through question()
    \tparam Backend  type that actually answers questions
*/
template<class Answer, class Question, class Backend> class Distributor
{
public:
  //! Signature of the function that receives the answer to a submitted question
  using callback_t = std::function<void(std::unique_ptr<Answer>&)>;

  static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true when the implementation reports itself as overloaded

  // Destruction through a base pointer must be silent: no debug trace on stderr.
  virtual ~Distributor() = default;
};
60 
61 template<class Answer, class Question, class Backend> class SingleThreadDistributor
62     : public Distributor<Answer, Question, Backend>
63 {
64 public:
65   SingleThreadDistributor(const SingleThreadDistributor&) = delete;
66   void operator=(const SingleThreadDistributor&) = delete;
67   SingleThreadDistributor();
68   typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
69   int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
getQueueSize()70   int getQueueSize() override {
71     return 0;
72   }
73 
isOverloaded()74   bool isOverloaded() override
75   {
76     return false;
77   }
78 
79 private:
80   std::unique_ptr<Backend> b{nullptr};
81 };
82 
83 template<class Answer, class Question, class Backend> class MultiThreadDistributor
84     : public Distributor<Answer, Question, Backend>
85 {
86 public:
87   MultiThreadDistributor(const MultiThreadDistributor&) = delete;
88   void operator=(const MultiThreadDistributor&) = delete;
89   MultiThreadDistributor(int n);
90   typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
91   int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
92   void distribute(int n);
getQueueSize()93   int getQueueSize() override {
94     return d_queued;
95   }
96 
97   struct QuestionData
98   {
QuestionDataMultiThreadDistributor::QuestionData99     QuestionData(const Question& query): Q(query)
100     {
101     }
102 
103     Question Q;
104     callback_t callback;
105     int id;
106   };
107 
isOverloaded()108   bool isOverloaded() override
109   {
110     return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
111   }
112 
113 private:
114   int nextid;
115   time_t d_last_started;
116   unsigned int d_overloadQueueLength, d_maxQueueLength;
117   int d_num_threads;
118   std::atomic<unsigned int> d_queued{0};
119   std::vector<std::pair<int,int>> d_pipes;
120 };
121 
122 //template<class Answer, class Question, class Backend>::nextid;
Create(int n)123 template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
124 {
125     if( n == 1 )
126       return new SingleThreadDistributor<Answer,Question,Backend>();
127     else
128       return new MultiThreadDistributor<Answer,Question,Backend>( n );
129 }
130 
SingleThreadDistributor()131 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
132 {
133   g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
134   try {
135     b=make_unique<Backend>();
136   }
137   catch(const PDNSException &AE) {
138     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
139     _exit(1);
140   }
141   catch(const std::exception& e) {
142     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
143     _exit(1);
144   }
145   catch(...) {
146     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
147     _exit(1);
148   }
149 }
150 
MultiThreadDistributor(int n)151 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
152 {
153   d_num_threads=n;
154   d_overloadQueueLength=::arg().asNum("overload-queue-length");
155   d_maxQueueLength=::arg().asNum("max-queue-length");
156   nextid=0;
157   d_last_started=time(0);
158 
159   for(int i=0; i < n; ++i) {
160     int fds[2];
161     if(pipe(fds) < 0)
162       unixDie("Creating pipe");
163     d_pipes.push_back({fds[0],fds[1]});
164   }
165 
166   if (n<1) {
167     g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
168     _exit(1);
169   }
170 
171   g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
172   for(int i=0;i<n;i++) {
173     std::thread t([=](){distribute(i);});
174     t.detach();
175     Utility::usleep(50000); // we've overloaded mysql in the past :-)
176   }
177   g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
178 }
179 
180 
181 // start of a new thread
distribute(int ournum)182 template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
183 {
184   setThreadName("pdns/distributo");
185 
186   try {
187     std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
188     int queuetimeout=::arg().asNum("queue-limit");
189 
190     for(;;) {
191 
192       QuestionData* tempQD = nullptr;
193       if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
194 	unixDie("read");
195       --d_queued;
196       std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
197       tempQD = nullptr;
198       std::unique_ptr<Answer> a = nullptr;
199 
200       if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
201         S.inc("timedout-packets");
202         continue;
203       }
204 
205       bool allowRetry=true;
206 retry:
207       // this is the only point where we interact with the backend (synchronous)
208       try {
209         if (!b) {
210           allowRetry=false;
211           b=make_unique<Backend>();
212         }
213         a=b->question(QD->Q);
214       }
215       catch(const PDNSException &e) {
216         b.reset();
217         if (!allowRetry) {
218           g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
219           a=QD->Q.replyPacket();
220 
221           a->setRcode(RCode::ServFail);
222           S.inc("servfail-packets");
223           S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
224         } else {
225           g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
226           goto retry;
227         }
228       }
229       catch(...) {
230         b.reset();
231         if (!allowRetry) {
232           g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
233           a=QD->Q.replyPacket();
234 
235           a->setRcode(RCode::ServFail);
236           S.inc("servfail-packets");
237           S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
238         } else {
239           g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
240           goto retry;
241         }
242       }
243 
244       QD->callback(a);
245       QD.reset();
246     }
247 
248     b.reset();
249   }
250   catch(const PDNSException &AE) {
251     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
252     _exit(1);
253   }
254   catch(const std::exception& e) {
255     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
256     _exit(1);
257   }
258   catch(...) {
259     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
260     _exit(1);
261   }
262 }
263 
question(Question & q,callback_t callback)264 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
265 {
266   std::unique_ptr<Answer> a = nullptr;
267   bool allowRetry=true;
268 retry:
269   try {
270     if (!b) {
271       allowRetry=false;
272       b=make_unique<Backend>();
273     }
274     a=b->question(q); // a can be NULL!
275   }
276   catch(const PDNSException &e) {
277     b.reset();
278     if (!allowRetry) {
279       g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
280       a=q.replyPacket();
281 
282       a->setRcode(RCode::ServFail);
283       S.inc("servfail-packets");
284       S.ringAccount("servfail-queries", q.qdomain, q.qtype);
285     } else {
286       g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
287       goto retry;
288     }
289   }
290   catch(...) {
291     b.reset();
292     if (!allowRetry) {
293       g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
294       a=q.replyPacket();
295 
296       a->setRcode(RCode::ServFail);
297       S.inc("servfail-packets");
298       S.ringAccount("servfail-queries", q.qdomain, q.qtype);
299     } else {
300       g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
301       goto retry;
302     }
303   }
304   callback(a);
305   return 0;
306 }
307 
//! Thrown by MultiThreadDistributor::question() when more questions are queued than "max-queue-length" allows
struct DistributorFatal
{
};
309 
question(Question & q,callback_t callback)310 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
311 {
312   // this is passed to other process over pipe and released there
313   auto QD=new QuestionData(q);
314   auto ret = QD->id = nextid++; // might be deleted after write!
315   QD->callback=callback;
316 
317   ++d_queued;
318   if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
319     --d_queued;
320     delete QD;
321     unixDie("write");
322   }
323 
324   if(d_queued > d_maxQueueLength) {
325     g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
326     // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
327     throw DistributorFatal();
328   }
329 
330   return ret;
331 }
332