1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_hooks.h"
26 #include "sql_common.h"
27 #include "sql_common_m.h"
28 #include "pgsql_plugin.h"
29
30 int typed = TRUE;
31
32 char pgsql_user[] = "pmacct";
33 char pgsql_pwd[] = "arealsmartpwd";
34 char pgsql_db[] = "pmacct";
35 char pgsql_table[] = "acct";
36 char pgsql_table_v2[] = "acct_v2";
37 char pgsql_table_v3[] = "acct_v3";
38 char pgsql_table_v4[] = "acct_v4";
39 char pgsql_table_v5[] = "acct_v5";
40 char pgsql_table_v6[] = "acct_v6";
41 char pgsql_table_v7[] = "acct_v7";
42 char pgsql_table_v8[] = "acct_v8";
43 char pgsql_table_bgp[] = "acct_bgp";
44 char pgsql_table_uni[] = "acct_uni";
45 char pgsql_table_uni_v2[] = "acct_uni_v2";
46 char pgsql_table_uni_v3[] = "acct_uni_v3";
47 char pgsql_table_uni_v4[] = "acct_uni_v4";
48 char pgsql_table_uni_v5[] = "acct_uni_v5";
49 char pgsql_table_as[] = "acct_as";
50 char pgsql_table_as_v2[] = "acct_as_v2";
51 char pgsql_table_as_v3[] = "acct_as_v3";
52 char pgsql_table_as_v4[] = "acct_as_v4";
53 char pgsql_table_as_v5[] = "acct_as_v5";
54 char typed_str[] = "typed";
55 char unified_str[] = "unified";
56
57 /* Functions */
pgsql_plugin(int pipe_fd,struct configuration * cfgptr,void * ptr)58 void pgsql_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
59 {
60 struct pkt_data *data;
61 struct ports_table pt;
62 struct pollfd pfd;
63 struct insert_data idata;
64 time_t refresh_deadline;
65 int refresh_timeout;
66 int ret, num, recv_budget, poll_bypass;
67 struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
68 struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
69 int datasize = ((struct channels_list_entry *)ptr)->datasize;
70 u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
71 pid_t core_pid = ((struct channels_list_entry *)ptr)->core_pid;
72 struct networks_file_data nfd;
73 unsigned char *dataptr;
74
75 unsigned char *rgptr;
76 int pollagain = TRUE;
77 u_int32_t seq = 1, rg_err_count = 0;
78
79 struct extra_primitives extras;
80 struct primitives_ptrs prim_ptrs;
81
82 #ifdef WITH_ZMQ
83 struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
84 #else
85 void *zmq_host = NULL;
86 #endif
87
88 #ifdef WITH_REDIS
89 struct p_redis_host redis_host;
90 #endif
91
92 memcpy(&config, cfgptr, sizeof(struct configuration));
93 memcpy(&extras, &((struct channels_list_entry *)ptr)->extras, sizeof(struct extra_primitives));
94 recollect_pipe_memory(ptr);
95 pm_setproctitle("%s [%s]", "PostgreSQL Plugin", config.name);
96
97 memset(&idata, 0, sizeof(idata));
98 if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
99 if (config.logfile) {
100 fclose(config.logfile_fd);
101 config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
102 }
103
104 sql_set_signals();
105 sql_init_default_values(&extras);
106 PG_init_default_values(&idata);
107 PG_set_callbacks(&sqlfunc_cbr);
108 sql_set_insert_func();
109
110 /* some LOCAL initialization AFTER setting some default values */
111 reload_map = FALSE;
112 idata.now = time(NULL);
113 refresh_deadline = idata.now;
114 idata.cfg = &config;
115
116 sql_init_maps(&extras, &prim_ptrs, &nt, &nc, &pt);
117 sql_init_global_buffers();
118 sql_init_historical_acct(idata.now, &idata);
119 sql_init_triggers(idata.now, &idata);
120 sql_init_refresh_deadline(&refresh_deadline);
121
122 if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
123 else setnonblocking(pipe_fd);
124
125 /* building up static SQL clauses */
126 idata.num_primitives = PG_compose_static_queries();
127 glob_num_primitives = idata.num_primitives;
128
129 /* setting up environment variables */
130 SQL_SetENV();
131
132 sql_link_backend_descriptors(&bed, &p, &b);
133
134 #ifdef WITH_REDIS
135 if (config.redis_host) {
136 char log_id[SHORTBUFLEN];
137
138 snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
139 p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
140 }
141 #endif
142
143 /* plugin main loop */
144 for(;;) {
145 poll_again:
146 status->wakeup = TRUE;
147 poll_bypass = FALSE;
148 calc_refresh_timeout(refresh_deadline, idata.now, &refresh_timeout);
149
150 pfd.fd = pipe_fd;
151 pfd.events = POLLIN;
152 ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);
153
154 if (ret <= 0) {
155 if (getppid() != core_pid) {
156 Log(LOG_ERR, "ERROR ( %s/%s ): Core process *seems* gone. Exiting.\n", config.name, config.type);
157 exit_gracefully(1);
158 }
159
160 if (ret < 0) goto poll_again;
161 }
162
163 poll_ops:
164 idata.now = time(NULL);
165 now = idata.now;
166
167 if (config.sql_history) {
168 while (idata.now > (idata.basetime + idata.timeslot)) {
169 time_t saved_basetime = idata.basetime;
170
171 idata.basetime += idata.timeslot;
172 if (config.sql_history == COUNT_MONTHLY)
173 idata.timeslot = calc_monthly_timeslot(idata.basetime, config.sql_history_howmany, ADD);
174 glob_basetime = idata.basetime;
175 idata.new_basetime = saved_basetime;
176 glob_new_basetime = saved_basetime;
177 }
178 }
179
180 if (idata.now > refresh_deadline) {
181 if (qq_ptr) sql_cache_flush(sql_queries_queue, qq_ptr, &idata, FALSE);
182 sql_cache_handle_flush_event(&idata, &refresh_deadline, &pt);
183 }
184 else {
185 if (config.sql_trigger_exec) {
186 while (idata.now > idata.triggertime && idata.t_timeslot > 0) {
187 sql_trigger_exec(config.sql_trigger_exec);
188 idata.triggertime += idata.t_timeslot;
189 if (config.sql_trigger_time == COUNT_MONTHLY)
190 idata.t_timeslot = calc_monthly_timeslot(idata.triggertime, config.sql_trigger_time_howmany, ADD);
191 }
192 }
193 }
194
195 recv_budget = 0;
196 if (poll_bypass) {
197 poll_bypass = FALSE;
198 goto read_data;
199 }
200
201 switch (ret) {
202 case 0: /* poll(): timeout */
203 break;
204 default: /* poll(): received data */
205 read_data:
206 if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
207 poll_bypass = TRUE;
208 goto poll_ops;
209 }
210
211 if (config.pipe_homegrown) {
212 if (!pollagain) {
213 seq++;
214 seq %= MAX_SEQNUM;
215 if (seq == 0) rg_err_count = FALSE;
216 idata.now = time(NULL);
217 now = idata.now;
218 }
219 else {
220 if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
221 exit_gracefully(1); /* we exit silently; something happened at the write end */
222 }
223
224 if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;
225
226 if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
227 if (!pollagain) {
228 pollagain = TRUE;
229 goto poll_again;
230 }
231 else {
232 rg_err_count++;
233 if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
234 Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
235 config.name, config.type, config.buffer_size, config.pipe_size);
236 Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
237 config.name, config.type);
238 }
239
240 rg->ptr = (rg->base + status->last_buf_off);
241 seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
242 }
243 }
244
245 pollagain = FALSE;
246 memcpy(pipebuf, rg->ptr, bufsz);
247 rg->ptr += bufsz;
248 }
249 #ifdef WITH_ZMQ
250 else if (config.pipe_zmq) {
251 ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
252 if (ret > 0) {
253 if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
254 Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
255 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
256 }
257
258 seq = ((struct ch_buf_hdr *)pipebuf)->seq;
259 }
260 else goto poll_again;
261 }
262 #endif
263
264 data = (struct pkt_data *) (pipebuf+sizeof(struct ch_buf_hdr));
265
266 if (config.debug_internal_msg)
267 Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
268 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
269 ((struct ch_buf_hdr *)pipebuf)->num);
270
271 while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
272 for (num = 0; primptrs_funcs[num]; num++)
273 (*primptrs_funcs[num])((u_char *)data, &extras, &prim_ptrs);
274
275 for (num = 0; net_funcs[num]; num++)
276 (*net_funcs[num])(&nt, &nc, &data->primitives, prim_ptrs.pbgp, &nfd);
277
278 if (config.ports_file) {
279 if (!pt.table[data->primitives.src_port]) data->primitives.src_port = 0;
280 if (!pt.table[data->primitives.dst_port]) data->primitives.dst_port = 0;
281 }
282
283 prim_ptrs.data = data;
284 (*insert_func)(&prim_ptrs, &idata);
285
286 ((struct ch_buf_hdr *)pipebuf)->num--;
287 if (((struct ch_buf_hdr *)pipebuf)->num) {
288 dataptr = (unsigned char *) data;
289 if (!prim_ptrs.vlen_next_off) dataptr += datasize;
290 else dataptr += prim_ptrs.vlen_next_off;
291 data = (struct pkt_data *) dataptr;
292 }
293 }
294
295 goto read_data;
296 }
297 }
298 }
299
PG_cache_dbop_copy(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)300 int PG_cache_dbop_copy(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
301 {
302 char *ptr_values, *ptr_where;
303 char default_delim[] = ",", delim_buf[SRVBUFLEN];
304 int num=0, have_flows=0;
305
306 if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
307
308 if (!config.sql_delimiter)
309 snprintf(delim_buf, SRVBUFLEN, "%s", default_delim);
310 else
311 snprintf(delim_buf, SRVBUFLEN, "%s", config.sql_delimiter);
312
313 /* constructing SQL query */
314 ptr_where = where_clause;
315 ptr_values = values_clause;
316 memset(where_clause, 0, sizeof(where_clause));
317 memset(values_clause, 0, sizeof(values_clause));
318
319 memcpy(&values, ©_values, sizeof(values));
320 while (num < idata->num_primitives) {
321 (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
322 num++;
323 }
324
325 if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), "%s%" PRIu64 "%s%" PRIu64 "%s%" PRIu64 "\n", delim_buf, cache_elem->packet_counter,
326 delim_buf, cache_elem->bytes_counter,
327 delim_buf, cache_elem->flows_counter);
328 else snprintf(ptr_values, SPACELEFT(values_clause), "%s%" PRIu64 "%s%" PRIu64 "\n", delim_buf, cache_elem->packet_counter,
329 delim_buf, cache_elem->bytes_counter);
330
331 strncpy(sql_data, values_clause, SPACELEFT(sql_data));
332
333 if (PQputCopyData(db->desc, sql_data, strlen(sql_data)) < 0) { // avoid strlen()
334 db->errmsg = PQerrorMessage(db->desc);
335 Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
336 if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n", config.name, config.type, db->errmsg);
337 sql_db_fail(db);
338
339 return TRUE;
340 }
341 idata->iqn++;
342 idata->een++;
343
344 Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n", config.name, config.type, sql_data);
345
346 return FALSE;
347 }
348
PG_cache_dbop(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)349 int PG_cache_dbop(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
350 {
351 PGresult *ret = NULL;
352 char *ptr_values, *ptr_where, *ptr_set;
353 int num=0, num_set=0, have_flows=0;
354
355 if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
356
357 /* constructing SQL query */
358 ptr_where = where_clause;
359 ptr_values = values_clause;
360 ptr_set = set_clause;
361 memset(where_clause, 0, sizeof(where_clause));
362 memset(values_clause, 0, sizeof(values_clause));
363 memset(set_clause, 0, sizeof(set_clause));
364 memset(insert_full_clause, 0, sizeof(insert_full_clause));
365
366 for (num = 0; num < idata->num_primitives; num++)
367 (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
368
369 if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
370 for (num_set = 0; set_event[num_set].type; num_set++)
371 (*set_event[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
372 }
373 else {
374 for (num_set = 0; set[num_set].type; num_set++)
375 (*set[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
376 }
377
378 /* sending UPDATE query a) if not switched off and
379 b) if we actually have something to update */
380 if (!config.sql_dont_try_update && num_set) {
381 strncpy(sql_data, update_clause, SPACELEFT(sql_data));
382 strncat(sql_data, set_clause, SPACELEFT(sql_data));
383 strncat(sql_data, where_clause, SPACELEFT(sql_data));
384
385 ret = PQexec(db->desc, sql_data);
386 if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
387 db->errmsg = PQresultErrorMessage(ret);
388 PQclear(ret);
389 Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
390 if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
391 sql_db_fail(db);
392
393 return TRUE;
394 }
395 PQclear(ret);
396 }
397
398 if (config.sql_dont_try_update || !num_set || (!PG_affected_rows(ret))) {
399 /* UPDATE failed, trying with an INSERT query */
400 if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
401 strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
402 strncat(insert_full_clause, insert_nocounters_clause, SPACELEFT(insert_full_clause));
403 strncat(ptr_values, ")", SPACELEFT(values_clause));
404 }
405 else {
406 strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
407 strncat(insert_full_clause, insert_counters_clause, SPACELEFT(insert_full_clause));
408 if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter, cache_elem->flows_counter);
409 else snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter);
410 }
411 strncpy(sql_data, insert_full_clause, sizeof(sql_data));
412 strncat(sql_data, values_clause, SPACELEFT(sql_data));
413
414 ret = PQexec(db->desc, sql_data);
415 if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
416 db->errmsg = PQresultErrorMessage(ret);
417 PQclear(ret);
418 Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
419 if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
420 sql_db_fail(db);
421
422 return TRUE;
423 }
424 PQclear(ret);
425 idata->iqn++;
426 }
427 else idata->uqn++;
428 idata->een++;
429
430 Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n\n", config.name, config.type, sql_data);
431
432 return FALSE;
433 }
434
PG_cache_purge(struct db_cache * queue[],int index,struct insert_data * idata)435 void PG_cache_purge(struct db_cache *queue[], int index, struct insert_data *idata)
436 {
437 PGresult *ret;
438 struct db_cache **reprocess_queries_queue, **bulk_reprocess_queries_queue;
439 char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
440 char orig_copy_clause[LONGSRVBUFLEN], tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
441 time_t start;
442 int j, r, reprocess = 0, stop, go_to_pending, reprocess_idx, bulk_reprocess_idx, saved_index = index;
443 struct primitives_ptrs prim_ptrs;
444 struct pkt_data dummy_data;
445 pid_t writer_pid = getpid();
446
447 if (!index) {
448 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
449 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: 0/0, ET: X) ***\n", config.name, config.type, writer_pid);
450 return;
451 }
452
453 memset(&prim_ptrs, 0, sizeof(prim_ptrs));
454 memset(&dummy_data, 0, sizeof(dummy_data));
455
456 reprocess_queries_queue = (struct db_cache **) malloc(qq_size*sizeof(struct db_cache *));
457 bulk_reprocess_queries_queue = (struct db_cache **) malloc(qq_size*sizeof(struct db_cache *));
458 if (!reprocess_queries_queue || !bulk_reprocess_queries_queue) {
459 Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (reprocess_queries_queue). Exiting ..\n", config.name, config.type);
460 exit_gracefully(1);
461 }
462
463 for (j = 0, stop = 0; (!stop) && sql_preprocess_funcs[j]; j++)
464 stop = sql_preprocess_funcs[j](queue, &index, j);
465
466 if ((config.what_to_count & COUNT_CLASS) || (config.what_to_count_2 & COUNT_NDPI_CLASS))
467 sql_invalidate_shadow_entries(queue, &index);
468
469 idata->ten = index;
470
471 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
472 start = time(NULL);
473
474 /* re-using pending queries queue stuff from parent and saving clauses */
475 memcpy(sql_pending_queries_queue, queue, index*sizeof(struct db_cache *));
476 pqq_ptr = index;
477
478 strlcpy(orig_copy_clause, copy_clause, LONGSRVBUFLEN);
479 strlcpy(orig_insert_clause, insert_clause, LONGSRVBUFLEN);
480 strlcpy(orig_update_clause, update_clause, LONGSRVBUFLEN);
481 strlcpy(orig_lock_clause, lock_clause, LONGSRVBUFLEN);
482
483 start:
484 memcpy(queue, sql_pending_queries_queue, pqq_ptr*sizeof(struct db_cache *));
485 memset(sql_pending_queries_queue, 0, pqq_ptr*sizeof(struct db_cache *));
486 index = pqq_ptr; pqq_ptr = 0;
487
488 /* We check for variable substitution in SQL table */
489 if (idata->dyn_table) {
490 time_t stamp = 0;
491
492 memset(tmpbuf, 0, LONGLONGSRVBUFLEN);
493 stamp = queue[0]->basetime;
494
495 prim_ptrs.data = &dummy_data;
496 primptrs_set_all_from_db_cache(&prim_ptrs, queue[0]);
497
498 strlcpy(idata->dyn_table_name, config.sql_table, SRVBUFLEN);
499 strlcpy(insert_clause, orig_insert_clause, LONGSRVBUFLEN);
500 strlcpy(update_clause, orig_update_clause, LONGSRVBUFLEN);
501 strlcpy(lock_clause, orig_lock_clause, LONGSRVBUFLEN);
502
503 handle_dynname_internal_strings_same(copy_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
504 handle_dynname_internal_strings_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
505 handle_dynname_internal_strings_same(update_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
506 handle_dynname_internal_strings_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
507 handle_dynname_internal_strings_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
508
509 pm_strftime_same(copy_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
510 pm_strftime_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
511 pm_strftime_same(update_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
512 pm_strftime_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
513 pm_strftime_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
514 }
515
516 if (config.sql_table_schema) sql_create_table(bed.p, &queue[0]->basetime, &prim_ptrs);
517
518 /* beginning DB transaction */
519 (*sqlfunc_cbr.lock)(bed.p);
520
521 /* for each element of the queue to be processed we execute sql_query(); the function
522 returns a non-zero value if DB has failed; then we use reprocess_queries_queue and
523 bulk_reprocess_queries_queue to handle reprocessing of specific elements or bulk
524 queue (of elements not being held in a pending_queries_queue) due to final COMMIT
525 failure */
526
527 memset(reprocess_queries_queue, 0, qq_size*sizeof(struct db_cache *));
528 memset(bulk_reprocess_queries_queue, 0, qq_size*sizeof(struct db_cache *));
529 reprocess_idx = 0; bulk_reprocess_idx = 0;
530
531 for (j = 0; j < index; j++) {
532 go_to_pending = FALSE;
533
534 if (idata->dyn_table && (!idata->dyn_table_time_only || !config.nfacctd_time_new || (config.sql_refresh_time != idata->timeslot))) {
535 time_t stamp = 0;
536
537 memset(tmpbuf, 0, LONGLONGSRVBUFLEN); // XXX: pedantic?
538 stamp = queue[idata->current_queue_elem]->basetime;
539 strlcpy(tmptable, config.sql_table, SRVBUFLEN);
540
541 prim_ptrs.data = &dummy_data;
542 primptrs_set_all_from_db_cache(&prim_ptrs, queue[idata->current_queue_elem]);
543 handle_dynname_internal_strings_same(tmptable, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
544 pm_strftime_same(tmptable, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
545
546 if (strncmp(idata->dyn_table_name, tmptable, SRVBUFLEN)) {
547 sql_pending_queries_queue[pqq_ptr] = queue[idata->current_queue_elem];
548
549 pqq_ptr++;
550 go_to_pending = TRUE;
551 }
552 }
553
554 if (!go_to_pending) {
555 if (queue[j]->valid == SQL_CACHE_COMMITTED) {
556 r = sql_query(&bed, queue[j], idata);
557
558 /* note down all elements in case of a reprocess due to COMMIT failure */
559 bulk_reprocess_queries_queue[bulk_reprocess_idx] = queue[j];
560 bulk_reprocess_idx++;
561 }
562 else r = FALSE; /* not valid elements are marked as not to be reprocessed */
563 if (r) {
564 reprocess_queries_queue[reprocess_idx] = queue[j];
565 reprocess_idx++;
566
567 if (!reprocess) sql_db_fail(&p);
568 reprocess = REPROCESS_SPECIFIC;
569 }
570 }
571 }
572
573 /* Finalizing DB transaction */
574 if (!p.fail) {
575 if (config.sql_use_copy) {
576 if (PQputCopyEnd(p.desc, NULL) < 0) Log(LOG_ERR, "ERROR ( %s/%s ): COPY failed!\n\n", config.name, config.type);
577 }
578
579 ret = PQexec(p.desc, "COMMIT");
580 if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
581 if (!reprocess) sql_db_fail(&p);
582 reprocess = REPROCESS_BULK;
583 }
584 PQclear(ret);
585 }
586
587 /* don't reprocess free (SQL_CACHE_FREE) and already recovered (SQL_CACHE_ERROR) elements */
588 if (p.fail) {
589 if (reprocess == REPROCESS_SPECIFIC) {
590 for (j = 0; j < reprocess_idx; j++) {
591 if (reprocess_queries_queue[j]->valid == SQL_CACHE_COMMITTED) sql_query(&bed, reprocess_queries_queue[j], idata);
592 }
593 }
594 else if (reprocess == REPROCESS_BULK) {
595 for (j = 0; j < bulk_reprocess_idx; j++) {
596 if (bulk_reprocess_queries_queue[j]->valid == SQL_CACHE_COMMITTED) sql_query(&bed, bulk_reprocess_queries_queue[j], idata);
597 }
598 }
599 }
600
601 if (b.connected) {
602 if (config.sql_use_copy) {
603 if (PQputCopyEnd(b.desc, NULL) < 0) Log(LOG_ERR, "ERROR ( %s/%s ): COPY failed!\n\n", config.name, config.type);
604 }
605 ret = PQexec(b.desc, "COMMIT");
606 if (PQresultStatus(ret) != PGRES_COMMAND_OK) sql_db_fail(&b);
607 PQclear(ret);
608 }
609
610 /* If we have pending queries then start again */
611 if (pqq_ptr) goto start;
612
613 idata->elap_time = time(NULL)-start;
614 Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %zu) ***\n",
615 config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);
616
617 if (config.sql_trigger_exec) {
618 if (queue && queue[0]) idata->basetime = queue[0]->basetime;
619 idata->elap_time = time(NULL)-start;
620 SQL_SetENV_child(idata);
621 }
622 }
623
PG_evaluate_history(int primitive)624 int PG_evaluate_history(int primitive)
625 {
626 if (config.sql_history) {
627 if (primitive) {
628 strncat(copy_clause, ", ", SPACELEFT(copy_clause));
629 strncat(insert_clause, ", ", SPACELEFT(insert_clause));
630 strncat(values[primitive].string, ", ", sizeof(values[primitive].string));
631 strncat(where[primitive].string, " AND ", sizeof(where[primitive].string));
632 }
633 if (!config.timestamps_since_epoch)
634 strncat(where[primitive].string, "to_timestamp(%u)::Timestamp without time zone = ", SPACELEFT(where[primitive].string));
635 else
636 strncat(where[primitive].string, "%u = ", SPACELEFT(where[primitive].string));
637 strncat(where[primitive].string, "stamp_inserted", SPACELEFT(where[primitive].string));
638
639 strncat(copy_clause, "stamp_updated, stamp_inserted", SPACELEFT(copy_clause));
640 strncat(insert_clause, "stamp_updated, stamp_inserted", SPACELEFT(insert_clause));
641 if (config.sql_use_copy) {
642 char default_delim[] = ",", delim_buf[SRVBUFLEN];
643
644 if (!config.sql_delimiter || !config.sql_use_copy)
645 snprintf(delim_buf, SRVBUFLEN, "%s ", default_delim);
646 else
647 snprintf(delim_buf, SRVBUFLEN, "%s ", config.sql_delimiter);
648
649 if (!config.timestamps_since_epoch) {
650 strncat(values[primitive].string, "%s", SPACELEFT(values[primitive].string));
651 strncat(values[primitive].string, delim_buf, SPACELEFT(values[primitive].string));
652 strncat(values[primitive].string, "%s", SPACELEFT(values[primitive].string));
653 values[primitive].handler = where[primitive].handler = count_copy_timestamp_handler;
654 }
655 else {
656 strncat(values[primitive].string, "%u", SPACELEFT(values[primitive].string));
657 strncat(values[primitive].string, delim_buf, SPACELEFT(values[primitive].string));
658 strncat(values[primitive].string, "%u", SPACELEFT(values[primitive].string));
659 values[primitive].handler = where[primitive].handler = count_timestamp_handler;
660 }
661 }
662 else {
663 if (!config.timestamps_since_epoch)
664 strncat(values[primitive].string, "to_timestamp(%u), to_timestamp(%u)", SPACELEFT(values[primitive].string));
665 else
666 strncat(values[primitive].string, "%u, %u", SPACELEFT(values[primitive].string));
667 values[primitive].handler = where[primitive].handler = count_timestamp_handler;
668 }
669 where[primitive].type = values[primitive].type = TIMESTAMP;
670
671 primitive++;
672 }
673
674 return primitive;
675 }
676
PG_compose_static_queries()677 int PG_compose_static_queries()
678 {
679 int primitives=0, set_primitives=0, set_event_primitives=0, have_flows=0, lock=0;
680 char default_delim[] = ",", delim_buf[SRVBUFLEN];
681
682 if (config.what_to_count & COUNT_FLOWS || (config.sql_table_version >= 4 &&
683 config.sql_table_version < SQL_TABLE_VERSION_BGP &&
684 !config.sql_optimize_clauses)) {
685 config.what_to_count |= COUNT_FLOWS;
686 have_flows = TRUE;
687
688 if ((config.sql_table_version < 4 || config.sql_table_version >= SQL_TABLE_VERSION_BGP) && !config.sql_optimize_clauses) {
689 Log(LOG_ERR, "ERROR ( %s/%s ): The accounting of flows requires SQL table v4. Exiting.\n", config.name, config.type);
690 exit_gracefully(1);
691 }
692 }
693
694 /* "INSERT INTO ... VALUES ... ", "COPY ... ", "... WHERE ..." stuff */
695 strncpy(where[primitives].string, " WHERE ", sizeof(where[primitives].string));
696 snprintf(copy_clause, sizeof(copy_clause), "COPY %s (", config.sql_table);
697 snprintf(insert_clause, sizeof(insert_clause), "INSERT INTO %s (", config.sql_table);
698 strncpy(values[primitives].string, " VALUES (", sizeof(values[primitives].string));
699 primitives = PG_evaluate_history(primitives);
700 primitives = sql_evaluate_primitives(primitives);
701
702 strncat(copy_clause, ", packets, bytes", SPACELEFT(copy_clause));
703 if (have_flows) strncat(copy_clause, ", flows", SPACELEFT(copy_clause));
704
705 if (!config.sql_delimiter || !config.sql_use_copy)
706 snprintf(delim_buf, SRVBUFLEN, ") FROM STDIN DELIMITER \'%s\'", default_delim);
707 else
708 snprintf(delim_buf, SRVBUFLEN, ") FROM STDIN DELIMITER \'%s\'", config.sql_delimiter);
709 strncat(copy_clause, delim_buf, SPACELEFT(copy_clause));
710
711 strncpy(insert_counters_clause, ", packets, bytes", SPACELEFT(insert_counters_clause));
712 if (have_flows) strncat(insert_counters_clause, ", flows", SPACELEFT(insert_counters_clause));
713 strncat(insert_counters_clause, ")", SPACELEFT(insert_counters_clause));
714 strncpy(insert_nocounters_clause, ")", SPACELEFT(insert_nocounters_clause));
715
716 /* "LOCK ..." stuff */
717
718 if (config.sql_dont_try_update) snprintf(lock_clause, sizeof(lock_clause), "BEGIN;");
719 else {
720 if (config.sql_locking_style) lock = sql_select_locking_style(config.sql_locking_style);
721 switch (lock) {
722 case PM_LOCK_NONE:
723 snprintf(lock_clause, sizeof(lock_clause), "BEGIN;");
724 break;
725 case PM_LOCK_ROW_EXCLUSIVE:
726 snprintf(lock_clause, sizeof(lock_clause), "BEGIN; LOCK %s IN ROW EXCLUSIVE MODE;", config.sql_table);
727 break;
728 case PM_LOCK_EXCLUSIVE:
729 default:
730 snprintf(lock_clause, sizeof(lock_clause), "BEGIN; LOCK %s IN EXCLUSIVE MODE;", config.sql_table);
731 break;
732 }
733 }
734
735 /* "UPDATE ... SET ..." stuff */
736 snprintf(update_clause, sizeof(update_clause), "UPDATE %s ", config.sql_table);
737
738 set_primitives = sql_compose_static_set(have_flows);
739 set_event_primitives = sql_compose_static_set_event();
740
741 if (config.sql_history) {
742 if (!config.timestamps_since_epoch) {
743 strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
744 strncat(set[set_primitives].string, "stamp_updated=CURRENT_TIMESTAMP(0)", SPACELEFT(set[set_primitives].string));
745 set[set_primitives].type = TIMESTAMP;
746 set[set_primitives].handler = count_noop_setclause_handler;
747 set_primitives++;
748
749 if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
750 else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
751 strncat(set_event[set_event_primitives].string, "stamp_updated=CURRENT_TIMESTAMP(0)", SPACELEFT(set_event[set_event_primitives].string));
752 set_event[set_event_primitives].type = TIMESTAMP;
753 set_event[set_event_primitives].handler = count_noop_setclause_handler;
754 set_event_primitives++;
755 }
756 else {
757 strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
758 strncat(set[set_primitives].string, "stamp_updated=DATE_PART('epoch',NOW())::BIGINT", SPACELEFT(set[set_primitives].string));
759 set[set_primitives].type = TIMESTAMP;
760 set[set_primitives].handler = count_noop_setclause_handler;
761 set_primitives++;
762
763 if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
764 else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
765 strncat(set_event[set_event_primitives].string, "stamp_updated=DATE_PART('epoch',NOW())::BIGINT", SPACELEFT(set_event[set_event_primitives].string));
766 set_event[set_event_primitives].type = TIMESTAMP;
767 set_event[set_event_primitives].handler = count_noop_setclause_handler;
768 set_primitives++;
769 }
770 }
771
772 /* values for COPY */
773 memcpy(©_values, &values, sizeof(copy_values));
774 {
775 int num, x, y;
776 char *ptr;
777
778 ptr = strchr(copy_values[0].string, '(');
779 ptr++; strcpy(copy_values[0].string, ptr);
780
781 for (num = 0; num < primitives; num++) {
782 for (x = 0; copy_values[num].string[x] != '\0'; x++) {
783 if (copy_values[num].string[x] == ' ' || copy_values[num].string[x] == '\'') {
784 for (y = x + 1; copy_values[num].string[y] != '\0'; y++)
785 copy_values[num].string[y-1] = copy_values[num].string[y];
786 copy_values[num].string[y-1] = '\0';
787 x--;
788 }
789 }
790 copy_values[num].string[x] = '\0';
791 }
792 }
793
794 return primitives;
795 }
796
PG_compose_conn_string(struct DBdesc * db,char * host,int port,char * ca_file)797 void PG_compose_conn_string(struct DBdesc *db, char *host, int port, char *ca_file)
798 {
799 char *string;
800 int slen = LONGLONGSRVBUFLEN;
801
802 if (!db->conn_string) {
803 db->conn_string = (char *) malloc(slen);
804 if (!db->conn_string) {
805 Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (PG_compose_conn_string). Exiting ..\n", config.name, config.type);
806 exit_gracefully(1);
807 }
808 string = db->conn_string;
809
810 snprintf(string, slen, "dbname=%s user=%s password=%s", config.sql_db, config.sql_user, config.sql_passwd);
811 slen -= strlen(string);
812 string += strlen(string);
813
814 if (host) snprintf(string, slen, " host=%s", host);
815 if (port) snprintf(string, slen, " port=%u", port);
816 if (ca_file) snprintf(string, slen, " sslmode=verify-full sslrootcert=%s", ca_file);
817 }
818 }
819
PG_Lock(struct DBdesc * db)820 void PG_Lock(struct DBdesc *db)
821 {
822 PGresult *PGret;
823
824 if (!db->fail) {
825 PGret = PQexec(db->desc, lock_clause);
826 if (PQresultStatus(PGret) != PGRES_COMMAND_OK) {
827 db->errmsg = PQresultErrorMessage(PGret);
828 sql_db_errmsg(db);
829 sql_db_fail(db);
830 }
831 PQclear(PGret);
832
833 /* If using COPY, let's initialize it */
834 if (config.sql_use_copy) {
835 PGret = PQexec(db->desc, copy_clause);
836 if (PQresultStatus(PGret) != PGRES_COPY_IN) {
837 db->errmsg = PQresultErrorMessage(PGret);
838 sql_db_errmsg(db);
839 sql_db_fail(db);
840 }
841 else Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n", config.name, config.type, copy_clause);
842 PQclear(PGret);
843 }
844 }
845 }
846
PG_DB_Connect(struct DBdesc * db,char * host)847 void PG_DB_Connect(struct DBdesc *db, char *host)
848 {
849 if (!db->fail) {
850 db->desc = PQconnectdb(db->conn_string);
851 if (PQstatus(db->desc) == CONNECTION_BAD) {
852 char errmsg[64+SRVBUFLEN];
853
854 sql_db_fail(db);
855 strcpy(errmsg, "Failed connecting to ");
856 strcat(errmsg, db->conn_string);
857 db->errmsg = errmsg;
858 sql_db_errmsg(db);
859 }
860 else sql_db_ok(db);
861 }
862 }
863
PG_DB_Close(struct BE_descs * bed)864 void PG_DB_Close(struct BE_descs *bed)
865 {
866 if (bed->p->connected) PQfinish(bed->p->desc);
867 if (bed->b->connected) PQfinish(bed->b->desc);
868 }
869
PG_create_dyn_table(struct DBdesc * db,char * buf)870 void PG_create_dyn_table(struct DBdesc *db, char *buf)
871 {
872 char *err_string;
873 PGresult *PGret;
874
875 if (!db->fail) {
876 PGret = PQexec(db->desc, buf);
877 if ((PQresultStatus(PGret) != PGRES_COMMAND_OK) &&
878 (PQresultStatus(PGret) != PGRES_TUPLES_OK)) {
879 err_string = PQresultErrorMessage(PGret);
880 Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, buf);
881 Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, err_string);
882 }
883 PQclear(PGret);
884 }
885 }
886
PG_affected_rows(PGresult * result)887 int PG_affected_rows(PGresult *result)
888 {
889 return atoi(PQcmdTuples(result));
890 }
891
PG_create_backend(struct DBdesc * db)892 void PG_create_backend(struct DBdesc *db)
893 {
894 if (db->type == BE_TYPE_PRIMARY) {
895 PG_compose_conn_string(db, config.sql_host, config.sql_port, config.sql_conn_ca_file);
896 }
897 else if (db->type == BE_TYPE_BACKUP) {
898 if (!config.sql_backup_host) return;
899 else PG_compose_conn_string(db, config.sql_backup_host, config.sql_port, config.sql_conn_ca_file);
900 }
901 }
902
PG_set_callbacks(struct sqlfunc_cb_registry * cbr)903 void PG_set_callbacks(struct sqlfunc_cb_registry *cbr)
904 {
905 memset(cbr, 0, sizeof(struct sqlfunc_cb_registry));
906
907 cbr->connect = PG_DB_Connect;
908 cbr->close = PG_DB_Close;
909 cbr->lock = PG_Lock;
910 /* cbr->unlock */
911 if (!config.sql_use_copy) cbr->op = PG_cache_dbop;
912 else cbr->op = PG_cache_dbop_copy;
913 cbr->create_table = PG_create_dyn_table;
914 cbr->purge = PG_cache_purge;
915 cbr->create_backend = PG_create_backend;
916 }
917
PG_init_default_values(struct insert_data * idata)918 void PG_init_default_values(struct insert_data *idata)
919 {
920 /* Linking database parameters */
921 if (!config.sql_data) config.sql_data = typed_str;
922 if (!config.sql_user) config.sql_user = pgsql_user;
923 if (!config.sql_db) config.sql_db = pgsql_db;
924 if (!config.sql_passwd) config.sql_passwd = pgsql_pwd;
925 if (!config.sql_table) {
926 /* checking 'typed' table constraints */
927 if (!strcmp(config.sql_data, "typed")) {
928 if (config.what_to_count & (COUNT_SRC_AS|COUNT_SUM_AS|COUNT_DST_AS) && config.what_to_count &
929 (COUNT_SRC_HOST|COUNT_SUM_HOST|COUNT_DST_HOST|COUNT_SRC_NET|COUNT_SUM_NET|COUNT_DST_NET) &&
930 config.sql_table_version < 6) {
931 Log(LOG_ERR, "ERROR ( %s/%s ): 'typed' PostgreSQL table in use: unable to mix HOST/NET and AS aggregations.\n", config.name, config.type);
932 exit_gracefully(1);
933 }
934 typed = TRUE;
935 }
936 else if (!strcmp(config.sql_data, "unified")) typed = FALSE;
937 else {
938 Log(LOG_ERR, "ERROR ( %s/%s ): Ignoring unknown 'sql_data' value '%s'.\n", config.name, config.type, config.sql_data);
939 exit_gracefully(1);
940 }
941
942 if (typed) {
943 if (config.sql_table_version == (SQL_TABLE_VERSION_BGP+1)) config.sql_table = pgsql_table_bgp;
944 else if (config.sql_table_version == 8) config.sql_table = pgsql_table_v8;
945 else if (config.sql_table_version == 7) config.sql_table = pgsql_table_v7;
946 else if (config.sql_table_version == 6) config.sql_table = pgsql_table_v6;
947 else if (config.sql_table_version == 5) {
948 if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v5;
949 else config.sql_table = pgsql_table_v5;
950 }
951 else if (config.sql_table_version == 4) {
952 if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v4;
953 else config.sql_table = pgsql_table_v4;
954 }
955 else if (config.sql_table_version == 3) {
956 if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v3;
957 else config.sql_table = pgsql_table_v3;
958 }
959 else if (config.sql_table_version == 2) {
960 if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v2;
961 else config.sql_table = pgsql_table_v2;
962 }
963 else {
964 if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as;
965 else config.sql_table = pgsql_table;
966 }
967 }
968 else {
969 if (config.sql_table_version == 8) {
970 Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
971 config.sql_table = pgsql_table_v8;
972 }
973 if (config.sql_table_version == 7) {
974 Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
975 config.sql_table = pgsql_table_v7;
976 }
977 else if (config.sql_table_version == 6) {
978 Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
979 config.sql_table = pgsql_table_v6;
980 }
981 else if (config.sql_table_version == 5) config.sql_table = pgsql_table_uni_v5;
982 else if (config.sql_table_version == 4) config.sql_table = pgsql_table_uni_v4;
983 else if (config.sql_table_version == 3) config.sql_table = pgsql_table_uni_v3;
984 else if (config.sql_table_version == 2) config.sql_table = pgsql_table_uni_v2;
985 else config.sql_table = pgsql_table_uni;
986 }
987 }
988 if (strchr(config.sql_table, '%') || strchr(config.sql_table, '$')) {
989 idata->dyn_table = TRUE;
990 if (!strchr(config.sql_table, '$')) idata->dyn_table_time_only = TRUE;
991 }
992 glob_dyn_table = idata->dyn_table;
993 glob_dyn_table_time_only = idata->dyn_table_time_only;
994
995 if (config.sql_backup_host) idata->recover = TRUE;
996 if (!config.sql_dont_try_update && config.sql_use_copy) config.sql_use_copy = FALSE;
997
998 if (config.sql_locking_style) idata->locks = sql_select_locking_style(config.sql_locking_style);
999 }
1000
PG_postgresql_get_version()1001 void PG_postgresql_get_version()
1002 {
1003 #if defined HAVE_PQLIBVERSION
1004 printf("PostgreSQL %u\n", PQlibVersion());
1005 #else
1006 printf("PostgreSQL\n");
1007 #endif
1008 }
1009