1 /*
2  * Copyright (c) 2008-2020, OARC, Inc.
3  * Copyright (c) 2007-2008, Internet Systems Consortium, Inc.
4  * Copyright (c) 2003-2007, The Measurement Factory, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  *
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  *
19  * 3. Neither the name of the copyright holder nor the names of its
20  *    contributors may be used to endorse or promote products derived
21  *    from this software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "config.h"
38 
39 #include "response_time_index.h"
40 #include "hashtbl.h"
41 #include "inX_addr.h"
42 #include "xmalloc.h"
43 #include "syslog_debug.h"
44 #include "pcap.h"
45 #include "compat.h"
46 
47 #include <math.h>
48 #include <assert.h>
49 
/*
 * Fixed indexes returned by response_time_indexer(). Indexes from
 * FIRST_BUCKET and up are response time buckets.
 */
#define TIMED_OUT      0 /* labelled "timeouts" */
#define MISSING_QUERY  1 /* labelled "missing_queries" */
#define DROPPED_QUERY  2 /* labelled "dropped_queries" */
#define INTERNAL_ERROR 3 /* labelled "internal_errors" */
#define FIRST_BUCKET   4 /* index of the first response time bucket */
55 
56 struct query;
57 struct query {
58     struct query *    prev, *next;
59     transport_message tm;
60     dns_message       m;
61 };
62 
63 #define MAX_ARRAY_SZ 65536
64 static hashtbl*                        theHash      = 0;
65 static enum response_time_mode         mode         = response_time_log10;
66 static time_t                          max_sec      = 5;
67 static enum response_time_max_sec_mode max_sec_mode = response_time_ceil;
68 static unsigned int                    bucket_size  = 100;
69 static size_t                          max_queries = 1000000, num_queries = 0;
70 static struct query *                  qfirst = 0, *qlast = 0;
71 static int                             max_iter = INTERNAL_ERROR, next_iter, flushing = 0;
72 static enum response_time_full_mode    full_mode = response_time_drop_query;
73 
response_time_set_mode(enum response_time_mode m)74 void response_time_set_mode(enum response_time_mode m)
75 {
76     mode = m;
77 }
78 
response_time_set_max_sec(time_t s)79 void response_time_set_max_sec(time_t s)
80 {
81     max_sec = s;
82 }
83 
response_time_set_max_sec_mode(enum response_time_max_sec_mode m)84 void response_time_set_max_sec_mode(enum response_time_max_sec_mode m)
85 {
86     max_sec_mode = m;
87 }
88 
response_time_set_bucket_size(unsigned int s)89 void response_time_set_bucket_size(unsigned int s)
90 {
91     bucket_size = s;
92 }
93 
response_time_set_max_queries(size_t q)94 void response_time_set_max_queries(size_t q)
95 {
96     max_queries = q;
97 }
98 
response_time_set_full_mode(enum response_time_full_mode m)99 void response_time_set_full_mode(enum response_time_full_mode m)
100 {
101     full_mode = m;
102 }
103 
_hash(const struct query * q)104 static unsigned int _hash(const struct query* q)
105 {
106     if (q->m.qr)
107         return inXaddr_hash(&q->tm.dst_ip_addr) ^ ((q->tm.dst_port & 0xffff) | (q->m.id << 16));
108     return inXaddr_hash(&q->tm.src_ip_addr) ^ ((q->tm.src_port & 0xffff) | (q->m.id << 16));
109 }
110 
_cmp(const struct query * a,const struct query * b)111 static int _cmp(const struct query* a, const struct query* b)
112 {
113     int cmp;
114 
115     // compare DNS ID
116     if (a->m.id != b->m.id)
117         return a->m.id < b->m.id ? -1 : 1;
118 
119     // compare IP version, since inXaddr_cmp() does not, and protocol
120     if (a->tm.ip_version != b->tm.ip_version)
121         return a->tm.ip_version < b->tm.ip_version ? -1 : 1;
122     if (a->tm.proto != b->tm.proto)
123         return a->tm.proto < b->tm.proto ? -1 : 1;
124 
125     // compare client IP&port first since other should be server and more static data
126     if (!a->m.qr && !b->m.qr) {
127         // both are queries, compare source address first as that is the client
128         if ((cmp = inXaddr_cmp(&a->tm.src_ip_addr, &b->tm.src_ip_addr)))
129             return cmp;
130         if (a->tm.src_port != b->tm.src_port)
131             return a->tm.src_port < b->tm.src_port ? -1 : 1;
132         if ((cmp = inXaddr_cmp(&a->tm.dst_ip_addr, &b->tm.dst_ip_addr)))
133             return cmp;
134         if (a->tm.dst_port != b->tm.dst_port)
135             return a->tm.dst_port < b->tm.dst_port ? -1 : 1;
136     } else if (a->m.qr && b->m.qr) {
137         // both are responses, compare destination address first as that is the client
138         if ((cmp = inXaddr_cmp(&a->tm.dst_ip_addr, &b->tm.dst_ip_addr)))
139             return cmp;
140         if (a->tm.dst_port != b->tm.dst_port)
141             return a->tm.dst_port < b->tm.dst_port ? -1 : 1;
142         if ((cmp = inXaddr_cmp(&a->tm.src_ip_addr, &b->tm.src_ip_addr)))
143             return cmp;
144         if (a->tm.src_port != b->tm.src_port)
145             return a->tm.src_port < b->tm.src_port ? -1 : 1;
146     } else if (a->m.qr && !b->m.qr) {
147         // a is a response and b is a query, compare a's destination with b's source first
148         if ((cmp = inXaddr_cmp(&a->tm.dst_ip_addr, &b->tm.src_ip_addr)))
149             return cmp;
150         if (a->tm.dst_port != b->tm.src_port)
151             return a->tm.dst_port < b->tm.src_port ? -1 : 1;
152         if ((cmp = inXaddr_cmp(&a->tm.src_ip_addr, &b->tm.dst_ip_addr)))
153             return cmp;
154         if (a->tm.src_port != b->tm.dst_port)
155             return a->tm.src_port < b->tm.dst_port ? -1 : 1;
156     } else {
157         // a is a query and b is a response, compare a's source with b's destination first
158         if ((cmp = inXaddr_cmp(&a->tm.src_ip_addr, &b->tm.dst_ip_addr)))
159             return cmp;
160         if (a->tm.src_port != b->tm.dst_port)
161             return a->tm.src_port < b->tm.dst_port ? -1 : 1;
162         if ((cmp = inXaddr_cmp(&a->tm.dst_ip_addr, &b->tm.src_ip_addr)))
163             return cmp;
164         if (a->tm.dst_port != b->tm.src_port)
165             return a->tm.dst_port < b->tm.src_port ? -1 : 1;
166     }
167 
168     return 0;
169 }
170 
response_time_indexer(const dns_message * m)171 int response_time_indexer(const dns_message* m)
172 {
173     struct query       q, *obj;
174     transport_message* tm  = m->tm;
175     int                ret = -1;
176 
177     if (flushing) {
178         dfprintf(1, "response_time: flushing %u %s", m->id, m->qname);
179         return TIMED_OUT;
180     }
181 
182     if (m->malformed) {
183         return -1;
184     }
185 
186     dfprintf(1, "response_time: %s %u %s", m->qr ? "response" : "query", m->id, m->qname);
187 
188     if (!theHash) {
189         theHash = hash_create(MAX_ARRAY_SZ, (hashfunc*)_hash, (hashkeycmp*)_cmp, 0, 0, 0);
190         if (!theHash)
191             return INTERNAL_ERROR;
192     }
193 
194     q.m     = *m;
195     q.tm    = *tm;
196     q.m.tm  = &q.tm;
197     q.m.tld = 0;
198 
199     obj = hash_find(&q, theHash);
200 
201     if (m->qr) {
202         struct timeval diff;
203         unsigned long  us;
204         int            iter;
205 
206         if (!obj) {
207             // got a response without a query,
208             dfprint(1, "response_time: missing query for response");
209             return MISSING_QUERY;
210         }
211 
212         // TODO: compare more?
213         // - qclass/qtype, qname
214 
215         // found query, remove and calculate index
216         if (obj->prev)
217             obj->prev->next = obj->next;
218         if (obj->next)
219             obj->next->prev = obj->prev;
220         if (obj == qfirst)
221             qfirst = obj->next;
222         if (obj == qlast)
223             qlast = obj->prev;
224         hash_remove(obj, theHash);
225         num_queries--;
226 
227         q      = *obj;
228         q.m.tm = &q.tm;
229         xfree(obj);
230 
231         diff.tv_sec  = tm->ts.tv_sec - q.tm.ts.tv_sec;
232         diff.tv_usec = tm->ts.tv_usec - q.tm.ts.tv_usec;
233         if (diff.tv_usec >= 1000000) {
234             diff.tv_sec += 1;
235             diff.tv_usec -= 1000000;
236         } else if (diff.tv_usec < 0) {
237             diff.tv_sec -= 1;
238             diff.tv_usec += 1000000;
239         }
240 
241         if (diff.tv_sec < 0 || diff.tv_usec < 0) {
242             dfprintf(1, "response_time: bad diff " PRItime ", " PRItime " - " PRItime, diff.tv_sec, diff.tv_usec, q.tm.ts.tv_sec, q.tm.ts.tv_usec, tm->ts.tv_sec, tm->ts.tv_usec);
243             return INTERNAL_ERROR;
244         }
245         if (diff.tv_sec >= max_sec) {
246             switch (max_sec_mode) {
247             case response_time_ceil:
248                 dfprintf(2, "response_time: diff " PRItime " ceiled to " PRItime, diff.tv_sec, diff.tv_usec, max_sec, 0L);
249                 diff.tv_sec  = max_sec;
250                 diff.tv_usec = 0;
251                 break;
252             case response_time_timed_out:
253                 dfprintf(1, "response_time: diff " PRItime " too old, timed out", diff.tv_sec, diff.tv_usec);
254                 return TIMED_OUT;
255             default:
256                 dfprint(1, "response_time: bad max_sec_mode");
257                 return INTERNAL_ERROR;
258             }
259         }
260 
261         us = (diff.tv_sec * 1000000) + diff.tv_usec;
262         switch (mode) {
263         case response_time_bucket:
264             iter = FIRST_BUCKET + (us / bucket_size);
265             dfprintf(2, "response_time: found q/r us:%lu, put in bucket %d (%lu-%lu usec)", us, iter, (us / bucket_size) * bucket_size, ((us / bucket_size) + 1) * bucket_size);
266             break;
267         case response_time_log10: {
268             double d = log10((double)us);
269             if (d < 0) {
270                 dfprintf(1, "response_time: bad log10(%lu) ret %f", us, d);
271                 return INTERNAL_ERROR;
272             }
273             iter = FIRST_BUCKET + (int)d;
274             dfprintf(2, "response_time: found q/r us:%lu, log10 %d (%.0f-%.0f usec)", us, iter, pow(10, (int)d), pow(10, (int)d + 1));
275             break;
276         }
277         case response_time_log2: {
278             double d = log2((double)us);
279             if (d < 0) {
280                 dfprintf(1, "response_time: bad log2(%lu) ret %f", us, d);
281                 return INTERNAL_ERROR;
282             }
283             iter = FIRST_BUCKET + (int)d;
284             dfprintf(2, "response_time: found q/r us:%lu, log2 %d (%.0f-%.0f usec)", us, iter, pow(2, (int)d), pow(2, (int)d + 1));
285             break;
286         }
287         default:
288             dfprint(1, "response_time: bad mode");
289             return INTERNAL_ERROR;
290         }
291 
292         if (iter > max_iter)
293             max_iter = iter;
294         return iter;
295     }
296 
297     if (obj) {
298         // Found another query in the hash so the old one have timed out,
299         // reuse the obj for the new query
300         obj->tm.ts = tm->ts;
301         if (obj != qlast) {
302             if (obj->prev)
303                 obj->prev->next = obj->next;
304             if (obj->next) {
305                 if (obj == qfirst)
306                     qfirst = obj->next;
307                 obj->next->prev = obj->prev;
308             }
309             obj->prev = qlast;
310             obj->next = 0;
311             assert(qlast);
312             qlast->next = obj;
313             qlast       = obj;
314         }
315         dfprintf(1, "response_time: reuse %p, timed out", obj);
316         return TIMED_OUT;
317     }
318 
319     if (num_queries >= max_queries) {
320         // We're at max, see if we can time out the oldest query
321         ret = TIMED_OUT;
322         assert(qfirst);
323         if (tm->ts.tv_sec - qfirst->tm.ts.tv_sec < max_sec) {
324             // no, so what to do?
325             switch (full_mode) {
326             case response_time_drop_query:
327                 dfprint(1, "response_time: full and oldest not old enough");
328                 return DROPPED_QUERY;
329             case response_time_drop_oldest:
330                 ret = DROPPED_QUERY;
331                 dfprint(2, "response_time: full and dropping oldest");
332                 break;
333             default:
334                 dfprint(1, "response_time: bad full_mode");
335                 return INTERNAL_ERROR;
336             }
337         }
338 
339         // remove oldest obj from hash and reuse it
340         obj    = qfirst;
341         qfirst = obj->next;
342         if (qfirst)
343             qfirst->prev = 0;
344         hash_remove(obj, theHash);
345         num_queries--;
346         dfprintf(1, "response_time: reuse %p, too old", obj);
347     } else {
348         obj = xcalloc(1, sizeof(*obj));
349         if (!obj) {
350             dfprint(1, "response_time: failed to alloc obj");
351             return INTERNAL_ERROR;
352         }
353     }
354 
355     *obj      = q;
356     obj->m.tm = &obj->tm;
357     if (hash_add(obj, obj, theHash)) {
358         xfree(obj);
359         dfprint(1, "response_time: failed to add to hash");
360         return INTERNAL_ERROR;
361     }
362 
363     obj->prev = qlast;
364     obj->next = 0;
365     if (qlast)
366         qlast->next = obj;
367     qlast = obj;
368     if (!qfirst)
369         qfirst = obj;
370     num_queries++;
371     dfprintf(2, "response_time: add %p, %zu/%zu queries", obj, num_queries, max_queries);
372 
373     return ret;
374 }
375 
response_time_iterator(const char ** label)376 int response_time_iterator(const char** label)
377 {
378     char label_buf[128];
379 
380     if (!label) {
381         next_iter = 0;
382         return max_iter + 1;
383     }
384     if (next_iter > max_iter) {
385         return -1;
386     }
387 
388     if (next_iter < FIRST_BUCKET) {
389         switch (next_iter) {
390         case TIMED_OUT:
391             *label = "timeouts";
392             break;
393         case MISSING_QUERY:
394             *label = "missing_queries";
395             break;
396         case DROPPED_QUERY:
397             *label = "dropped_queries";
398             break;
399         case INTERNAL_ERROR:
400             *label = "internal_errors";
401             break;
402         default:
403             return -1;
404         }
405     } else {
406         switch (mode) {
407         case response_time_bucket:
408             snprintf(label_buf, 128, "%d-%d", (next_iter - FIRST_BUCKET) * bucket_size, (next_iter - FIRST_BUCKET + 1) * bucket_size);
409             break;
410         case response_time_log10:
411             snprintf(label_buf, 128, "%.0f-%.0f", pow(10, next_iter - FIRST_BUCKET), pow(10, next_iter - FIRST_BUCKET + 1));
412             break;
413         case response_time_log2:
414             snprintf(label_buf, 128, "%.0f-%.0f", pow(2, next_iter - FIRST_BUCKET), pow(2, next_iter - FIRST_BUCKET + 1));
415             break;
416         default:
417             return -1;
418         }
419         *label = label_buf;
420     }
421 
422     return next_iter++;
423 }
424 
response_time_reset()425 void response_time_reset()
426 {
427     max_iter = INTERNAL_ERROR;
428 }
429 
430 static struct query* flushed_obj = 0;
431 
response_time_flush(enum flush_mode fm)432 const dns_message* response_time_flush(enum flush_mode fm)
433 {
434     switch (fm) {
435     case flush_get:
436         if (qfirst && last_ts.tv_sec - qfirst->tm.ts.tv_sec >= max_sec) {
437             dfprintf(2, "response_time: flush_get old %p, new %p", flushed_obj, qfirst);
438 
439             if (flushed_obj)
440                 xfree(flushed_obj);
441 
442             flushed_obj = qfirst;
443             qfirst      = flushed_obj->next;
444             if (qfirst)
445                 qfirst->prev = 0;
446             if (flushed_obj == qlast)
447                 qlast = 0;
448             hash_remove(flushed_obj, theHash);
449             num_queries--;
450             return &flushed_obj->m;
451         }
452         break;
453     case flush_on:
454         dfprintf(2, "response_time: flush_on %p", flushed_obj);
455         flushing = 1;
456         break;
457     case flush_off:
458         dfprintf(2, "response_time: flush_off %p", flushed_obj);
459         if (flushed_obj) {
460             xfree(flushed_obj);
461             flushed_obj = 0;
462         }
463         flushing = 0;
464         break;
465     default:
466         break;
467     }
468 
469     return 0;
470 }
471