/*
    pmacct (Promiscuous mode IP Accounting package)
    pmacct is Copyright (C) 2003-2020 by Paolo Lucente
*/

/*
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/

22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_hooks.h"
26 #include "sql_common.h"
27 #include "sql_common_m.h"
28 #include "sqlite3_plugin.h"
29 
30 char sqlite3_db[] = "/tmp/pmacct.db";
31 char sqlite3_table[] = "acct";
32 char sqlite3_table_v2[] = "acct_v2";
33 char sqlite3_table_v3[] = "acct_v3";
34 char sqlite3_table_v4[] = "acct_v4";
35 char sqlite3_table_v5[] = "acct_v5";
36 char sqlite3_table_v6[] = "acct_v6";
37 char sqlite3_table_v7[] = "acct_v7";
38 char sqlite3_table_v8[] = "acct_v8";
39 char sqlite3_table_bgp[] = "acct_bgp";
40 
41 /* Functions */
sqlite3_plugin(int pipe_fd,struct configuration * cfgptr,void * ptr)42 void sqlite3_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
43 {
44   struct pkt_data *data;
45   struct ports_table pt;
46   struct pollfd pfd;
47   struct insert_data idata;
48   time_t refresh_deadline;
49   int refresh_timeout;
50   int ret, num, recv_budget, poll_bypass;
51   struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
52   struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
53   int datasize = ((struct channels_list_entry *)ptr)->datasize;
54   u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
55   pid_t core_pid = ((struct channels_list_entry *)ptr)->core_pid;
56   struct networks_file_data nfd;
57   unsigned char *dataptr;
58 
59   unsigned char *rgptr;
60   int pollagain = TRUE;
61   u_int32_t seq = 1, rg_err_count = 0;
62 
63   struct extra_primitives extras;
64   struct primitives_ptrs prim_ptrs;
65 
66 #ifdef WITH_ZMQ
67   struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
68 #else
69   void *zmq_host = NULL;
70 #endif
71 
72 #ifdef WITH_REDIS
73   struct p_redis_host redis_host;
74 #endif
75 
76   memcpy(&config, cfgptr, sizeof(struct configuration));
77   memcpy(&extras, &((struct channels_list_entry *)ptr)->extras, sizeof(struct extra_primitives));
78   recollect_pipe_memory(ptr);
79   pm_setproctitle("%s [%s]", "SQLite3 Plugin", config.name);
80 
81   memset(&idata, 0, sizeof(idata));
82   if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
83   if (config.logfile) {
84     fclose(config.logfile_fd);
85     config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
86   }
87 
88   sql_set_signals();
89   sql_init_default_values(&extras);
90   SQLI_init_default_values(&idata);
91   SQLI_set_callbacks(&sqlfunc_cbr);
92   sql_set_insert_func();
93 
94   /* some LOCAL initialization AFTER setting some default values */
95   reload_map = FALSE;
96   idata.now = time(NULL);
97   refresh_deadline = idata.now;
98   idata.cfg = &config;
99 
100   sql_init_maps(&extras, &prim_ptrs, &nt, &nc, &pt);
101   sql_init_global_buffers();
102   sql_init_historical_acct(idata.now, &idata);
103   sql_init_triggers(idata.now, &idata);
104   sql_init_refresh_deadline(&refresh_deadline);
105 
106   if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
107   else setnonblocking(pipe_fd);
108 
109   /* setting number of entries in _protocols structure */
110   while (_protocols[protocols_number].number != -1) protocols_number++;
111 
112   /* building up static SQL clauses */
113   idata.num_primitives = SQLI_compose_static_queries();
114   glob_num_primitives = idata.num_primitives;
115 
116   /* setting up environment variables */
117   SQL_SetENV();
118 
119   sql_link_backend_descriptors(&bed, &p, &b);
120 
121 #ifdef WITH_REDIS
122   if (config.redis_host) {
123     char log_id[SHORTBUFLEN];
124 
125     snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
126     p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
127   }
128 #endif
129 
130   /* plugin main loop */
131   for(;;) {
132     poll_again:
133     status->wakeup = TRUE;
134     poll_bypass = FALSE;
135     calc_refresh_timeout(refresh_deadline, idata.now, &refresh_timeout);
136 
137     pfd.fd = pipe_fd;
138     pfd.events = POLLIN;
139     ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);
140 
141     if (ret <= 0) {
142       if (getppid() != core_pid) {
143         Log(LOG_ERR, "ERROR ( %s/%s ): Core process *seems* gone. Exiting.\n", config.name, config.type);
144         exit_gracefully(1);
145       }
146 
147       if (ret < 0) goto poll_again;
148     }
149 
150     poll_ops:
151     idata.now = time(NULL);
152 
153     if (config.sql_history) {
154       while (idata.now > (idata.basetime + idata.timeslot)) {
155         time_t saved_basetime = idata.basetime;
156 
157         idata.basetime += idata.timeslot;
158         if (config.sql_history == COUNT_MONTHLY)
159           idata.timeslot = calc_monthly_timeslot(idata.basetime, config.sql_history_howmany, ADD);
160         glob_basetime = idata.basetime;
161         idata.new_basetime = saved_basetime;
162         glob_new_basetime = saved_basetime;
163       }
164     }
165 
166     if (idata.now > refresh_deadline) {
167       if (qq_ptr) sql_cache_flush(sql_queries_queue, qq_ptr, &idata, FALSE);
168       sql_cache_handle_flush_event(&idata, &refresh_deadline, &pt);
169     }
170     else {
171       if (config.sql_trigger_exec) {
172         while (idata.now > idata.triggertime && idata.t_timeslot > 0) {
173           sql_trigger_exec(config.sql_trigger_exec);
174           idata.triggertime += idata.t_timeslot;
175           if (config.sql_trigger_time == COUNT_MONTHLY)
176             idata.t_timeslot = calc_monthly_timeslot(idata.triggertime, config.sql_trigger_time_howmany, ADD);
177         }
178       }
179     }
180 
181     recv_budget = 0;
182     if (poll_bypass) {
183       poll_bypass = FALSE;
184       goto read_data;
185     }
186 
187     switch (ret) {
188     case 0: /* timeout */
189       break;
190     default: /* we received data */
191       read_data:
192       if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
193 	poll_bypass = TRUE;
194 	goto poll_ops;
195       }
196 
197       if (config.pipe_homegrown) {
198         if (!pollagain) {
199           seq++;
200           seq %= MAX_SEQNUM;
201 	  if (seq == 0) rg_err_count = FALSE;
202 	  idata.now = time(NULL);
203         }
204         else {
205           if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
206 	    exit_gracefully(1); /* we exit silently; something happened at the write end */
207         }
208 
209         if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;
210 
211         if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
212 	  if (!pollagain) {
213 	    pollagain = TRUE;
214 	    goto poll_again;
215           }
216 	  else {
217 	    rg_err_count++;
218 	    if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
219               Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
220                         config.name, config.type, config.buffer_size, config.pipe_size);
221               Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
222                         config.name, config.type);
223 	    }
224 
225 	    rg->ptr = (rg->base + status->last_buf_off);
226             seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
227 	  }
228         }
229 
230         pollagain = FALSE;
231         memcpy(pipebuf, rg->ptr, bufsz);
232         rg->ptr += bufsz;
233       }
234 #ifdef WITH_ZMQ
235       else if (config.pipe_zmq) {
236 	ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
237 	if (ret > 0) {
238 	  if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
239 	    Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
240 		config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
241 	  }
242 
243 	  seq = ((struct ch_buf_hdr *)pipebuf)->seq;
244 	}
245 	else goto poll_again;
246       }
247 #endif
248 
249       data = (struct pkt_data *) (pipebuf+sizeof(struct ch_buf_hdr));
250 
251       if (config.debug_internal_msg)
252         Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
253                 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
254                 ((struct ch_buf_hdr *)pipebuf)->num);
255 
256       while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
257         for (num = 0; primptrs_funcs[num]; num++)
258           (*primptrs_funcs[num])((u_char *)data, &extras, &prim_ptrs);
259 
260 	for (num = 0; net_funcs[num]; num++)
261 	  (*net_funcs[num])(&nt, &nc, &data->primitives, prim_ptrs.pbgp, &nfd);
262 
263 	if (config.ports_file) {
264 	  if (!pt.table[data->primitives.src_port]) data->primitives.src_port = 0;
265 	  if (!pt.table[data->primitives.dst_port]) data->primitives.dst_port = 0;
266 	}
267 
268         prim_ptrs.data = data;
269         (*insert_func)(&prim_ptrs, &idata);
270 
271         ((struct ch_buf_hdr *)pipebuf)->num--;
272         if (((struct ch_buf_hdr *)pipebuf)->num) {
273           dataptr = (unsigned char *) data;
274           if (!prim_ptrs.vlen_next_off) dataptr += datasize;
275           else dataptr += prim_ptrs.vlen_next_off;
276           data = (struct pkt_data *) dataptr;
277         }
278       }
279 
280       recv_budget++;
281       goto read_data;
282     }
283   }
284 }
285 
SQLI_cache_dbop(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)286 int SQLI_cache_dbop(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
287 {
288   char *ptr_values, *ptr_where, *ptr_mv, *ptr_set;
289   int num=0, num_set=0, ret=0, have_flows=0, len=0;
290 
291   if (idata->mv.last_queue_elem) {
292     ret = sqlite3_exec(db->desc, multi_values_buffer, NULL, NULL, NULL);
293     Log(LOG_DEBUG, "DEBUG ( %s/%s ): %d INSERT statements sent to the SQLite database.\n",
294                 config.name, config.type, idata->mv.buffer_elem_num);
295     if (ret) goto signal_error;
296     idata->iqn++;
297     idata->mv.buffer_elem_num = FALSE;
298     idata->mv.buffer_offset = 0;
299 
300     return FALSE;
301   }
302 
303   if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
304 
305   /* constructing sql query */
306   ptr_where = where_clause;
307   ptr_values = values_clause;
308   ptr_set = set_clause;
309   memset(where_clause, 0, sizeof(where_clause));
310   memset(values_clause, 0, sizeof(values_clause));
311   memset(set_clause, 0, sizeof(set_clause));
312   memset(insert_full_clause, 0, sizeof(insert_full_clause));
313 
314   for (num = 0; num < idata->num_primitives; num++)
315     (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
316 
317   if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
318     for (num_set = 0; set_event[num_set].type; num_set++)
319       (*set_event[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
320   }
321   else {
322     for (num_set = 0; set[num_set].type; num_set++)
323       (*set[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
324   }
325 
326   /* sending UPDATE query a) if not switched off and
327      b) if we actually have something to update */
328   if (!config.sql_dont_try_update && num_set) {
329     strncpy(sql_data, update_clause, SPACELEFT(sql_data));
330     strncat(sql_data, set_clause, SPACELEFT(sql_data));
331     strncat(sql_data, where_clause, SPACELEFT(sql_data));
332 
333     ret = sqlite3_exec(db->desc, sql_data, NULL, NULL, NULL);
334     if (ret) goto signal_error;
335   }
336 
337   if (config.sql_dont_try_update || !num_set || (sqlite3_changes(db->desc) == 0)) {
338     /* UPDATE failed, trying with an INSERT query */
339     if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
340       strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
341       strncat(insert_full_clause, insert_nocounters_clause, SPACELEFT(insert_full_clause));
342       strncat(ptr_values, ")", SPACELEFT(values_clause));
343     }
344     else {
345       strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
346       strncat(insert_full_clause, insert_counters_clause, SPACELEFT(insert_full_clause));
347       if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter, cache_elem->flows_counter);
348       else snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter);
349     }
350 
351     strncpy(sql_data, insert_full_clause, sizeof(sql_data));
352     strncat(sql_data, values_clause, SPACELEFT(sql_data));
353 
354     if (config.sql_multi_values) {
355       multi_values_handling:
356       len = config.sql_multi_values-idata->mv.buffer_offset;
357       if (strlen(values_clause) < len) {
358 	if (idata->mv.buffer_elem_num) {
359 	  strcpy(multi_values_buffer+idata->mv.buffer_offset, "; ");
360 	  idata->mv.buffer_offset++;
361 	  idata->mv.buffer_offset++;
362 	}
363 	ptr_mv = multi_values_buffer+idata->mv.buffer_offset;
364 	strcpy(multi_values_buffer+idata->mv.buffer_offset, sql_data);
365 	idata->mv.buffer_offset += strlen(ptr_mv);
366         idata->mv.buffer_elem_num++;
367       }
368       else {
369 	if (idata->mv.buffer_elem_num) {
370 	  ret = sqlite3_exec(db->desc, multi_values_buffer, NULL, NULL, NULL);
371 	  Log(LOG_DEBUG, "DEBUG ( %s/%s ): %d INSERT statements sent to the SQLite database.\n",
372 		config.name, config.type, idata->mv.buffer_elem_num);
373 	  if (ret) goto signal_error;
374 	  idata->iqn++;
375 	  idata->mv.buffer_elem_num = FALSE;
376 	  idata->mv.head_buffer_elem = FALSE;
377 	  idata->mv.buffer_offset = 0;
378 	  goto multi_values_handling;
379 	}
380 	else {
381 	  Log(LOG_ERR, "ERROR ( %s/%s ): 'sql_multi_values' is too small (%d). Try with a larger value.\n",
382 	  config.name, config.type, config.sql_multi_values);
383 	  exit_gracefully(1);
384 	}
385       }
386     }
387     else {
388       ret = sqlite3_exec(db->desc, sql_data, NULL, NULL, NULL);
389       Log(LOG_DEBUG, "( %s/%s ): %s\n\n", config.name, config.type, sql_data);
390       if (ret) goto signal_error;
391       idata->iqn++;
392     }
393   }
394   else {
395     Log(LOG_DEBUG, "( %s/%s ): %s\n\n", config.name, config.type, sql_data);
396     idata->uqn++;
397   }
398 
399   idata->een++;
400   // cache_elem->valid = FALSE; /* committed */
401 
402   return ret;
403 
404   signal_error:
405   if (!idata->mv.buffer_elem_num) Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
406   else {
407     if (!idata->recover || db->type != BE_TYPE_PRIMARY) {
408       /* DB failure: we will rewind the multi-values buffer */
409       idata->current_queue_elem = idata->mv.head_buffer_elem;
410       idata->mv.buffer_elem_num = 0;
411     }
412   }
413   SQLI_get_errmsg(db);
414   if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
415 
416   return ret;
417 }
418 
SQLI_cache_purge(struct db_cache * queue[],int index,struct insert_data * idata)419 void SQLI_cache_purge(struct db_cache *queue[], int index, struct insert_data *idata)
420 {
421   struct db_cache *LastElemCommitted = NULL;
422   time_t start;
423   int j, stop, go_to_pending, saved_index = index;
424   char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
425   char tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
426   struct primitives_ptrs prim_ptrs;
427   struct pkt_data dummy_data;
428   pid_t writer_pid = getpid();
429 
430   if (!index) {
431     Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
432     Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: 0/0, ET: X) ***\n", config.name, config.type, writer_pid);
433     return;
434   }
435 
436   memset(&prim_ptrs, 0, sizeof(prim_ptrs));
437   memset(&dummy_data, 0, sizeof(dummy_data));
438 
439   for (j = 0, stop = 0; (!stop) && sql_preprocess_funcs[j]; j++)
440     stop = sql_preprocess_funcs[j](queue, &index, j);
441 
442   if ((config.what_to_count & COUNT_CLASS) || (config.what_to_count_2 & COUNT_NDPI_CLASS))
443     sql_invalidate_shadow_entries(queue, &index);
444 
445   idata->ten = index;
446 
447   Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
448   start = time(NULL);
449 
450   /* re-using pending queries queue stuff from parent and saving clauses */
451   memcpy(sql_pending_queries_queue, queue, index*sizeof(struct db_cache *));
452   pqq_ptr = index;
453 
454   strlcpy(orig_insert_clause, insert_clause, LONGSRVBUFLEN);
455   strlcpy(orig_update_clause, update_clause, LONGSRVBUFLEN);
456   strlcpy(orig_lock_clause, lock_clause, LONGSRVBUFLEN);
457 
458   start:
459   memset(&idata->mv, 0, sizeof(struct multi_values));
460   memcpy(queue, sql_pending_queries_queue, pqq_ptr*sizeof(struct db_cache *));
461   memset(sql_pending_queries_queue, 0, pqq_ptr*sizeof(struct db_cache *));
462   index = pqq_ptr; pqq_ptr = 0;
463 
464   /* We check for variable substitution in SQL table */
465   if (idata->dyn_table) {
466     time_t stamp = 0;
467 
468     memset(tmpbuf, 0, LONGLONGSRVBUFLEN);
469     stamp = queue[0]->basetime;
470 
471     prim_ptrs.data = &dummy_data;
472     primptrs_set_all_from_db_cache(&prim_ptrs, queue[0]);
473 
474     strlcpy(idata->dyn_table_name, config.sql_table, SRVBUFLEN);
475     strlcpy(insert_clause, orig_insert_clause, LONGSRVBUFLEN);
476     strlcpy(update_clause, orig_update_clause, LONGSRVBUFLEN);
477     strlcpy(lock_clause, orig_lock_clause, LONGSRVBUFLEN);
478 
479     handle_dynname_internal_strings_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
480     handle_dynname_internal_strings_same(update_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
481     handle_dynname_internal_strings_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
482     handle_dynname_internal_strings_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
483 
484     pm_strftime_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
485     pm_strftime_same(update_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
486     pm_strftime_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
487     pm_strftime_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
488   }
489 
490   if (config.sql_table_schema) sql_create_table(bed.p, &queue[0]->basetime, &prim_ptrs);
491 
492   (*sqlfunc_cbr.lock)(bed.p);
493 
494   for (idata->current_queue_elem = 0; idata->current_queue_elem < index; idata->current_queue_elem++) {
495     go_to_pending = FALSE;
496 
497     if (idata->dyn_table && (!idata->dyn_table_time_only || !config.nfacctd_time_new || (config.sql_refresh_time != idata->timeslot))) {
498       time_t stamp = 0;
499 
500       memset(tmpbuf, 0, LONGLONGSRVBUFLEN); // XXX: pedantic?
501       stamp = queue[idata->current_queue_elem]->basetime;
502       strlcpy(tmptable, config.sql_table, SRVBUFLEN);
503 
504       prim_ptrs.data = &dummy_data;
505       primptrs_set_all_from_db_cache(&prim_ptrs, queue[idata->current_queue_elem]);
506       handle_dynname_internal_strings_same(tmptable, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
507       pm_strftime_same(tmptable, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
508 
509       if (strncmp(idata->dyn_table_name, tmptable, SRVBUFLEN)) {
510         sql_pending_queries_queue[pqq_ptr] = queue[idata->current_queue_elem];
511 
512         pqq_ptr++;
513         go_to_pending = TRUE;
514       }
515     }
516 
517     if (!go_to_pending) {
518       if (queue[idata->current_queue_elem]->valid)
519         sql_query(&bed, queue[idata->current_queue_elem], idata);
520       if (queue[idata->current_queue_elem]->valid == SQL_CACHE_COMMITTED)
521         LastElemCommitted = queue[idata->current_queue_elem];
522     }
523   }
524 
525   /* multi-value INSERT query: wrap-up */
526   if (idata->mv.buffer_elem_num) {
527     idata->mv.last_queue_elem = TRUE;
528     sql_query(&bed, LastElemCommitted, idata);
529     idata->qn--; /* increased by sql_query() one time too much */
530   }
531 
532   /* rewinding stuff */
533   (*sqlfunc_cbr.unlock)(&bed);
534   if (b.fail) Log(LOG_ALERT, "ALERT ( %s/%s ): recovery for SQLite3 daemon failed.\n", config.name, config.type);
535 
536   /* If we have pending queries then start again */
537   if (pqq_ptr) goto start;
538 
539   idata->elap_time = time(NULL)-start;
540   Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %zu) ***\n",
541 		config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);
542 
543   if (config.sql_trigger_exec) {
544     if (queue && queue[0]) idata->basetime = queue[0]->basetime;
545     idata->elap_time = time(NULL)-start;
546     SQL_SetENV_child(idata);
547   }
548 }
549 
SQLI_evaluate_history(int primitive)550 int SQLI_evaluate_history(int primitive)
551 {
552   if (config.sql_history) {
553     if (primitive) {
554       strncat(insert_clause, ", ", SPACELEFT(insert_clause));
555       strncat(values[primitive].string, ", ", sizeof(values[primitive].string));
556       strncat(where[primitive].string, " AND ", sizeof(where[primitive].string));
557     }
558 
559     if (!config.timestamps_since_epoch) {
560       if (!config.timestamps_utc)
561         strncat(where[primitive].string, "DATETIME(%u, 'unixepoch', 'localtime') = ", SPACELEFT(where[primitive].string));
562       else
563         strncat(where[primitive].string, "DATETIME(%u, 'unixepoch') = ", SPACELEFT(where[primitive].string));
564     }
565     else strncat(where[primitive].string, "%u = ", SPACELEFT(where[primitive].string));
566 
567     strncat(where[primitive].string, "stamp_inserted", SPACELEFT(where[primitive].string));
568     strncat(insert_clause, "stamp_updated, stamp_inserted", SPACELEFT(insert_clause));
569 
570     if (!config.timestamps_since_epoch) {
571       if (!config.timestamps_utc) {
572         strncat(values[primitive].string,
573 		"DATETIME(%u, 'unixepoch', 'localtime'), DATETIME(%u, 'unixepoch', 'localtime')",
574 		SPACELEFT(values[primitive].string));
575       }
576       else strncat(values[primitive].string, "DATETIME(%u, 'unixepoch'), DATETIME(%u, 'unixepoch')", SPACELEFT(values[primitive].string));
577     }
578     else {
579       strncat(values[primitive].string, "%u, %u", SPACELEFT(values[primitive].string));
580     }
581 
582     where[primitive].type = values[primitive].type = TIMESTAMP;
583     values[primitive].handler = where[primitive].handler = count_timestamp_handler;
584     primitive++;
585   }
586 
587   return primitive;
588 }
589 
SQLI_compose_static_queries()590 int SQLI_compose_static_queries()
591 {
592   int primitives=0, set_primitives=0, set_event_primitives=0, have_flows=0;
593 
594   if (config.what_to_count & COUNT_FLOWS || (config.sql_table_version >= 4 &&
595                                              config.sql_table_version < SQL_TABLE_VERSION_BGP &&
596                                              !config.sql_optimize_clauses)) {
597     config.what_to_count |= COUNT_FLOWS;
598     have_flows = TRUE;
599 
600     if ((config.sql_table_version < 4 || config.sql_table_version >= SQL_TABLE_VERSION_BGP) && !config.sql_optimize_clauses) {
601       Log(LOG_ERR, "ERROR ( %s/%s ): The accounting of flows requires SQL table v4. Exiting.\n", config.name, config.type);
602       exit_gracefully(1);
603     }
604   }
605 
606   /* "INSERT INTO ... VALUES ... " and "... WHERE ..." stuff */
607   strncpy(where[primitives].string, " WHERE ", sizeof(where[primitives].string));
608   snprintf(insert_clause, sizeof(insert_clause), "INSERT INTO %s (", config.sql_table);
609   strncpy(values[primitives].string, " VALUES (", sizeof(values[primitives].string));
610   primitives = SQLI_evaluate_history(primitives);
611   primitives = sql_evaluate_primitives(primitives);
612 
613   strncpy(insert_counters_clause, ", packets, bytes", SPACELEFT(insert_counters_clause));
614   if (have_flows) strncat(insert_counters_clause, ", flows", SPACELEFT(insert_counters_clause));
615   strncat(insert_counters_clause, ")", SPACELEFT(insert_counters_clause));
616   strncpy(insert_nocounters_clause, ")", SPACELEFT(insert_nocounters_clause));
617 
618   /* "LOCK ..." stuff */
619   if (config.sql_locking_style) Log(LOG_WARNING, "WARN ( %s/%s ): sql_locking_style is not supported. Ignored.\n", config.name, config.type);
620   snprintf(lock_clause, sizeof(lock_clause), "BEGIN");
621   strncpy(unlock_clause, "COMMIT", sizeof(unlock_clause));
622 
623   /* "UPDATE ... SET ..." stuff */
624   snprintf(update_clause, sizeof(update_clause), "UPDATE %s ", config.sql_table);
625 
626   set_primitives = sql_compose_static_set(have_flows);
627   set_event_primitives = sql_compose_static_set_event();
628 
629   if (config.sql_history) {
630     if (!config.timestamps_since_epoch) {
631       strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
632 
633       if (!config.timestamps_utc) {
634 	strncat(set[set_primitives].string,
635 		"stamp_updated=DATETIME('now', 'localtime')",
636 		SPACELEFT(set[set_primitives].string));
637       }
638       else strncat(set[set_primitives].string, "stamp_updated=DATETIME('now')", SPACELEFT(set[set_primitives].string));
639 
640       set[set_primitives].type = TIMESTAMP;
641       set[set_primitives].handler = count_noop_setclause_handler;
642       set_primitives++;
643 
644       if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
645       else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
646 
647       if (!config.timestamps_utc) {
648 	strncat(set_event[set_event_primitives].string,
649 		"stamp_updated=DATETIME('now', 'localtime')",
650 		SPACELEFT(set_event[set_event_primitives].string));
651       }
652       else strncat(set_event[set_event_primitives].string, "stamp_updated=DATETIME('now')", SPACELEFT(set_event[set_event_primitives].string));
653 
654       set_event[set_event_primitives].type = TIMESTAMP;
655       set_event[set_event_primitives].handler = count_noop_setclause_event_handler;
656       set_event_primitives++;
657     }
658     else {
659       strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
660       strncat(set[set_primitives].string, "stamp_updated=STRFTIME('%%s', 'now')", SPACELEFT(set[set_primitives].string));
661       set[set_primitives].type = TIMESTAMP;
662       set[set_primitives].handler = count_noop_setclause_handler;
663       set_primitives++;
664 
665       if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
666       else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
667       strncat(set_event[set_event_primitives].string, "stamp_updated=STRFTIME('%%s', 'now')", SPACELEFT(set_event[set_event_primitives].string));
668       set_event[set_event_primitives].type = TIMESTAMP;
669       set_event[set_event_primitives].handler = count_noop_setclause_event_handler;
670       set_event_primitives++;
671     }
672   }
673 
674   return primitives;
675 }
676 
SQLI_Lock(struct DBdesc * db)677 void SQLI_Lock(struct DBdesc *db)
678 {
679   if (!db->fail) {
680     if (sqlite3_exec(db->desc, lock_clause, NULL, NULL, NULL)) {
681       SQLI_get_errmsg(db);
682       sql_db_errmsg(db);
683       sql_db_fail(db);
684     }
685   }
686 }
687 
SQLI_Unlock(struct BE_descs * bed)688 void SQLI_Unlock(struct BE_descs *bed)
689 {
690   if (bed->p->connected) sqlite3_exec(bed->p->desc, unlock_clause, NULL, NULL, NULL);
691   if (bed->b->connected) sqlite3_exec(bed->b->desc, unlock_clause, NULL, NULL, NULL);
692 }
693 
SQLI_DB_Connect(struct DBdesc * db,char * host)694 void SQLI_DB_Connect(struct DBdesc *db, char *host)
695 {
696   if (!db->fail) {
697     if (sqlite3_open(db->filename, (sqlite3 **)&db->desc)) {
698       sql_db_fail(db);
699       SQLI_get_errmsg(db);
700       sql_db_errmsg(db);
701     }
702     else sql_db_ok(db);
703   }
704 }
705 
SQLI_DB_Close(struct BE_descs * bed)706 void SQLI_DB_Close(struct BE_descs *bed)
707 {
708   if (bed->p->connected) sqlite3_close(bed->p->desc);
709   if (bed->b->connected) sqlite3_close(bed->b->desc);
710 }
711 
SQLI_create_dyn_table(struct DBdesc * db,char * buf)712 void SQLI_create_dyn_table(struct DBdesc *db, char *buf)
713 {
714   if (!db->fail) {
715     if (sqlite3_exec(db->desc, buf, NULL, NULL, NULL)) {
716       Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, buf);
717       SQLI_get_errmsg(db);
718       sql_db_warnmsg(db);
719     }
720   }
721 }
722 
SQLI_get_errmsg(struct DBdesc * db)723 void SQLI_get_errmsg(struct DBdesc *db)
724 {
725   db->errmsg = (char *) sqlite3_errmsg(db->desc);
726 }
727 
SQLI_create_backend(struct DBdesc * db)728 void SQLI_create_backend(struct DBdesc *db)
729 {
730   if (db->type == BE_TYPE_PRIMARY) db->filename = config.sql_db;
731   if (db->type == BE_TYPE_BACKUP) db->filename = config.sql_backup_host;
732 }
733 
SQLI_set_callbacks(struct sqlfunc_cb_registry * cbr)734 void SQLI_set_callbacks(struct sqlfunc_cb_registry *cbr)
735 {
736   memset(cbr, 0, sizeof(struct sqlfunc_cb_registry));
737 
738   cbr->connect = SQLI_DB_Connect;
739   cbr->close = SQLI_DB_Close;
740   cbr->lock = SQLI_Lock;
741   cbr->unlock = SQLI_Unlock;
742   cbr->op = SQLI_cache_dbop;
743   cbr->create_table = SQLI_create_dyn_table;
744   cbr->purge = SQLI_cache_purge;
745   cbr->create_backend = SQLI_create_backend;
746 }
747 
SQLI_init_default_values(struct insert_data * idata)748 void SQLI_init_default_values(struct insert_data *idata)
749 {
750   /* Linking database parameters */
751   if (!config.sql_db) config.sql_db = sqlite3_db;
752   if (!config.sql_table) {
753     if (config.sql_table_version == (SQL_TABLE_VERSION_BGP+1)) config.sql_table = sqlite3_table_bgp;
754     else if (config.sql_table_version == 8) config.sql_table = sqlite3_table_v8;
755     else if (config.sql_table_version == 7) config.sql_table = sqlite3_table_v7;
756     else if (config.sql_table_version == 6) config.sql_table = sqlite3_table_v6;
757     else if (config.sql_table_version == 5) config.sql_table = sqlite3_table_v5;
758     else if (config.sql_table_version == 4) config.sql_table = sqlite3_table_v4;
759     else if (config.sql_table_version == 3) config.sql_table = sqlite3_table_v3;
760     else if (config.sql_table_version == 2) config.sql_table = sqlite3_table_v2;
761     else config.sql_table = sqlite3_table;
762   }
763   if (strchr(config.sql_table, '%') || strchr(config.sql_table, '$')) {
764     idata->dyn_table = TRUE;
765     if (!strchr(config.sql_table, '$')) idata->dyn_table_time_only = TRUE;
766   }
767   glob_dyn_table = idata->dyn_table;
768   glob_dyn_table_time_only = idata->dyn_table_time_only;
769 
770   if (config.sql_backup_host) idata->recover = TRUE;
771 
772   if (config.sql_multi_values) {
773     multi_values_buffer = malloc(config.sql_multi_values);
774     if (!multi_values_buffer) {
775       Log(LOG_ERR, "ERROR ( %s/%s ): Unable to get enough room (%d) for multi value queries.\n",
776 		config.name, config.type, config.sql_multi_values);
777       config.sql_multi_values = FALSE;
778     }
779     memset(multi_values_buffer, 0, config.sql_multi_values);
780   }
781 
782   if (config.sql_locking_style) idata->locks = sql_select_locking_style(config.sql_locking_style);
783 }
784 
/* Prints the version of the SQLite library in use to standard output. */
void SQLI_sqlite3_get_version()
{
  printf("sqlite3 %s\n", sqlite3_libversion());
}