1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_hooks.h"
26 #include "sql_common.h"
27 #include "sql_common_m.h"
28 #include "sqlite3_plugin.h"
29
/*
 * Built-in defaults, applied by SQLI_init_default_values() when the
 * corresponding configuration key is unset.
 */

/* Default database file, used when config.sql_db is not set. */
char sqlite3_db[] = "/tmp/pmacct.db";

/*
 * Default table names, used when config.sql_table is not set; the one
 * picked depends on config.sql_table_version ("acct" being the fallback).
 */
char sqlite3_table[] = "acct";
char sqlite3_table_v2[] = "acct_v2";
char sqlite3_table_v3[] = "acct_v3";
char sqlite3_table_v4[] = "acct_v4";
char sqlite3_table_v5[] = "acct_v5";
char sqlite3_table_v6[] = "acct_v6";
char sqlite3_table_v7[] = "acct_v7";
char sqlite3_table_v8[] = "acct_v8";
char sqlite3_table_bgp[] = "acct_bgp";
40
41 /* Functions */
/*
 * SQLite3 plugin entry point.
 *
 * Copies the supplied configuration into the global 'config', runs the
 * common SQL initialisation helpers, then loops forever: it polls the
 * pipe, performs time-based housekeeping (history timeslot roll-over,
 * cache flush at the refresh deadline, trigger execution) and feeds every
 * record of each received buffer to (*insert_func)().
 *
 * pipe_fd: descriptor polled for data coming from the core process.
 * cfgptr:  configuration to copy into 'config'.
 * ptr:     accessed as a struct channels_list_entry (ring, status, sizes,
 *          core PID, extra primitives and, with WITH_ZMQ, the ZMQ host).
 *
 * There is no return path out of the main loop; the function only leaves
 * through exit_gracefully().
 */
void sqlite3_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
{
  struct pkt_data *data;
  struct ports_table pt;
  struct pollfd pfd;
  struct insert_data idata;
  time_t refresh_deadline;
  int refresh_timeout;
  int ret, num, recv_budget, poll_bypass;
  struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
  struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
  int datasize = ((struct channels_list_entry *)ptr)->datasize;
  u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
  pid_t core_pid = ((struct channels_list_entry *)ptr)->core_pid;
  struct networks_file_data nfd;
  unsigned char *dataptr;

  unsigned char *rgptr;
  int pollagain = TRUE;
  u_int32_t seq = 1, rg_err_count = 0;

  struct extra_primitives extras;
  struct primitives_ptrs prim_ptrs;

#ifdef WITH_ZMQ
  struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
#else
  void *zmq_host = NULL;
#endif

#ifdef WITH_REDIS
  struct p_redis_host redis_host;
#endif

  /* take private copies of the configuration and of the extra primitives layout */
  memcpy(&config, cfgptr, sizeof(struct configuration));
  memcpy(&extras, &((struct channels_list_entry *)ptr)->extras, sizeof(struct extra_primitives));
  recollect_pipe_memory(ptr);
  pm_setproctitle("%s [%s]", "SQLite3 Plugin", config.name);

  memset(&idata, 0, sizeof(idata));
  if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
  /* re-open the log file in append mode for this process */
  if (config.logfile) {
    fclose(config.logfile_fd);
    config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
  }

  sql_set_signals();
  sql_init_default_values(&extras);
  SQLI_init_default_values(&idata);
  SQLI_set_callbacks(&sqlfunc_cbr);
  sql_set_insert_func();

  /* some LOCAL initialization AFTER setting some default values */
  reload_map = FALSE;
  idata.now = time(NULL);
  refresh_deadline = idata.now;
  idata.cfg = &config;

  sql_init_maps(&extras, &prim_ptrs, &nt, &nc, &pt);
  sql_init_global_buffers();
  sql_init_historical_acct(idata.now, &idata);
  sql_init_triggers(idata.now, &idata);
  sql_init_refresh_deadline(&refresh_deadline);

  /* ZMQ pipes get their own initialisation; otherwise the pipe is made non-blocking */
  if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
  else setnonblocking(pipe_fd);

  /* setting number of entries in _protocols structure */
  while (_protocols[protocols_number].number != -1) protocols_number++;

  /* building up static SQL clauses */
  idata.num_primitives = SQLI_compose_static_queries();
  glob_num_primitives = idata.num_primitives;

  /* setting up environment variables */
  SQL_SetENV();

  sql_link_backend_descriptors(&bed, &p, &b);

#ifdef WITH_REDIS
  if (config.redis_host) {
    char log_id[SHORTBUFLEN];

    snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
    p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
  }
#endif

  /* plugin main loop */
  for(;;) {
    poll_again:
    status->wakeup = TRUE;
    poll_bypass = FALSE;
    calc_refresh_timeout(refresh_deadline, idata.now, &refresh_timeout);

    /* when pipe_fd equals ERR no descriptor is polled and poll() only waits for the timeout */
    pfd.fd = pipe_fd;
    pfd.events = POLLIN;
    ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);

    /* timeout or poll() failure: make sure the parent is still the core process */
    if (ret <= 0) {
      if (getppid() != core_pid) {
        Log(LOG_ERR, "ERROR ( %s/%s ): Core process *seems* gone. Exiting.\n", config.name, config.type);
        exit_gracefully(1);
      }

      if (ret < 0) goto poll_again;
    }

    /* time-based housekeeping; also re-entered from read_data once the receive budget is used up */
    poll_ops:
    idata.now = time(NULL);

    /* advance basetime one timeslot at a time until it covers 'now' */
    if (config.sql_history) {
      while (idata.now > (idata.basetime + idata.timeslot)) {
        time_t saved_basetime = idata.basetime;

        idata.basetime += idata.timeslot;
        if (config.sql_history == COUNT_MONTHLY)
          idata.timeslot = calc_monthly_timeslot(idata.basetime, config.sql_history_howmany, ADD);
        glob_basetime = idata.basetime;
        idata.new_basetime = saved_basetime;
        glob_new_basetime = saved_basetime;
      }
    }

    /* refresh deadline passed: flush any queued queries and handle the flush event */
    if (idata.now > refresh_deadline) {
      if (qq_ptr) sql_cache_flush(sql_queries_queue, qq_ptr, &idata, FALSE);
      sql_cache_handle_flush_event(&idata, &refresh_deadline, &pt);
    }
    else {
      /* otherwise run the trigger once for every trigger timeslot that has elapsed */
      if (config.sql_trigger_exec) {
        while (idata.now > idata.triggertime && idata.t_timeslot > 0) {
          sql_trigger_exec(config.sql_trigger_exec);
          idata.triggertime += idata.t_timeslot;
          if (config.sql_trigger_time == COUNT_MONTHLY)
            idata.t_timeslot = calc_monthly_timeslot(idata.triggertime, config.sql_trigger_time_howmany, ADD);
        }
      }
    }

    /* a budget-triggered pass through poll_ops resumes reading without consulting poll()'s result */
    recv_budget = 0;
    if (poll_bypass) {
      poll_bypass = FALSE;
      goto read_data;
    }

    switch (ret) {
    case 0: /* timeout */
      break;
    default: /* we received data */
      read_data:
      /* after DEFAULT_PLUGIN_COMMON_RECV_BUDGET buffers in a row, run the housekeeping again */
      if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
        poll_bypass = TRUE;
        goto poll_ops;
      }

      if (config.pipe_homegrown) {
        if (!pollagain) {
          /* still draining the ring: expect the next sequence number */
          seq++;
          seq %= MAX_SEQNUM;
          if (seq == 0) rg_err_count = FALSE;
          idata.now = time(NULL);
        }
        else {
          /* consume the notification from the pipe; the value read into rgptr is not used afterwards here */
          if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
            exit_gracefully(1); /* we exit silently; something happened at the write end */
        }

        /* wrap around when a full buffer no longer fits before the end of the ring */
        if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;

        if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
          if (!pollagain) {
            /* buffer at the read position does not carry the expected sequence: go back to poll() */
            pollagain = TRUE;
            goto poll_again;
          }
          else {
            /* mismatch right after a notification: count it, warn, and resync on status->last_buf_off */
            rg_err_count++;
            if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
              Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
                        config.name, config.type, config.buffer_size, config.pipe_size);
              Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
                        config.name, config.type);
            }

            rg->ptr = (rg->base + status->last_buf_off);
            seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
          }
        }

        /* copy the buffer out of the ring and move the read position past it */
        pollagain = FALSE;
        memcpy(pipebuf, rg->ptr, bufsz);
        rg->ptr += bufsz;
      }
#ifdef WITH_ZMQ
      else if (config.pipe_zmq) {
        ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
        if (ret > 0) {
          /* warn when the received sequence number is not the successor of the last one seen */
          if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
            Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
                config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
          }

          seq = ((struct ch_buf_hdr *)pipebuf)->seq;
        }
        else goto poll_again;
      }
#endif

      /* records start right after the buffer header */
      data = (struct pkt_data *) (pipebuf+sizeof(struct ch_buf_hdr));

      if (config.debug_internal_msg)
        Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
                config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
                ((struct ch_buf_hdr *)pipebuf)->num);

      /* process every record in the buffer; the header's 'num' is used as the countdown */
      while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
        for (num = 0; primptrs_funcs[num]; num++)
          (*primptrs_funcs[num])((u_char *)data, &extras, &prim_ptrs);

        for (num = 0; net_funcs[num]; num++)
          (*net_funcs[num])(&nt, &nc, &data->primitives, prim_ptrs.pbgp, &nfd);

        /* with a ports file, ports that have no entry in the table are zeroed */
        if (config.ports_file) {
          if (!pt.table[data->primitives.src_port]) data->primitives.src_port = 0;
          if (!pt.table[data->primitives.dst_port]) data->primitives.dst_port = 0;
        }

        prim_ptrs.data = data;
        (*insert_func)(&prim_ptrs, &idata);

        /* step to the next record: fixed datasize unless prim_ptrs carries a variable-length offset */
        ((struct ch_buf_hdr *)pipebuf)->num--;
        if (((struct ch_buf_hdr *)pipebuf)->num) {
          dataptr = (unsigned char *) data;
          if (!prim_ptrs.vlen_next_off) dataptr += datasize;
          else dataptr += prim_ptrs.vlen_next_off;
          data = (struct pkt_data *) dataptr;
        }
      }

      recv_budget++;
      goto read_data;
    }
  }
}
285
SQLI_cache_dbop(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)286 int SQLI_cache_dbop(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
287 {
288 char *ptr_values, *ptr_where, *ptr_mv, *ptr_set;
289 int num=0, num_set=0, ret=0, have_flows=0, len=0;
290
291 if (idata->mv.last_queue_elem) {
292 ret = sqlite3_exec(db->desc, multi_values_buffer, NULL, NULL, NULL);
293 Log(LOG_DEBUG, "DEBUG ( %s/%s ): %d INSERT statements sent to the SQLite database.\n",
294 config.name, config.type, idata->mv.buffer_elem_num);
295 if (ret) goto signal_error;
296 idata->iqn++;
297 idata->mv.buffer_elem_num = FALSE;
298 idata->mv.buffer_offset = 0;
299
300 return FALSE;
301 }
302
303 if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
304
305 /* constructing sql query */
306 ptr_where = where_clause;
307 ptr_values = values_clause;
308 ptr_set = set_clause;
309 memset(where_clause, 0, sizeof(where_clause));
310 memset(values_clause, 0, sizeof(values_clause));
311 memset(set_clause, 0, sizeof(set_clause));
312 memset(insert_full_clause, 0, sizeof(insert_full_clause));
313
314 for (num = 0; num < idata->num_primitives; num++)
315 (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
316
317 if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
318 for (num_set = 0; set_event[num_set].type; num_set++)
319 (*set_event[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
320 }
321 else {
322 for (num_set = 0; set[num_set].type; num_set++)
323 (*set[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
324 }
325
326 /* sending UPDATE query a) if not switched off and
327 b) if we actually have something to update */
328 if (!config.sql_dont_try_update && num_set) {
329 strncpy(sql_data, update_clause, SPACELEFT(sql_data));
330 strncat(sql_data, set_clause, SPACELEFT(sql_data));
331 strncat(sql_data, where_clause, SPACELEFT(sql_data));
332
333 ret = sqlite3_exec(db->desc, sql_data, NULL, NULL, NULL);
334 if (ret) goto signal_error;
335 }
336
337 if (config.sql_dont_try_update || !num_set || (sqlite3_changes(db->desc) == 0)) {
338 /* UPDATE failed, trying with an INSERT query */
339 if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
340 strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
341 strncat(insert_full_clause, insert_nocounters_clause, SPACELEFT(insert_full_clause));
342 strncat(ptr_values, ")", SPACELEFT(values_clause));
343 }
344 else {
345 strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
346 strncat(insert_full_clause, insert_counters_clause, SPACELEFT(insert_full_clause));
347 if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter, cache_elem->flows_counter);
348 else snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter);
349 }
350
351 strncpy(sql_data, insert_full_clause, sizeof(sql_data));
352 strncat(sql_data, values_clause, SPACELEFT(sql_data));
353
354 if (config.sql_multi_values) {
355 multi_values_handling:
356 len = config.sql_multi_values-idata->mv.buffer_offset;
357 if (strlen(values_clause) < len) {
358 if (idata->mv.buffer_elem_num) {
359 strcpy(multi_values_buffer+idata->mv.buffer_offset, "; ");
360 idata->mv.buffer_offset++;
361 idata->mv.buffer_offset++;
362 }
363 ptr_mv = multi_values_buffer+idata->mv.buffer_offset;
364 strcpy(multi_values_buffer+idata->mv.buffer_offset, sql_data);
365 idata->mv.buffer_offset += strlen(ptr_mv);
366 idata->mv.buffer_elem_num++;
367 }
368 else {
369 if (idata->mv.buffer_elem_num) {
370 ret = sqlite3_exec(db->desc, multi_values_buffer, NULL, NULL, NULL);
371 Log(LOG_DEBUG, "DEBUG ( %s/%s ): %d INSERT statements sent to the SQLite database.\n",
372 config.name, config.type, idata->mv.buffer_elem_num);
373 if (ret) goto signal_error;
374 idata->iqn++;
375 idata->mv.buffer_elem_num = FALSE;
376 idata->mv.head_buffer_elem = FALSE;
377 idata->mv.buffer_offset = 0;
378 goto multi_values_handling;
379 }
380 else {
381 Log(LOG_ERR, "ERROR ( %s/%s ): 'sql_multi_values' is too small (%d). Try with a larger value.\n",
382 config.name, config.type, config.sql_multi_values);
383 exit_gracefully(1);
384 }
385 }
386 }
387 else {
388 ret = sqlite3_exec(db->desc, sql_data, NULL, NULL, NULL);
389 Log(LOG_DEBUG, "( %s/%s ): %s\n\n", config.name, config.type, sql_data);
390 if (ret) goto signal_error;
391 idata->iqn++;
392 }
393 }
394 else {
395 Log(LOG_DEBUG, "( %s/%s ): %s\n\n", config.name, config.type, sql_data);
396 idata->uqn++;
397 }
398
399 idata->een++;
400 // cache_elem->valid = FALSE; /* committed */
401
402 return ret;
403
404 signal_error:
405 if (!idata->mv.buffer_elem_num) Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
406 else {
407 if (!idata->recover || db->type != BE_TYPE_PRIMARY) {
408 /* DB failure: we will rewind the multi-values buffer */
409 idata->current_queue_elem = idata->mv.head_buffer_elem;
410 idata->mv.buffer_elem_num = 0;
411 }
412 }
413 SQLI_get_errmsg(db);
414 if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
415
416 return ret;
417 }
418
SQLI_cache_purge(struct db_cache * queue[],int index,struct insert_data * idata)419 void SQLI_cache_purge(struct db_cache *queue[], int index, struct insert_data *idata)
420 {
421 struct db_cache *LastElemCommitted = NULL;
422 time_t start;
423 int j, stop, go_to_pending, saved_index = index;
424 char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
425 char tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
426 struct primitives_ptrs prim_ptrs;
427 struct pkt_data dummy_data;
428 pid_t writer_pid = getpid();
429
430 if (!index) {
431 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
432 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: 0/0, ET: X) ***\n", config.name, config.type, writer_pid);
433 return;
434 }
435
436 memset(&prim_ptrs, 0, sizeof(prim_ptrs));
437 memset(&dummy_data, 0, sizeof(dummy_data));
438
439 for (j = 0, stop = 0; (!stop) && sql_preprocess_funcs[j]; j++)
440 stop = sql_preprocess_funcs[j](queue, &index, j);
441
442 if ((config.what_to_count & COUNT_CLASS) || (config.what_to_count_2 & COUNT_NDPI_CLASS))
443 sql_invalidate_shadow_entries(queue, &index);
444
445 idata->ten = index;
446
447 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
448 start = time(NULL);
449
450 /* re-using pending queries queue stuff from parent and saving clauses */
451 memcpy(sql_pending_queries_queue, queue, index*sizeof(struct db_cache *));
452 pqq_ptr = index;
453
454 strlcpy(orig_insert_clause, insert_clause, LONGSRVBUFLEN);
455 strlcpy(orig_update_clause, update_clause, LONGSRVBUFLEN);
456 strlcpy(orig_lock_clause, lock_clause, LONGSRVBUFLEN);
457
458 start:
459 memset(&idata->mv, 0, sizeof(struct multi_values));
460 memcpy(queue, sql_pending_queries_queue, pqq_ptr*sizeof(struct db_cache *));
461 memset(sql_pending_queries_queue, 0, pqq_ptr*sizeof(struct db_cache *));
462 index = pqq_ptr; pqq_ptr = 0;
463
464 /* We check for variable substitution in SQL table */
465 if (idata->dyn_table) {
466 time_t stamp = 0;
467
468 memset(tmpbuf, 0, LONGLONGSRVBUFLEN);
469 stamp = queue[0]->basetime;
470
471 prim_ptrs.data = &dummy_data;
472 primptrs_set_all_from_db_cache(&prim_ptrs, queue[0]);
473
474 strlcpy(idata->dyn_table_name, config.sql_table, SRVBUFLEN);
475 strlcpy(insert_clause, orig_insert_clause, LONGSRVBUFLEN);
476 strlcpy(update_clause, orig_update_clause, LONGSRVBUFLEN);
477 strlcpy(lock_clause, orig_lock_clause, LONGSRVBUFLEN);
478
479 handle_dynname_internal_strings_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
480 handle_dynname_internal_strings_same(update_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
481 handle_dynname_internal_strings_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
482 handle_dynname_internal_strings_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
483
484 pm_strftime_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
485 pm_strftime_same(update_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
486 pm_strftime_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
487 pm_strftime_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
488 }
489
490 if (config.sql_table_schema) sql_create_table(bed.p, &queue[0]->basetime, &prim_ptrs);
491
492 (*sqlfunc_cbr.lock)(bed.p);
493
494 for (idata->current_queue_elem = 0; idata->current_queue_elem < index; idata->current_queue_elem++) {
495 go_to_pending = FALSE;
496
497 if (idata->dyn_table && (!idata->dyn_table_time_only || !config.nfacctd_time_new || (config.sql_refresh_time != idata->timeslot))) {
498 time_t stamp = 0;
499
500 memset(tmpbuf, 0, LONGLONGSRVBUFLEN); // XXX: pedantic?
501 stamp = queue[idata->current_queue_elem]->basetime;
502 strlcpy(tmptable, config.sql_table, SRVBUFLEN);
503
504 prim_ptrs.data = &dummy_data;
505 primptrs_set_all_from_db_cache(&prim_ptrs, queue[idata->current_queue_elem]);
506 handle_dynname_internal_strings_same(tmptable, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
507 pm_strftime_same(tmptable, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
508
509 if (strncmp(idata->dyn_table_name, tmptable, SRVBUFLEN)) {
510 sql_pending_queries_queue[pqq_ptr] = queue[idata->current_queue_elem];
511
512 pqq_ptr++;
513 go_to_pending = TRUE;
514 }
515 }
516
517 if (!go_to_pending) {
518 if (queue[idata->current_queue_elem]->valid)
519 sql_query(&bed, queue[idata->current_queue_elem], idata);
520 if (queue[idata->current_queue_elem]->valid == SQL_CACHE_COMMITTED)
521 LastElemCommitted = queue[idata->current_queue_elem];
522 }
523 }
524
525 /* multi-value INSERT query: wrap-up */
526 if (idata->mv.buffer_elem_num) {
527 idata->mv.last_queue_elem = TRUE;
528 sql_query(&bed, LastElemCommitted, idata);
529 idata->qn--; /* increased by sql_query() one time too much */
530 }
531
532 /* rewinding stuff */
533 (*sqlfunc_cbr.unlock)(&bed);
534 if (b.fail) Log(LOG_ALERT, "ALERT ( %s/%s ): recovery for SQLite3 daemon failed.\n", config.name, config.type);
535
536 /* If we have pending queries then start again */
537 if (pqq_ptr) goto start;
538
539 idata->elap_time = time(NULL)-start;
540 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %zu) ***\n",
541 config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);
542
543 if (config.sql_trigger_exec) {
544 if (queue && queue[0]) idata->basetime = queue[0]->basetime;
545 idata->elap_time = time(NULL)-start;
546 SQL_SetENV_child(idata);
547 }
548 }
549
SQLI_evaluate_history(int primitive)550 int SQLI_evaluate_history(int primitive)
551 {
552 if (config.sql_history) {
553 if (primitive) {
554 strncat(insert_clause, ", ", SPACELEFT(insert_clause));
555 strncat(values[primitive].string, ", ", sizeof(values[primitive].string));
556 strncat(where[primitive].string, " AND ", sizeof(where[primitive].string));
557 }
558
559 if (!config.timestamps_since_epoch) {
560 if (!config.timestamps_utc)
561 strncat(where[primitive].string, "DATETIME(%u, 'unixepoch', 'localtime') = ", SPACELEFT(where[primitive].string));
562 else
563 strncat(where[primitive].string, "DATETIME(%u, 'unixepoch') = ", SPACELEFT(where[primitive].string));
564 }
565 else strncat(where[primitive].string, "%u = ", SPACELEFT(where[primitive].string));
566
567 strncat(where[primitive].string, "stamp_inserted", SPACELEFT(where[primitive].string));
568 strncat(insert_clause, "stamp_updated, stamp_inserted", SPACELEFT(insert_clause));
569
570 if (!config.timestamps_since_epoch) {
571 if (!config.timestamps_utc) {
572 strncat(values[primitive].string,
573 "DATETIME(%u, 'unixepoch', 'localtime'), DATETIME(%u, 'unixepoch', 'localtime')",
574 SPACELEFT(values[primitive].string));
575 }
576 else strncat(values[primitive].string, "DATETIME(%u, 'unixepoch'), DATETIME(%u, 'unixepoch')", SPACELEFT(values[primitive].string));
577 }
578 else {
579 strncat(values[primitive].string, "%u, %u", SPACELEFT(values[primitive].string));
580 }
581
582 where[primitive].type = values[primitive].type = TIMESTAMP;
583 values[primitive].handler = where[primitive].handler = count_timestamp_handler;
584 primitive++;
585 }
586
587 return primitive;
588 }
589
SQLI_compose_static_queries()590 int SQLI_compose_static_queries()
591 {
592 int primitives=0, set_primitives=0, set_event_primitives=0, have_flows=0;
593
594 if (config.what_to_count & COUNT_FLOWS || (config.sql_table_version >= 4 &&
595 config.sql_table_version < SQL_TABLE_VERSION_BGP &&
596 !config.sql_optimize_clauses)) {
597 config.what_to_count |= COUNT_FLOWS;
598 have_flows = TRUE;
599
600 if ((config.sql_table_version < 4 || config.sql_table_version >= SQL_TABLE_VERSION_BGP) && !config.sql_optimize_clauses) {
601 Log(LOG_ERR, "ERROR ( %s/%s ): The accounting of flows requires SQL table v4. Exiting.\n", config.name, config.type);
602 exit_gracefully(1);
603 }
604 }
605
606 /* "INSERT INTO ... VALUES ... " and "... WHERE ..." stuff */
607 strncpy(where[primitives].string, " WHERE ", sizeof(where[primitives].string));
608 snprintf(insert_clause, sizeof(insert_clause), "INSERT INTO %s (", config.sql_table);
609 strncpy(values[primitives].string, " VALUES (", sizeof(values[primitives].string));
610 primitives = SQLI_evaluate_history(primitives);
611 primitives = sql_evaluate_primitives(primitives);
612
613 strncpy(insert_counters_clause, ", packets, bytes", SPACELEFT(insert_counters_clause));
614 if (have_flows) strncat(insert_counters_clause, ", flows", SPACELEFT(insert_counters_clause));
615 strncat(insert_counters_clause, ")", SPACELEFT(insert_counters_clause));
616 strncpy(insert_nocounters_clause, ")", SPACELEFT(insert_nocounters_clause));
617
618 /* "LOCK ..." stuff */
619 if (config.sql_locking_style) Log(LOG_WARNING, "WARN ( %s/%s ): sql_locking_style is not supported. Ignored.\n", config.name, config.type);
620 snprintf(lock_clause, sizeof(lock_clause), "BEGIN");
621 strncpy(unlock_clause, "COMMIT", sizeof(unlock_clause));
622
623 /* "UPDATE ... SET ..." stuff */
624 snprintf(update_clause, sizeof(update_clause), "UPDATE %s ", config.sql_table);
625
626 set_primitives = sql_compose_static_set(have_flows);
627 set_event_primitives = sql_compose_static_set_event();
628
629 if (config.sql_history) {
630 if (!config.timestamps_since_epoch) {
631 strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
632
633 if (!config.timestamps_utc) {
634 strncat(set[set_primitives].string,
635 "stamp_updated=DATETIME('now', 'localtime')",
636 SPACELEFT(set[set_primitives].string));
637 }
638 else strncat(set[set_primitives].string, "stamp_updated=DATETIME('now')", SPACELEFT(set[set_primitives].string));
639
640 set[set_primitives].type = TIMESTAMP;
641 set[set_primitives].handler = count_noop_setclause_handler;
642 set_primitives++;
643
644 if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
645 else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
646
647 if (!config.timestamps_utc) {
648 strncat(set_event[set_event_primitives].string,
649 "stamp_updated=DATETIME('now', 'localtime')",
650 SPACELEFT(set_event[set_event_primitives].string));
651 }
652 else strncat(set_event[set_event_primitives].string, "stamp_updated=DATETIME('now')", SPACELEFT(set_event[set_event_primitives].string));
653
654 set_event[set_event_primitives].type = TIMESTAMP;
655 set_event[set_event_primitives].handler = count_noop_setclause_event_handler;
656 set_event_primitives++;
657 }
658 else {
659 strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
660 strncat(set[set_primitives].string, "stamp_updated=STRFTIME('%%s', 'now')", SPACELEFT(set[set_primitives].string));
661 set[set_primitives].type = TIMESTAMP;
662 set[set_primitives].handler = count_noop_setclause_handler;
663 set_primitives++;
664
665 if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
666 else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
667 strncat(set_event[set_event_primitives].string, "stamp_updated=STRFTIME('%%s', 'now')", SPACELEFT(set_event[set_event_primitives].string));
668 set_event[set_event_primitives].type = TIMESTAMP;
669 set_event[set_event_primitives].handler = count_noop_setclause_event_handler;
670 set_event_primitives++;
671 }
672 }
673
674 return primitives;
675 }
676
SQLI_Lock(struct DBdesc * db)677 void SQLI_Lock(struct DBdesc *db)
678 {
679 if (!db->fail) {
680 if (sqlite3_exec(db->desc, lock_clause, NULL, NULL, NULL)) {
681 SQLI_get_errmsg(db);
682 sql_db_errmsg(db);
683 sql_db_fail(db);
684 }
685 }
686 }
687
SQLI_Unlock(struct BE_descs * bed)688 void SQLI_Unlock(struct BE_descs *bed)
689 {
690 if (bed->p->connected) sqlite3_exec(bed->p->desc, unlock_clause, NULL, NULL, NULL);
691 if (bed->b->connected) sqlite3_exec(bed->b->desc, unlock_clause, NULL, NULL, NULL);
692 }
693
SQLI_DB_Connect(struct DBdesc * db,char * host)694 void SQLI_DB_Connect(struct DBdesc *db, char *host)
695 {
696 if (!db->fail) {
697 if (sqlite3_open(db->filename, (sqlite3 **)&db->desc)) {
698 sql_db_fail(db);
699 SQLI_get_errmsg(db);
700 sql_db_errmsg(db);
701 }
702 else sql_db_ok(db);
703 }
704 }
705
SQLI_DB_Close(struct BE_descs * bed)706 void SQLI_DB_Close(struct BE_descs *bed)
707 {
708 if (bed->p->connected) sqlite3_close(bed->p->desc);
709 if (bed->b->connected) sqlite3_close(bed->b->desc);
710 }
711
SQLI_create_dyn_table(struct DBdesc * db,char * buf)712 void SQLI_create_dyn_table(struct DBdesc *db, char *buf)
713 {
714 if (!db->fail) {
715 if (sqlite3_exec(db->desc, buf, NULL, NULL, NULL)) {
716 Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, buf);
717 SQLI_get_errmsg(db);
718 sql_db_warnmsg(db);
719 }
720 }
721 }
722
SQLI_get_errmsg(struct DBdesc * db)723 void SQLI_get_errmsg(struct DBdesc *db)
724 {
725 db->errmsg = (char *) sqlite3_errmsg(db->desc);
726 }
727
SQLI_create_backend(struct DBdesc * db)728 void SQLI_create_backend(struct DBdesc *db)
729 {
730 if (db->type == BE_TYPE_PRIMARY) db->filename = config.sql_db;
731 if (db->type == BE_TYPE_BACKUP) db->filename = config.sql_backup_host;
732 }
733
SQLI_set_callbacks(struct sqlfunc_cb_registry * cbr)734 void SQLI_set_callbacks(struct sqlfunc_cb_registry *cbr)
735 {
736 memset(cbr, 0, sizeof(struct sqlfunc_cb_registry));
737
738 cbr->connect = SQLI_DB_Connect;
739 cbr->close = SQLI_DB_Close;
740 cbr->lock = SQLI_Lock;
741 cbr->unlock = SQLI_Unlock;
742 cbr->op = SQLI_cache_dbop;
743 cbr->create_table = SQLI_create_dyn_table;
744 cbr->purge = SQLI_cache_purge;
745 cbr->create_backend = SQLI_create_backend;
746 }
747
SQLI_init_default_values(struct insert_data * idata)748 void SQLI_init_default_values(struct insert_data *idata)
749 {
750 /* Linking database parameters */
751 if (!config.sql_db) config.sql_db = sqlite3_db;
752 if (!config.sql_table) {
753 if (config.sql_table_version == (SQL_TABLE_VERSION_BGP+1)) config.sql_table = sqlite3_table_bgp;
754 else if (config.sql_table_version == 8) config.sql_table = sqlite3_table_v8;
755 else if (config.sql_table_version == 7) config.sql_table = sqlite3_table_v7;
756 else if (config.sql_table_version == 6) config.sql_table = sqlite3_table_v6;
757 else if (config.sql_table_version == 5) config.sql_table = sqlite3_table_v5;
758 else if (config.sql_table_version == 4) config.sql_table = sqlite3_table_v4;
759 else if (config.sql_table_version == 3) config.sql_table = sqlite3_table_v3;
760 else if (config.sql_table_version == 2) config.sql_table = sqlite3_table_v2;
761 else config.sql_table = sqlite3_table;
762 }
763 if (strchr(config.sql_table, '%') || strchr(config.sql_table, '$')) {
764 idata->dyn_table = TRUE;
765 if (!strchr(config.sql_table, '$')) idata->dyn_table_time_only = TRUE;
766 }
767 glob_dyn_table = idata->dyn_table;
768 glob_dyn_table_time_only = idata->dyn_table_time_only;
769
770 if (config.sql_backup_host) idata->recover = TRUE;
771
772 if (config.sql_multi_values) {
773 multi_values_buffer = malloc(config.sql_multi_values);
774 if (!multi_values_buffer) {
775 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to get enough room (%d) for multi value queries.\n",
776 config.name, config.type, config.sql_multi_values);
777 config.sql_multi_values = FALSE;
778 }
779 memset(multi_values_buffer, 0, config.sql_multi_values);
780 }
781
782 if (config.sql_locking_style) idata->locks = sql_select_locking_style(config.sql_locking_style);
783 }
784
/*
 * Prints the version string reported by sqlite3_libversion() to standard
 * output, prefixed with "sqlite3 ".
 */
void SQLI_sqlite3_get_version()
{
  fprintf(stdout, "sqlite3 %s\n", sqlite3_libversion());
}
789