1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_hooks.h"
26 #include "sql_common.h"
27 #include "sql_common_m.h"
28 #include "pgsql_plugin.h"
29 
30 int typed = TRUE;
31 
32 char pgsql_user[] = "pmacct";
33 char pgsql_pwd[] = "arealsmartpwd";
34 char pgsql_db[] = "pmacct";
35 char pgsql_table[] = "acct";
36 char pgsql_table_v2[] = "acct_v2";
37 char pgsql_table_v3[] = "acct_v3";
38 char pgsql_table_v4[] = "acct_v4";
39 char pgsql_table_v5[] = "acct_v5";
40 char pgsql_table_v6[] = "acct_v6";
41 char pgsql_table_v7[] = "acct_v7";
42 char pgsql_table_v8[] = "acct_v8";
43 char pgsql_table_bgp[] = "acct_bgp";
44 char pgsql_table_uni[] = "acct_uni";
45 char pgsql_table_uni_v2[] = "acct_uni_v2";
46 char pgsql_table_uni_v3[] = "acct_uni_v3";
47 char pgsql_table_uni_v4[] = "acct_uni_v4";
48 char pgsql_table_uni_v5[] = "acct_uni_v5";
49 char pgsql_table_as[] = "acct_as";
50 char pgsql_table_as_v2[] = "acct_as_v2";
51 char pgsql_table_as_v3[] = "acct_as_v3";
52 char pgsql_table_as_v4[] = "acct_as_v4";
53 char pgsql_table_as_v5[] = "acct_as_v5";
54 char typed_str[] = "typed";
55 char unified_str[] = "unified";
56 
57 /* Functions */
pgsql_plugin(int pipe_fd,struct configuration * cfgptr,void * ptr)58 void pgsql_plugin(int pipe_fd, struct configuration *cfgptr, void *ptr)
59 {
60   struct pkt_data *data;
61   struct ports_table pt;
62   struct pollfd pfd;
63   struct insert_data idata;
64   time_t refresh_deadline;
65   int refresh_timeout;
66   int ret, num, recv_budget, poll_bypass;
67   struct ring *rg = &((struct channels_list_entry *)ptr)->rg;
68   struct ch_status *status = ((struct channels_list_entry *)ptr)->status;
69   int datasize = ((struct channels_list_entry *)ptr)->datasize;
70   u_int32_t bufsz = ((struct channels_list_entry *)ptr)->bufsize;
71   pid_t core_pid = ((struct channels_list_entry *)ptr)->core_pid;
72   struct networks_file_data nfd;
73   unsigned char *dataptr;
74 
75   unsigned char *rgptr;
76   int pollagain = TRUE;
77   u_int32_t seq = 1, rg_err_count = 0;
78 
79   struct extra_primitives extras;
80   struct primitives_ptrs prim_ptrs;
81 
82 #ifdef WITH_ZMQ
83   struct p_zmq_host *zmq_host = &((struct channels_list_entry *)ptr)->zmq_host;
84 #else
85   void *zmq_host = NULL;
86 #endif
87 
88 #ifdef WITH_REDIS
89   struct p_redis_host redis_host;
90 #endif
91 
92   memcpy(&config, cfgptr, sizeof(struct configuration));
93   memcpy(&extras, &((struct channels_list_entry *)ptr)->extras, sizeof(struct extra_primitives));
94   recollect_pipe_memory(ptr);
95   pm_setproctitle("%s [%s]", "PostgreSQL Plugin", config.name);
96 
97   memset(&idata, 0, sizeof(idata));
98   if (config.pidfile) write_pid_file_plugin(config.pidfile, config.type, config.name);
99   if (config.logfile) {
100     fclose(config.logfile_fd);
101     config.logfile_fd = open_output_file(config.logfile, "a", FALSE);
102   }
103 
104   sql_set_signals();
105   sql_init_default_values(&extras);
106   PG_init_default_values(&idata);
107   PG_set_callbacks(&sqlfunc_cbr);
108   sql_set_insert_func();
109 
110   /* some LOCAL initialization AFTER setting some default values */
111   reload_map = FALSE;
112   idata.now = time(NULL);
113   refresh_deadline = idata.now;
114   idata.cfg = &config;
115 
116   sql_init_maps(&extras, &prim_ptrs, &nt, &nc, &pt);
117   sql_init_global_buffers();
118   sql_init_historical_acct(idata.now, &idata);
119   sql_init_triggers(idata.now, &idata);
120   sql_init_refresh_deadline(&refresh_deadline);
121 
122   if (config.pipe_zmq) P_zmq_pipe_init(zmq_host, &pipe_fd, &seq);
123   else setnonblocking(pipe_fd);
124 
125   /* building up static SQL clauses */
126   idata.num_primitives = PG_compose_static_queries();
127   glob_num_primitives = idata.num_primitives;
128 
129   /* setting up environment variables */
130   SQL_SetENV();
131 
132   sql_link_backend_descriptors(&bed, &p, &b);
133 
134 #ifdef WITH_REDIS
135   if (config.redis_host) {
136     char log_id[SHORTBUFLEN];
137 
138     snprintf(log_id, sizeof(log_id), "%s/%s", config.name, config.type);
139     p_redis_init(&redis_host, log_id, p_redis_thread_produce_common_plugin_handler);
140   }
141 #endif
142 
143   /* plugin main loop */
144   for(;;) {
145     poll_again:
146     status->wakeup = TRUE;
147     poll_bypass = FALSE;
148     calc_refresh_timeout(refresh_deadline, idata.now, &refresh_timeout);
149 
150     pfd.fd = pipe_fd;
151     pfd.events = POLLIN;
152     ret = poll(&pfd, (pfd.fd == ERR ? 0 : 1), refresh_timeout);
153 
154     if (ret <= 0) {
155       if (getppid() != core_pid) {
156         Log(LOG_ERR, "ERROR ( %s/%s ): Core process *seems* gone. Exiting.\n", config.name, config.type);
157         exit_gracefully(1);
158       }
159 
160       if (ret < 0) goto poll_again;
161     }
162 
163     poll_ops:
164     idata.now = time(NULL);
165     now = idata.now;
166 
167     if (config.sql_history) {
168       while (idata.now > (idata.basetime + idata.timeslot)) {
169         time_t saved_basetime = idata.basetime;
170 
171         idata.basetime += idata.timeslot;
172         if (config.sql_history == COUNT_MONTHLY)
173           idata.timeslot = calc_monthly_timeslot(idata.basetime, config.sql_history_howmany, ADD);
174         glob_basetime = idata.basetime;
175         idata.new_basetime = saved_basetime;
176         glob_new_basetime = saved_basetime;
177       }
178     }
179 
180     if (idata.now > refresh_deadline) {
181       if (qq_ptr) sql_cache_flush(sql_queries_queue, qq_ptr, &idata, FALSE);
182       sql_cache_handle_flush_event(&idata, &refresh_deadline, &pt);
183     }
184     else {
185       if (config.sql_trigger_exec) {
186         while (idata.now > idata.triggertime && idata.t_timeslot > 0) {
187           sql_trigger_exec(config.sql_trigger_exec);
188           idata.triggertime += idata.t_timeslot;
189           if (config.sql_trigger_time == COUNT_MONTHLY)
190             idata.t_timeslot = calc_monthly_timeslot(idata.triggertime, config.sql_trigger_time_howmany, ADD);
191         }
192       }
193     }
194 
195     recv_budget = 0;
196     if (poll_bypass) {
197       poll_bypass = FALSE;
198       goto read_data;
199     }
200 
201     switch (ret) {
202     case 0: /* poll(): timeout */
203       break;
204     default: /* poll(): received data */
205       read_data:
206       if (recv_budget == DEFAULT_PLUGIN_COMMON_RECV_BUDGET) {
207 	poll_bypass = TRUE;
208 	goto poll_ops;
209       }
210 
211       if (config.pipe_homegrown) {
212         if (!pollagain) {
213           seq++;
214           seq %= MAX_SEQNUM;
215           if (seq == 0) rg_err_count = FALSE;
216           idata.now = time(NULL);
217 	  now = idata.now;
218         }
219         else {
220           if ((ret = read(pipe_fd, &rgptr, sizeof(rgptr))) == 0)
221             exit_gracefully(1); /* we exit silently; something happened at the write end */
222         }
223 
224         if ((rg->ptr + bufsz) > rg->end) rg->ptr = rg->base;
225 
226         if (((struct ch_buf_hdr *)rg->ptr)->seq != seq) {
227           if (!pollagain) {
228             pollagain = TRUE;
229             goto poll_again;
230           }
231           else {
232             rg_err_count++;
233             if (config.debug || (rg_err_count > MAX_RG_COUNT_ERR)) {
234               Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected (plugin_buffer_size=%" PRIu64 " plugin_pipe_size=%" PRIu64 ").\n",
235                         config.name, config.type, config.buffer_size, config.pipe_size);
236               Log(LOG_WARNING, "WARN ( %s/%s ): Increase values or look for plugin_buffer_size, plugin_pipe_size in CONFIG-KEYS document.\n\n",
237                         config.name, config.type);
238             }
239 
240 	    rg->ptr = (rg->base + status->last_buf_off);
241             seq = ((struct ch_buf_hdr *)rg->ptr)->seq;
242           }
243         }
244 
245         pollagain = FALSE;
246         memcpy(pipebuf, rg->ptr, bufsz);
247         rg->ptr += bufsz;
248       }
249 #ifdef WITH_ZMQ
250       else if (config.pipe_zmq) {
251 	ret = p_zmq_topic_recv(zmq_host, pipebuf, config.buffer_size);
252 	if (ret > 0) {
253 	  if (seq && (((struct ch_buf_hdr *)pipebuf)->seq != ((seq + 1) % MAX_SEQNUM))) {
254 	    Log(LOG_WARNING, "WARN ( %s/%s ): Missing data detected. Sequence received=%u expected=%u\n",
255 		config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->seq, ((seq + 1) % MAX_SEQNUM));
256 	  }
257 
258 	  seq = ((struct ch_buf_hdr *)pipebuf)->seq;
259 	}
260 	else goto poll_again;
261       }
262 #endif
263 
264       data = (struct pkt_data *) (pipebuf+sizeof(struct ch_buf_hdr));
265 
266       if (config.debug_internal_msg)
267         Log(LOG_DEBUG, "DEBUG ( %s/%s ): buffer received len=%" PRIu64 " seq=%u num_entries=%u\n",
268                 config.name, config.type, ((struct ch_buf_hdr *)pipebuf)->len, seq,
269                 ((struct ch_buf_hdr *)pipebuf)->num);
270 
271       while (((struct ch_buf_hdr *)pipebuf)->num > 0) {
272         for (num = 0; primptrs_funcs[num]; num++)
273           (*primptrs_funcs[num])((u_char *)data, &extras, &prim_ptrs);
274 
275 	for (num = 0; net_funcs[num]; num++)
276 	  (*net_funcs[num])(&nt, &nc, &data->primitives, prim_ptrs.pbgp, &nfd);
277 
278 	if (config.ports_file) {
279           if (!pt.table[data->primitives.src_port]) data->primitives.src_port = 0;
280           if (!pt.table[data->primitives.dst_port]) data->primitives.dst_port = 0;
281         }
282 
283         prim_ptrs.data = data;
284         (*insert_func)(&prim_ptrs, &idata);
285 
286         ((struct ch_buf_hdr *)pipebuf)->num--;
287         if (((struct ch_buf_hdr *)pipebuf)->num) {
288           dataptr = (unsigned char *) data;
289           if (!prim_ptrs.vlen_next_off) dataptr += datasize;
290           else dataptr += prim_ptrs.vlen_next_off;
291           data = (struct pkt_data *) dataptr;
292 	}
293       }
294 
295       goto read_data;
296     }
297   }
298 }
299 
PG_cache_dbop_copy(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)300 int PG_cache_dbop_copy(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
301 {
302   char *ptr_values, *ptr_where;
303   char default_delim[] = ",", delim_buf[SRVBUFLEN];
304   int num=0, have_flows=0;
305 
306   if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
307 
308   if (!config.sql_delimiter)
309     snprintf(delim_buf, SRVBUFLEN, "%s", default_delim);
310   else
311     snprintf(delim_buf, SRVBUFLEN, "%s", config.sql_delimiter);
312 
313   /* constructing SQL query */
314   ptr_where = where_clause;
315   ptr_values = values_clause;
316   memset(where_clause, 0, sizeof(where_clause));
317   memset(values_clause, 0, sizeof(values_clause));
318 
319   memcpy(&values, &copy_values, sizeof(values));
320   while (num < idata->num_primitives) {
321     (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
322     num++;
323   }
324 
325   if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), "%s%" PRIu64 "%s%" PRIu64 "%s%" PRIu64 "\n", delim_buf, cache_elem->packet_counter,
326 											delim_buf, cache_elem->bytes_counter,
327 											delim_buf, cache_elem->flows_counter);
328   else snprintf(ptr_values, SPACELEFT(values_clause), "%s%" PRIu64 "%s%" PRIu64 "\n", delim_buf, cache_elem->packet_counter,
329 									delim_buf, cache_elem->bytes_counter);
330 
331   strncpy(sql_data, values_clause, SPACELEFT(sql_data));
332 
333   if (PQputCopyData(db->desc, sql_data, strlen(sql_data)) < 0) { // avoid strlen()
334     db->errmsg = PQerrorMessage(db->desc);
335     Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
336     if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n", config.name, config.type, db->errmsg);
337     sql_db_fail(db);
338 
339     return TRUE;
340   }
341   idata->iqn++;
342   idata->een++;
343 
344   Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n", config.name, config.type, sql_data);
345 
346   return FALSE;
347 }
348 
PG_cache_dbop(struct DBdesc * db,struct db_cache * cache_elem,struct insert_data * idata)349 int PG_cache_dbop(struct DBdesc *db, struct db_cache *cache_elem, struct insert_data *idata)
350 {
351   PGresult *ret = NULL;
352   char *ptr_values, *ptr_where, *ptr_set;
353   int num=0, num_set=0, have_flows=0;
354 
355   if (config.what_to_count & COUNT_FLOWS) have_flows = TRUE;
356 
357   /* constructing SQL query */
358   ptr_where = where_clause;
359   ptr_values = values_clause;
360   ptr_set = set_clause;
361   memset(where_clause, 0, sizeof(where_clause));
362   memset(values_clause, 0, sizeof(values_clause));
363   memset(set_clause, 0, sizeof(set_clause));
364   memset(insert_full_clause, 0, sizeof(insert_full_clause));
365 
366   for (num = 0; num < idata->num_primitives; num++)
367     (*where[num].handler)(cache_elem, idata, num, &ptr_values, &ptr_where);
368 
369   if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
370     for (num_set = 0; set_event[num_set].type; num_set++)
371       (*set_event[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
372   }
373   else {
374     for (num_set = 0; set[num_set].type; num_set++)
375       (*set[num_set].handler)(cache_elem, idata, num_set, &ptr_set, NULL);
376   }
377 
378   /* sending UPDATE query a) if not switched off and
379      b) if we actually have something to update */
380   if (!config.sql_dont_try_update && num_set) {
381     strncpy(sql_data, update_clause, SPACELEFT(sql_data));
382     strncat(sql_data, set_clause, SPACELEFT(sql_data));
383     strncat(sql_data, where_clause, SPACELEFT(sql_data));
384 
385     ret = PQexec(db->desc, sql_data);
386     if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
387       db->errmsg = PQresultErrorMessage(ret);
388       PQclear(ret);
389       Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
390       if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
391       sql_db_fail(db);
392 
393       return TRUE;
394     }
395     PQclear(ret);
396   }
397 
398   if (config.sql_dont_try_update || !num_set || (!PG_affected_rows(ret))) {
399     /* UPDATE failed, trying with an INSERT query */
400     if (cache_elem->flow_type == NF9_FTYPE_EVENT || cache_elem->flow_type == NF9_FTYPE_OPTION) {
401       strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
402       strncat(insert_full_clause, insert_nocounters_clause, SPACELEFT(insert_full_clause));
403       strncat(ptr_values, ")", SPACELEFT(values_clause));
404     }
405     else {
406       strncpy(insert_full_clause, insert_clause, SPACELEFT(insert_full_clause));
407       strncat(insert_full_clause, insert_counters_clause, SPACELEFT(insert_full_clause));
408       if (have_flows) snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter, cache_elem->flows_counter);
409       else snprintf(ptr_values, SPACELEFT(values_clause), ", %" PRIu64 ", %" PRIu64 ")", cache_elem->packet_counter, cache_elem->bytes_counter);
410     }
411     strncpy(sql_data, insert_full_clause, sizeof(sql_data));
412     strncat(sql_data, values_clause, SPACELEFT(sql_data));
413 
414     ret = PQexec(db->desc, sql_data);
415     if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
416       db->errmsg = PQresultErrorMessage(ret);
417       PQclear(ret);
418       Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, sql_data);
419       if (db->errmsg) Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, db->errmsg);
420       sql_db_fail(db);
421 
422       return TRUE;
423     }
424     PQclear(ret);
425     idata->iqn++;
426   }
427   else idata->uqn++;
428   idata->een++;
429 
430   Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n\n", config.name, config.type, sql_data);
431 
432   return FALSE;
433 }
434 
PG_cache_purge(struct db_cache * queue[],int index,struct insert_data * idata)435 void PG_cache_purge(struct db_cache *queue[], int index, struct insert_data *idata)
436 {
437   PGresult *ret;
438   struct db_cache **reprocess_queries_queue, **bulk_reprocess_queries_queue;
439   char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
440   char orig_copy_clause[LONGSRVBUFLEN], tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
441   time_t start;
442   int j, r, reprocess = 0, stop, go_to_pending, reprocess_idx, bulk_reprocess_idx, saved_index = index;
443   struct primitives_ptrs prim_ptrs;
444   struct pkt_data dummy_data;
445   pid_t writer_pid = getpid();
446 
447   if (!index) {
448     Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
449     Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: 0/0, ET: X) ***\n", config.name, config.type, writer_pid);
450     return;
451   }
452 
453   memset(&prim_ptrs, 0, sizeof(prim_ptrs));
454   memset(&dummy_data, 0, sizeof(dummy_data));
455 
456   reprocess_queries_queue = (struct db_cache **) malloc(qq_size*sizeof(struct db_cache *));
457   bulk_reprocess_queries_queue = (struct db_cache **) malloc(qq_size*sizeof(struct db_cache *));
458   if (!reprocess_queries_queue || !bulk_reprocess_queries_queue) {
459     Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (reprocess_queries_queue). Exiting ..\n", config.name, config.type);
460     exit_gracefully(1);
461   }
462 
463   for (j = 0, stop = 0; (!stop) && sql_preprocess_funcs[j]; j++)
464     stop = sql_preprocess_funcs[j](queue, &index, j);
465 
466   if ((config.what_to_count & COUNT_CLASS) || (config.what_to_count_2 & COUNT_NDPI_CLASS))
467     sql_invalidate_shadow_entries(queue, &index);
468 
469   idata->ten = index;
470 
471   Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - START (PID: %u) ***\n", config.name, config.type, writer_pid);
472   start = time(NULL);
473 
474   /* re-using pending queries queue stuff from parent and saving clauses */
475   memcpy(sql_pending_queries_queue, queue, index*sizeof(struct db_cache *));
476   pqq_ptr = index;
477 
478   strlcpy(orig_copy_clause, copy_clause, LONGSRVBUFLEN);
479   strlcpy(orig_insert_clause, insert_clause, LONGSRVBUFLEN);
480   strlcpy(orig_update_clause, update_clause, LONGSRVBUFLEN);
481   strlcpy(orig_lock_clause, lock_clause, LONGSRVBUFLEN);
482 
483   start:
484   memcpy(queue, sql_pending_queries_queue, pqq_ptr*sizeof(struct db_cache *));
485   memset(sql_pending_queries_queue, 0, pqq_ptr*sizeof(struct db_cache *));
486   index = pqq_ptr; pqq_ptr = 0;
487 
488   /* We check for variable substitution in SQL table */
489   if (idata->dyn_table) {
490     time_t stamp = 0;
491 
492     memset(tmpbuf, 0, LONGLONGSRVBUFLEN);
493     stamp = queue[0]->basetime;
494 
495     prim_ptrs.data = &dummy_data;
496     primptrs_set_all_from_db_cache(&prim_ptrs, queue[0]);
497 
498     strlcpy(idata->dyn_table_name, config.sql_table, SRVBUFLEN);
499     strlcpy(insert_clause, orig_insert_clause, LONGSRVBUFLEN);
500     strlcpy(update_clause, orig_update_clause, LONGSRVBUFLEN);
501     strlcpy(lock_clause, orig_lock_clause, LONGSRVBUFLEN);
502 
503     handle_dynname_internal_strings_same(copy_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
504     handle_dynname_internal_strings_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
505     handle_dynname_internal_strings_same(update_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
506     handle_dynname_internal_strings_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
507     handle_dynname_internal_strings_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
508 
509     pm_strftime_same(copy_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
510     pm_strftime_same(insert_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
511     pm_strftime_same(update_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
512     pm_strftime_same(lock_clause, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
513     pm_strftime_same(idata->dyn_table_name, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
514   }
515 
516   if (config.sql_table_schema) sql_create_table(bed.p, &queue[0]->basetime, &prim_ptrs);
517 
518   /* beginning DB transaction */
519   (*sqlfunc_cbr.lock)(bed.p);
520 
521   /* for each element of the queue to be processed we execute sql_query(); the function
522      returns a non-zero value if DB has failed; then we use reprocess_queries_queue and
523      bulk_reprocess_queries_queue to handle reprocessing of specific elements or bulk
524      queue (of elements not being held in a pending_queries_queue) due to final COMMIT
525      failure */
526 
527   memset(reprocess_queries_queue, 0, qq_size*sizeof(struct db_cache *));
528   memset(bulk_reprocess_queries_queue, 0, qq_size*sizeof(struct db_cache *));
529   reprocess_idx = 0; bulk_reprocess_idx = 0;
530 
531   for (j = 0; j < index; j++) {
532     go_to_pending = FALSE;
533 
534     if (idata->dyn_table && (!idata->dyn_table_time_only || !config.nfacctd_time_new || (config.sql_refresh_time != idata->timeslot))) {
535       time_t stamp = 0;
536 
537       memset(tmpbuf, 0, LONGLONGSRVBUFLEN); // XXX: pedantic?
538       stamp = queue[idata->current_queue_elem]->basetime;
539       strlcpy(tmptable, config.sql_table, SRVBUFLEN);
540 
541       prim_ptrs.data = &dummy_data;
542       primptrs_set_all_from_db_cache(&prim_ptrs, queue[idata->current_queue_elem]);
543       handle_dynname_internal_strings_same(tmptable, LONGSRVBUFLEN, tmpbuf, &prim_ptrs, DYN_STR_SQL_TABLE);
544       pm_strftime_same(tmptable, LONGSRVBUFLEN, tmpbuf, &stamp, config.timestamps_utc);
545 
546       if (strncmp(idata->dyn_table_name, tmptable, SRVBUFLEN)) {
547         sql_pending_queries_queue[pqq_ptr] = queue[idata->current_queue_elem];
548 
549         pqq_ptr++;
550         go_to_pending = TRUE;
551       }
552     }
553 
554     if (!go_to_pending) {
555       if (queue[j]->valid == SQL_CACHE_COMMITTED) {
556 	r = sql_query(&bed, queue[j], idata);
557 
558 	/* note down all elements in case of a reprocess due to COMMIT failure */
559 	bulk_reprocess_queries_queue[bulk_reprocess_idx] = queue[j];
560 	bulk_reprocess_idx++;
561       }
562       else r = FALSE; /* not valid elements are marked as not to be reprocessed */
563       if (r) {
564         reprocess_queries_queue[reprocess_idx] = queue[j];
565         reprocess_idx++;
566 
567 	if (!reprocess) sql_db_fail(&p);
568         reprocess = REPROCESS_SPECIFIC;
569       }
570     }
571   }
572 
573   /* Finalizing DB transaction */
574   if (!p.fail) {
575     if (config.sql_use_copy) {
576       if (PQputCopyEnd(p.desc, NULL) < 0) Log(LOG_ERR, "ERROR ( %s/%s ): COPY failed!\n\n", config.name, config.type);
577     }
578 
579     ret = PQexec(p.desc, "COMMIT");
580     if (PQresultStatus(ret) != PGRES_COMMAND_OK) {
581       if (!reprocess) sql_db_fail(&p);
582       reprocess = REPROCESS_BULK;
583     }
584     PQclear(ret);
585   }
586 
587   /* don't reprocess free (SQL_CACHE_FREE) and already recovered (SQL_CACHE_ERROR) elements */
588   if (p.fail) {
589     if (reprocess == REPROCESS_SPECIFIC) {
590       for (j = 0; j < reprocess_idx; j++) {
591         if (reprocess_queries_queue[j]->valid == SQL_CACHE_COMMITTED) sql_query(&bed, reprocess_queries_queue[j], idata);
592       }
593     }
594     else if (reprocess == REPROCESS_BULK) {
595       for (j = 0; j < bulk_reprocess_idx; j++) {
596         if (bulk_reprocess_queries_queue[j]->valid == SQL_CACHE_COMMITTED) sql_query(&bed, bulk_reprocess_queries_queue[j], idata);
597       }
598     }
599   }
600 
601   if (b.connected) {
602     if (config.sql_use_copy) {
603       if (PQputCopyEnd(b.desc, NULL) < 0) Log(LOG_ERR, "ERROR ( %s/%s ): COPY failed!\n\n", config.name, config.type);
604     }
605     ret = PQexec(b.desc, "COMMIT");
606     if (PQresultStatus(ret) != PGRES_COMMAND_OK) sql_db_fail(&b);
607     PQclear(ret);
608   }
609 
610   /* If we have pending queries then start again */
611   if (pqq_ptr) goto start;
612 
613   idata->elap_time = time(NULL)-start;
614   Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %zu) ***\n",
615 		config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);
616 
617   if (config.sql_trigger_exec) {
618     if (queue && queue[0]) idata->basetime = queue[0]->basetime;
619     idata->elap_time = time(NULL)-start;
620     SQL_SetENV_child(idata);
621   }
622 }
623 
PG_evaluate_history(int primitive)624 int PG_evaluate_history(int primitive)
625 {
626   if (config.sql_history) {
627     if (primitive) {
628       strncat(copy_clause, ", ", SPACELEFT(copy_clause));
629       strncat(insert_clause, ", ", SPACELEFT(insert_clause));
630       strncat(values[primitive].string, ", ", sizeof(values[primitive].string));
631       strncat(where[primitive].string, " AND ", sizeof(where[primitive].string));
632     }
633     if (!config.timestamps_since_epoch)
634       strncat(where[primitive].string, "to_timestamp(%u)::Timestamp without time zone = ", SPACELEFT(where[primitive].string));
635     else
636       strncat(where[primitive].string, "%u = ", SPACELEFT(where[primitive].string));
637     strncat(where[primitive].string, "stamp_inserted", SPACELEFT(where[primitive].string));
638 
639     strncat(copy_clause, "stamp_updated, stamp_inserted", SPACELEFT(copy_clause));
640     strncat(insert_clause, "stamp_updated, stamp_inserted", SPACELEFT(insert_clause));
641     if (config.sql_use_copy) {
642       char default_delim[] = ",", delim_buf[SRVBUFLEN];
643 
644       if (!config.sql_delimiter || !config.sql_use_copy)
645         snprintf(delim_buf, SRVBUFLEN, "%s ", default_delim);
646       else
647         snprintf(delim_buf, SRVBUFLEN, "%s ", config.sql_delimiter);
648 
649       if (!config.timestamps_since_epoch) {
650 	strncat(values[primitive].string, "%s", SPACELEFT(values[primitive].string));
651 	strncat(values[primitive].string, delim_buf, SPACELEFT(values[primitive].string));
652 	strncat(values[primitive].string, "%s", SPACELEFT(values[primitive].string));
653         values[primitive].handler = where[primitive].handler = count_copy_timestamp_handler;
654       }
655       else {
656 	strncat(values[primitive].string, "%u", SPACELEFT(values[primitive].string));
657 	strncat(values[primitive].string, delim_buf, SPACELEFT(values[primitive].string));
658 	strncat(values[primitive].string, "%u", SPACELEFT(values[primitive].string));
659         values[primitive].handler = where[primitive].handler = count_timestamp_handler;
660       }
661     }
662     else {
663       if (!config.timestamps_since_epoch)
664 	strncat(values[primitive].string, "to_timestamp(%u), to_timestamp(%u)", SPACELEFT(values[primitive].string));
665       else
666 	strncat(values[primitive].string, "%u, %u", SPACELEFT(values[primitive].string));
667       values[primitive].handler = where[primitive].handler = count_timestamp_handler;
668     }
669     where[primitive].type = values[primitive].type = TIMESTAMP;
670 
671     primitive++;
672   }
673 
674   return primitive;
675 }
676 
PG_compose_static_queries()677 int PG_compose_static_queries()
678 {
679   int primitives=0, set_primitives=0, set_event_primitives=0, have_flows=0, lock=0;
680   char default_delim[] = ",", delim_buf[SRVBUFLEN];
681 
682   if (config.what_to_count & COUNT_FLOWS || (config.sql_table_version >= 4 &&
683                                              config.sql_table_version < SQL_TABLE_VERSION_BGP &&
684                                              !config.sql_optimize_clauses)) {
685     config.what_to_count |= COUNT_FLOWS;
686     have_flows = TRUE;
687 
688     if ((config.sql_table_version < 4 || config.sql_table_version >= SQL_TABLE_VERSION_BGP) && !config.sql_optimize_clauses) {
689       Log(LOG_ERR, "ERROR ( %s/%s ): The accounting of flows requires SQL table v4. Exiting.\n", config.name, config.type);
690       exit_gracefully(1);
691     }
692   }
693 
694   /* "INSERT INTO ... VALUES ... ", "COPY ... ", "... WHERE ..." stuff */
695   strncpy(where[primitives].string, " WHERE ", sizeof(where[primitives].string));
696   snprintf(copy_clause, sizeof(copy_clause), "COPY %s (", config.sql_table);
697   snprintf(insert_clause, sizeof(insert_clause), "INSERT INTO %s (", config.sql_table);
698   strncpy(values[primitives].string, " VALUES (", sizeof(values[primitives].string));
699   primitives = PG_evaluate_history(primitives);
700   primitives = sql_evaluate_primitives(primitives);
701 
702   strncat(copy_clause, ", packets, bytes", SPACELEFT(copy_clause));
703   if (have_flows) strncat(copy_clause, ", flows", SPACELEFT(copy_clause));
704 
705   if (!config.sql_delimiter || !config.sql_use_copy)
706     snprintf(delim_buf, SRVBUFLEN, ") FROM STDIN DELIMITER \'%s\'", default_delim);
707   else
708     snprintf(delim_buf, SRVBUFLEN, ") FROM STDIN DELIMITER \'%s\'", config.sql_delimiter);
709   strncat(copy_clause, delim_buf, SPACELEFT(copy_clause));
710 
711   strncpy(insert_counters_clause, ", packets, bytes", SPACELEFT(insert_counters_clause));
712   if (have_flows) strncat(insert_counters_clause, ", flows", SPACELEFT(insert_counters_clause));
713   strncat(insert_counters_clause, ")", SPACELEFT(insert_counters_clause));
714   strncpy(insert_nocounters_clause, ")", SPACELEFT(insert_nocounters_clause));
715 
716   /* "LOCK ..." stuff */
717 
718   if (config.sql_dont_try_update) snprintf(lock_clause, sizeof(lock_clause), "BEGIN;");
719   else {
720     if (config.sql_locking_style) lock = sql_select_locking_style(config.sql_locking_style);
721     switch (lock) {
722     case PM_LOCK_NONE:
723       snprintf(lock_clause, sizeof(lock_clause), "BEGIN;");
724       break;
725     case PM_LOCK_ROW_EXCLUSIVE:
726       snprintf(lock_clause, sizeof(lock_clause), "BEGIN; LOCK %s IN ROW EXCLUSIVE MODE;", config.sql_table);
727       break;
728     case PM_LOCK_EXCLUSIVE:
729     default:
730       snprintf(lock_clause, sizeof(lock_clause), "BEGIN; LOCK %s IN EXCLUSIVE MODE;", config.sql_table);
731       break;
732     }
733   }
734 
735   /* "UPDATE ... SET ..." stuff */
736   snprintf(update_clause, sizeof(update_clause), "UPDATE %s ", config.sql_table);
737 
738   set_primitives = sql_compose_static_set(have_flows);
739   set_event_primitives = sql_compose_static_set_event();
740 
741   if (config.sql_history) {
742     if (!config.timestamps_since_epoch) {
743       strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
744       strncat(set[set_primitives].string, "stamp_updated=CURRENT_TIMESTAMP(0)", SPACELEFT(set[set_primitives].string));
745       set[set_primitives].type = TIMESTAMP;
746       set[set_primitives].handler = count_noop_setclause_handler;
747       set_primitives++;
748 
749       if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
750       else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
751       strncat(set_event[set_event_primitives].string, "stamp_updated=CURRENT_TIMESTAMP(0)", SPACELEFT(set_event[set_event_primitives].string));
752       set_event[set_event_primitives].type = TIMESTAMP;
753       set_event[set_event_primitives].handler = count_noop_setclause_handler;
754       set_event_primitives++;
755     }
756     else {
757       strncpy(set[set_primitives].string, ", ", SPACELEFT(set[set_primitives].string));
758       strncat(set[set_primitives].string, "stamp_updated=DATE_PART('epoch',NOW())::BIGINT", SPACELEFT(set[set_primitives].string));
759       set[set_primitives].type = TIMESTAMP;
760       set[set_primitives].handler = count_noop_setclause_handler;
761       set_primitives++;
762 
763       if (set_event_primitives) strncpy(set_event[set_event_primitives].string, ", ", SPACELEFT(set_event[set_event_primitives].string));
764       else strncpy(set_event[set_event_primitives].string, "SET ", SPACELEFT(set_event[set_event_primitives].string));
765       strncat(set_event[set_event_primitives].string, "stamp_updated=DATE_PART('epoch',NOW())::BIGINT", SPACELEFT(set_event[set_event_primitives].string));
766       set_event[set_event_primitives].type = TIMESTAMP;
767       set_event[set_event_primitives].handler = count_noop_setclause_handler;
768       set_primitives++;
769     }
770   }
771 
772   /* values for COPY */
773   memcpy(&copy_values, &values, sizeof(copy_values));
774   {
775     int num, x, y;
776     char *ptr;
777 
778     ptr = strchr(copy_values[0].string, '(');
779     ptr++; strcpy(copy_values[0].string, ptr);
780 
781     for (num = 0; num < primitives; num++) {
782       for (x = 0; copy_values[num].string[x] != '\0'; x++) {
783 	if (copy_values[num].string[x] == ' ' || copy_values[num].string[x] == '\'') {
784 	  for (y = x + 1; copy_values[num].string[y] != '\0'; y++)
785             copy_values[num].string[y-1] = copy_values[num].string[y];
786           copy_values[num].string[y-1] = '\0';
787           x--;
788         }
789       }
790       copy_values[num].string[x] = '\0';
791     }
792   }
793 
794   return primitives;
795 }
796 
PG_compose_conn_string(struct DBdesc * db,char * host,int port,char * ca_file)797 void PG_compose_conn_string(struct DBdesc *db, char *host, int port, char *ca_file)
798 {
799   char *string;
800   int slen = LONGLONGSRVBUFLEN;
801 
802   if (!db->conn_string) {
803     db->conn_string = (char *) malloc(slen);
804     if (!db->conn_string) {
805       Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (PG_compose_conn_string). Exiting ..\n", config.name, config.type);
806       exit_gracefully(1);
807     }
808     string = db->conn_string;
809 
810     snprintf(string, slen, "dbname=%s user=%s password=%s", config.sql_db, config.sql_user, config.sql_passwd);
811     slen -= strlen(string);
812     string += strlen(string);
813 
814     if (host) snprintf(string, slen, " host=%s", host);
815     if (port) snprintf(string, slen, " port=%u", port);
816     if (ca_file) snprintf(string, slen, " sslmode=verify-full sslrootcert=%s", ca_file);
817   }
818 }
819 
PG_Lock(struct DBdesc * db)820 void PG_Lock(struct DBdesc *db)
821 {
822   PGresult *PGret;
823 
824   if (!db->fail) {
825     PGret = PQexec(db->desc, lock_clause);
826     if (PQresultStatus(PGret) != PGRES_COMMAND_OK) {
827       db->errmsg = PQresultErrorMessage(PGret);
828       sql_db_errmsg(db);
829       sql_db_fail(db);
830     }
831     PQclear(PGret);
832 
833     /* If using COPY, let's initialize it */
834     if (config.sql_use_copy) {
835       PGret = PQexec(db->desc, copy_clause);
836       if (PQresultStatus(PGret) != PGRES_COPY_IN) {
837 	db->errmsg = PQresultErrorMessage(PGret);
838 	sql_db_errmsg(db);
839 	sql_db_fail(db);
840       }
841       else Log(LOG_DEBUG, "DEBUG ( %s/%s ): %s\n", config.name, config.type, copy_clause);
842       PQclear(PGret);
843     }
844   }
845 }
846 
PG_DB_Connect(struct DBdesc * db,char * host)847 void PG_DB_Connect(struct DBdesc *db, char *host)
848 {
849   if (!db->fail) {
850     db->desc = PQconnectdb(db->conn_string);
851     if (PQstatus(db->desc) == CONNECTION_BAD) {
852       char errmsg[64+SRVBUFLEN];
853 
854       sql_db_fail(db);
855       strcpy(errmsg, "Failed connecting to ");
856       strcat(errmsg, db->conn_string);
857       db->errmsg = errmsg;
858       sql_db_errmsg(db);
859     }
860     else sql_db_ok(db);
861   }
862 }
863 
PG_DB_Close(struct BE_descs * bed)864 void PG_DB_Close(struct BE_descs *bed)
865 {
866   if (bed->p->connected) PQfinish(bed->p->desc);
867   if (bed->b->connected) PQfinish(bed->b->desc);
868 }
869 
PG_create_dyn_table(struct DBdesc * db,char * buf)870 void PG_create_dyn_table(struct DBdesc *db, char *buf)
871 {
872   char *err_string;
873   PGresult *PGret;
874 
875   if (!db->fail) {
876     PGret = PQexec(db->desc, buf);
877     if ((PQresultStatus(PGret) != PGRES_COMMAND_OK) &&
878 	(PQresultStatus(PGret) != PGRES_TUPLES_OK)) {
879       err_string = PQresultErrorMessage(PGret);
880       Log(LOG_DEBUG, "DEBUG ( %s/%s ): FAILED query follows:\n%s\n", config.name, config.type, buf);
881       Log(LOG_ERR, "ERROR ( %s/%s ): %s\n\n", config.name, config.type, err_string);
882     }
883     PQclear(PGret);
884   }
885 }
886 
PG_affected_rows(PGresult * result)887 int PG_affected_rows(PGresult *result)
888 {
889   return atoi(PQcmdTuples(result));
890 }
891 
PG_create_backend(struct DBdesc * db)892 void PG_create_backend(struct DBdesc *db)
893 {
894   if (db->type == BE_TYPE_PRIMARY) {
895     PG_compose_conn_string(db, config.sql_host, config.sql_port, config.sql_conn_ca_file);
896   }
897   else if (db->type == BE_TYPE_BACKUP) {
898     if (!config.sql_backup_host) return;
899     else PG_compose_conn_string(db, config.sql_backup_host, config.sql_port, config.sql_conn_ca_file);
900   }
901 }
902 
PG_set_callbacks(struct sqlfunc_cb_registry * cbr)903 void PG_set_callbacks(struct sqlfunc_cb_registry *cbr)
904 {
905   memset(cbr, 0, sizeof(struct sqlfunc_cb_registry));
906 
907   cbr->connect = PG_DB_Connect;
908   cbr->close = PG_DB_Close;
909   cbr->lock = PG_Lock;
910   /* cbr->unlock */
911   if (!config.sql_use_copy) cbr->op = PG_cache_dbop;
912   else cbr->op = PG_cache_dbop_copy;
913   cbr->create_table = PG_create_dyn_table;
914   cbr->purge = PG_cache_purge;
915   cbr->create_backend = PG_create_backend;
916 }
917 
PG_init_default_values(struct insert_data * idata)918 void PG_init_default_values(struct insert_data *idata)
919 {
920   /* Linking database parameters */
921   if (!config.sql_data) config.sql_data = typed_str;
922   if (!config.sql_user) config.sql_user = pgsql_user;
923   if (!config.sql_db) config.sql_db = pgsql_db;
924   if (!config.sql_passwd) config.sql_passwd = pgsql_pwd;
925   if (!config.sql_table) {
926     /* checking 'typed' table constraints */
927     if (!strcmp(config.sql_data, "typed")) {
928       if (config.what_to_count & (COUNT_SRC_AS|COUNT_SUM_AS|COUNT_DST_AS) && config.what_to_count &
929 	(COUNT_SRC_HOST|COUNT_SUM_HOST|COUNT_DST_HOST|COUNT_SRC_NET|COUNT_SUM_NET|COUNT_DST_NET) &&
930 	config.sql_table_version < 6) {
931 	Log(LOG_ERR, "ERROR ( %s/%s ): 'typed' PostgreSQL table in use: unable to mix HOST/NET and AS aggregations.\n", config.name, config.type);
932 	exit_gracefully(1);
933       }
934       typed = TRUE;
935     }
936     else if (!strcmp(config.sql_data, "unified")) typed = FALSE;
937     else {
938       Log(LOG_ERR, "ERROR ( %s/%s ): Ignoring unknown 'sql_data' value '%s'.\n", config.name, config.type, config.sql_data);
939       exit_gracefully(1);
940     }
941 
942     if (typed) {
943       if (config.sql_table_version == (SQL_TABLE_VERSION_BGP+1)) config.sql_table = pgsql_table_bgp;
944       else if (config.sql_table_version == 8) config.sql_table = pgsql_table_v8;
945       else if (config.sql_table_version == 7) config.sql_table = pgsql_table_v7;
946       else if (config.sql_table_version == 6) config.sql_table = pgsql_table_v6;
947       else if (config.sql_table_version == 5) {
948         if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v5;
949         else config.sql_table = pgsql_table_v5;
950       }
951       else if (config.sql_table_version == 4) {
952 	if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v4;
953 	else config.sql_table = pgsql_table_v4;
954       }
955       else if (config.sql_table_version == 3) {
956 	if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v3;
957 	else config.sql_table = pgsql_table_v3;
958       }
959       else if (config.sql_table_version == 2) {
960 	if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as_v2;
961 	else config.sql_table = pgsql_table_v2;
962       }
963       else {
964 	if (config.what_to_count & (COUNT_SRC_AS|COUNT_DST_AS|COUNT_SUM_AS)) config.sql_table = pgsql_table_as;
965 	else config.sql_table = pgsql_table;
966       }
967     }
968     else {
969       if (config.sql_table_version == 8) {
970         Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
971         config.sql_table = pgsql_table_v8;
972       }
973       if (config.sql_table_version == 7) {
974 	Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
975 	config.sql_table = pgsql_table_v7;
976       }
977       else if (config.sql_table_version == 6) {
978 	Log(LOG_WARNING, "WARN ( %s/%s ): Unified data are no longer supported. Switching to typed data.\n", config.name, config.type);
979 	config.sql_table = pgsql_table_v6;
980       }
981       else if (config.sql_table_version == 5) config.sql_table = pgsql_table_uni_v5;
982       else if (config.sql_table_version == 4) config.sql_table = pgsql_table_uni_v4;
983       else if (config.sql_table_version == 3) config.sql_table = pgsql_table_uni_v3;
984       else if (config.sql_table_version == 2) config.sql_table = pgsql_table_uni_v2;
985       else config.sql_table = pgsql_table_uni;
986     }
987   }
988   if (strchr(config.sql_table, '%') || strchr(config.sql_table, '$')) {
989     idata->dyn_table = TRUE;
990     if (!strchr(config.sql_table, '$')) idata->dyn_table_time_only = TRUE;
991   }
992   glob_dyn_table = idata->dyn_table;
993   glob_dyn_table_time_only = idata->dyn_table_time_only;
994 
995   if (config.sql_backup_host) idata->recover = TRUE;
996   if (!config.sql_dont_try_update && config.sql_use_copy) config.sql_use_copy = FALSE;
997 
998   if (config.sql_locking_style) idata->locks = sql_select_locking_style(config.sql_locking_style);
999 }
1000 
PG_postgresql_get_version()1001 void PG_postgresql_get_version()
1002 {
1003 #if defined HAVE_PQLIBVERSION
1004   printf("PostgreSQL %u\n", PQlibVersion());
1005 #else
1006   printf("PostgreSQL\n");
1007 #endif
1008 }
1009