1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * As an additional exemption you are allowed to compile & link against the
19  * OpenSSL libraries as published by the OpenSSL project. See the file
20  * COPYING for details.
21  *
22  */
23 
24 #include "afsql.h"
25 
26 #include "logqueue.h"
27 #include "template/templates.h"
28 #include "messages.h"
29 #include "string-list.h"
30 #include "str-format.h"
31 #include "seqnum.h"
32 #include "stats/stats-registry.h"
33 #include "apphook.h"
34 #include "mainloop-worker.h"
35 #include "str-utils.h"
36 
37 #include <string.h>
38 #include <errno.h>
39 #include <openssl/md5.h>
40 
/* Flag recording whether libdbi has been initialized; the code that sets it
 * is outside this part of the file. */
static gboolean dbi_initialized = FALSE;
/* Driver type names that get special-case handling in this file. */
static const char *s_oracle = "oracle";
static const char *s_freetds = "freetds";
/* libdbi instance handle passed to dbi_conn_new_r() when connecting. */
static dbi_inst dbi_instance;
/* Value returned by _batch_lines() when batch_lines is not positive. */
static const gint DEFAULT_SQL_TX_SIZE = 100;

/* NOTE(review): not referenced in the visible part of this file. */
#define MAX_FAILED_ATTEMPTS 3
48 
49 void
afsql_dd_add_dbd_option(LogDriver * s,const gchar * name,const gchar * value)50 afsql_dd_add_dbd_option(LogDriver *s, const gchar *name, const gchar *value)
51 {
52   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
53 
54   g_hash_table_insert(self->dbd_options, g_strdup(name), g_strdup(value));
55 }
56 
57 void
afsql_dd_add_dbd_option_numeric(LogDriver * s,const gchar * name,gint value)58 afsql_dd_add_dbd_option_numeric(LogDriver *s, const gchar *name, gint value)
59 {
60   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
61 
62   g_hash_table_insert(self->dbd_options_numeric, g_strdup(name), GINT_TO_POINTER(value));
63 }
64 
65 void
afsql_dd_set_type(LogDriver * s,const gchar * type)66 afsql_dd_set_type(LogDriver *s, const gchar *type)
67 {
68   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
69 
70   g_free(self->type);
71   if (strcmp(type, "mssql") == 0)
72     type = s_freetds;
73   self->type = g_strdup(type);
74 }
75 
76 void
afsql_dd_set_host(LogDriver * s,const gchar * host)77 afsql_dd_set_host(LogDriver *s, const gchar *host)
78 {
79   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
80 
81   g_free(self->host);
82   self->host = g_strdup(host);
83 }
84 
85 gboolean
afsql_dd_check_port(const gchar * port)86 afsql_dd_check_port(const gchar *port)
87 {
88   /* only digits (->numbers) are allowed */
89   int len = strlen(port);
90   for (int i = 0; i < len; ++i)
91     if (port[i] < '0' || port[i] > '9')
92       return FALSE;
93   return TRUE;
94 }
95 
96 void
afsql_dd_set_port(LogDriver * s,const gchar * port)97 afsql_dd_set_port(LogDriver *s, const gchar *port)
98 {
99   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
100 
101   g_free(self->port);
102   self->port = g_strdup(port);
103 }
104 
105 void
afsql_dd_set_user(LogDriver * s,const gchar * user)106 afsql_dd_set_user(LogDriver *s, const gchar *user)
107 {
108   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
109 
110   g_free(self->user);
111   self->user = g_strdup(user);
112 }
113 
114 void
afsql_dd_set_password(LogDriver * s,const gchar * password)115 afsql_dd_set_password(LogDriver *s, const gchar *password)
116 {
117   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
118 
119   g_free(self->password);
120   self->password = g_strdup(password);
121 }
122 
123 void
afsql_dd_set_database(LogDriver * s,const gchar * database)124 afsql_dd_set_database(LogDriver *s, const gchar *database)
125 {
126   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
127 
128   g_free(self->database);
129   self->database = g_strdup(database);
130 }
131 
132 void
afsql_dd_set_table(LogDriver * s,LogTemplate * table_template)133 afsql_dd_set_table(LogDriver *s, LogTemplate *table_template)
134 {
135   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
136 
137   log_template_unref(self->table);
138   self->table = table_template;
139 }
140 
141 void
afsql_dd_set_columns(LogDriver * s,GList * columns)142 afsql_dd_set_columns(LogDriver *s, GList *columns)
143 {
144   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
145 
146   string_list_free(self->columns);
147   self->columns = columns;
148 }
149 
150 void
afsql_dd_set_indexes(LogDriver * s,GList * indexes)151 afsql_dd_set_indexes(LogDriver *s, GList *indexes)
152 {
153   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
154 
155   string_list_free(self->indexes);
156   self->indexes = indexes;
157 }
158 
159 void
afsql_dd_set_values(LogDriver * s,GList * values)160 afsql_dd_set_values(LogDriver *s, GList *values)
161 {
162   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
163 
164   string_list_free(self->values);
165   self->values = values;
166 }
167 
168 void
afsql_dd_set_null_value(LogDriver * s,const gchar * null)169 afsql_dd_set_null_value(LogDriver *s, const gchar *null)
170 {
171   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
172 
173   if (self->null_value)
174     g_free(self->null_value);
175   self->null_value = g_strdup(null);
176 }
177 
178 void
afsql_dd_set_ignore_tns_config(LogDriver * s,const gboolean ignore_tns_config)179 afsql_dd_set_ignore_tns_config(LogDriver *s, const gboolean ignore_tns_config)
180 {
181   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
182 
183   self->ignore_tns_config = ignore_tns_config;
184 }
185 
186 void
afsql_dd_set_session_statements(LogDriver * s,GList * session_statements)187 afsql_dd_set_session_statements(LogDriver *s, GList *session_statements)
188 {
189   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
190 
191   self->session_statements = session_statements;
192 }
193 
194 void
afsql_dd_set_flags(LogDriver * s,gint flags)195 afsql_dd_set_flags(LogDriver *s, gint flags)
196 {
197   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
198 
199   self->flags = flags;
200 }
201 
202 void
afsql_dd_set_create_statement_append(LogDriver * s,const gchar * create_statement_append)203 afsql_dd_set_create_statement_append(LogDriver *s, const gchar *create_statement_append)
204 {
205   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
206 
207   g_free(self->create_statement_append);
208   self->create_statement_append = g_strdup(create_statement_append);
209 }
210 
211 /**
212  * afsql_dd_run_query:
213  *
214  * Run an SQL query on the connected database.
215  *
216  * NOTE: This function can only be called from the database thread.
217  **/
218 static gboolean
afsql_dd_run_query(AFSqlDestDriver * self,const gchar * query,gboolean silent,dbi_result * result)219 afsql_dd_run_query(AFSqlDestDriver *self, const gchar *query, gboolean silent, dbi_result *result)
220 {
221   dbi_result db_res;
222 
223   msg_debug("Running SQL query",
224             evt_tag_str("query", query));
225 
226   db_res = dbi_conn_query(self->dbi_ctx, query);
227   if (!db_res)
228     {
229       const gchar *dbi_error;
230 
231       if (!silent)
232         {
233           dbi_conn_error(self->dbi_ctx, &dbi_error);
234           msg_error("Error running SQL query",
235                     evt_tag_str("type", self->type),
236                     evt_tag_str("host", self->host),
237                     evt_tag_str("port", self->port),
238                     evt_tag_str("user", self->user),
239                     evt_tag_str("database", self->database),
240                     evt_tag_str("error", dbi_error),
241                     evt_tag_str("query", query));
242         }
243       return FALSE;
244     }
245   if (result)
246     *result = db_res;
247   else
248     dbi_result_free(db_res);
249   return TRUE;
250 }
251 
252 /**
253  * afsql_dd_commit_transaction:
254  *
255  * Commit SQL transaction.
256  *
257  * NOTE: This function can only be called from the database thread.
258  **/
259 static gboolean
afsql_dd_commit_transaction(AFSqlDestDriver * self)260 afsql_dd_commit_transaction(AFSqlDestDriver *self)
261 {
262   gboolean success;
263 
264   if (!self->transaction_active)
265     return TRUE;
266 
267   success = afsql_dd_run_query(self, "COMMIT", FALSE, NULL);
268   if (success)
269     {
270       self->transaction_active = FALSE;
271     }
272   else
273     {
274       msg_error("SQL transaction commit failed, rewinding backlog and starting again");
275     }
276   return success;
277 }
278 
279 /**
280  * afsql_dd_begin_transaction:
281  *
282  * Begin SQL transaction.
283  *
284  * NOTE: This function can only be called from the database thread.
285  **/
286 static gboolean
afsql_dd_begin_transaction(AFSqlDestDriver * self)287 afsql_dd_begin_transaction(AFSqlDestDriver *self)
288 {
289   gboolean success = TRUE;
290   const char *s_begin = "BEGIN";
291   if (!strcmp(self->type, s_freetds))
292     {
293       /* the mssql requires this command */
294       s_begin = "BEGIN TRANSACTION";
295     }
296 
297   if (strcmp(self->type, s_oracle) != 0)
298     {
299       /* oracle db has no BEGIN TRANSACTION command, it implicitly starts one, after every commit. */
300       success = afsql_dd_run_query(self, s_begin, FALSE, NULL);
301     }
302 
303   self->transaction_active = success;
304 
305   return success;
306 }
307 
308 static gboolean
afsql_dd_rollback_transaction(AFSqlDestDriver * self)309 afsql_dd_rollback_transaction(AFSqlDestDriver *self)
310 {
311   if (!self->transaction_active)
312     return TRUE;
313 
314   self->transaction_active = FALSE;
315 
316   return afsql_dd_run_query(self, "ROLLBACK", FALSE, NULL);
317 }
318 
319 static gboolean
afsql_dd_begin_new_transaction(AFSqlDestDriver * self)320 afsql_dd_begin_new_transaction(AFSqlDestDriver *self)
321 {
322   if (self->transaction_active)
323     {
324       if (!afsql_dd_commit_transaction(self))
325         {
326           afsql_dd_rollback_transaction(self);
327           return FALSE;
328         }
329     }
330 
331   return afsql_dd_begin_transaction(self);
332 }
333 
_sql_identifier_is_valid_char(gchar c)334 static gboolean _sql_identifier_is_valid_char(gchar c)
335 {
336   return ((c == '.') ||
337           (c == '_') ||
338           (c >= '0' && c <= '9') ||
339           (g_ascii_tolower(c) >= 'a' && g_ascii_tolower(c) <= 'z'));
340 }
341 
342 static gboolean
_is_sql_identifier_sanitized(const gchar * token)343 _is_sql_identifier_sanitized(const gchar *token)
344 {
345   gint i;
346 
347   for (i = 0; token[i]; i++)
348     {
349       if (!_sql_identifier_is_valid_char(token[i]))
350         return FALSE;
351     }
352 
353   return TRUE;
354 }
355 
356 static void
_sanitize_sql_identifier(gchar * token)357 _sanitize_sql_identifier(gchar *token)
358 {
359   gint i;
360 
361   for (i = 0; token[i]; i++)
362     {
363       if (!_sql_identifier_is_valid_char(token[i]))
364         token[i] = '_';
365     }
366 }
367 
368 /**
369  * afsql_dd_create_index:
370  *
371  * This function creates an index for the column specified and returns
372  * TRUE to indicate success.
373  *
374  * NOTE: This function can only be called from the database thread.
375  **/
376 static gboolean
afsql_dd_create_index(AFSqlDestDriver * self,const gchar * table,const gchar * column)377 afsql_dd_create_index(AFSqlDestDriver *self, const gchar *table, const gchar *column)
378 {
379   GString *query_string;
380   gboolean success = TRUE;
381 
382   query_string = g_string_sized_new(64);
383 
384   if (strcmp(self->type, s_oracle) == 0)
385     {
386       /* NOTE: oracle index indentifier length is max 30 characters
387        * so we use the first 30 characters of the table_column md5 hash */
388       if ((strlen(table) + strlen(column)) > 25)
389         {
390 
391           guchar hash[MD5_DIGEST_LENGTH];
392           gchar hash_str[31];
393           gchar *cat = g_strjoin("_", table, column, NULL);
394 
395           MD5((guchar *)cat, strlen(cat), hash);
396           g_free(cat);
397 
398           format_hex_string(hash, sizeof(hash), hash_str, sizeof(hash_str));
399           hash_str[0] = 'i';
400           g_string_printf(query_string, "CREATE INDEX %s ON %s (%s)",
401                           hash_str, table, column);
402         }
403       else
404         g_string_printf(query_string, "CREATE INDEX %s_%s_idx ON %s (%s)",
405                         table, column, table, column);
406     }
407   else
408     g_string_printf(query_string, "CREATE INDEX %s_%s_idx ON %s (%s)",
409                     table, column, table, column);
410   if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
411     {
412       msg_error("Error adding missing index",
413                 evt_tag_str("table", table),
414                 evt_tag_str("column", column));
415       success = FALSE;
416     }
417   g_string_free(query_string, TRUE);
418   return success;
419 }
420 
421 static inline gboolean
_is_table_syslogng_conform(AFSqlDestDriver * self,const gchar * table)422 _is_table_syslogng_conform(AFSqlDestDriver *self, const gchar *table)
423 {
424   return (g_hash_table_lookup(self->syslogng_conform_tables, table) != NULL);
425 }
426 
427 static inline void
_remember_table_as_syslogng_conform(AFSqlDestDriver * self,const gchar * table)428 _remember_table_as_syslogng_conform(AFSqlDestDriver *self, const gchar *table)
429 {
430   g_hash_table_insert(self->syslogng_conform_tables, g_strdup(table), GUINT_TO_POINTER(TRUE));
431 }
432 
433 static gboolean
_is_table_present(AFSqlDestDriver * self,const gchar * table,dbi_result * metadata)434 _is_table_present(AFSqlDestDriver *self, const gchar *table, dbi_result *metadata)
435 {
436   gboolean res = FALSE;
437   GString *query_string;
438 
439   if (!afsql_dd_begin_new_transaction(self))
440     {
441       msg_error("Starting new transaction has failed");
442 
443       return FALSE;
444     }
445 
446   query_string = g_string_sized_new(32);
447   g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table);
448   res = afsql_dd_run_query(self, query_string->str, TRUE, metadata);
449   g_string_free(query_string, TRUE);
450 
451   afsql_dd_commit_transaction(self);
452 
453   return res;
454 }
455 
456 static gboolean
_ensure_table_is_syslogng_conform(AFSqlDestDriver * self,dbi_result db_res,const gchar * table)457 _ensure_table_is_syslogng_conform(AFSqlDestDriver *self, dbi_result db_res, const gchar *table)
458 {
459   gboolean success = TRUE;
460   gboolean new_transaction_started = FALSE;
461   gint i;
462   GString *query_string = g_string_sized_new(32);
463 
464   for (i = 0; success && (i < self->fields_len); i++)
465     {
466       if (dbi_result_get_field_idx(db_res, self->fields[i].name) == 0)
467         {
468           GList *l;
469           if (!new_transaction_started)
470             {
471               if (!afsql_dd_begin_new_transaction(self))
472                 {
473                   msg_error("Starting new transaction for modifying(ALTER) table has failed",
474                             evt_tag_str("table", table));
475                   success = FALSE;
476                   break;
477                 }
478               new_transaction_started = TRUE;
479             }
480           /* field does not exist, add this column */
481           g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type);
482           if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
483             {
484               msg_error("Error adding missing column, giving up",
485                         evt_tag_str("table", table),
486                         evt_tag_str("column", self->fields[i].name));
487               success = FALSE;
488               break;
489             }
490           for (l = self->indexes; l; l = l->next)
491             {
492               if (strcmp((gchar *) l->data, self->fields[i].name) == 0)
493                 {
494                   /* this is an indexed column, create index */
495                   afsql_dd_create_index(self, table, self->fields[i].name);
496                 }
497             }
498         }
499     }
500 
501   if (new_transaction_started && ( !success || !afsql_dd_commit_transaction(self)))
502     {
503       afsql_dd_rollback_transaction(self);
504       success = FALSE;
505     }
506 
507   g_string_free(query_string, TRUE);
508 
509   return success;
510 }
511 
512 static gboolean
_table_create_indexes(AFSqlDestDriver * self,const gchar * table)513 _table_create_indexes(AFSqlDestDriver *self, const gchar *table)
514 {
515   gboolean success = TRUE;
516   GList *l;
517 
518   if (!afsql_dd_begin_new_transaction(self))
519     {
520       msg_error("Starting new transaction for table creation has failed",
521                 evt_tag_str("table", table));
522       return FALSE;
523     }
524 
525   for (l = self->indexes; l && success; l = l->next)
526     {
527       success = afsql_dd_create_index(self, table, (gchar *) l->data);
528     }
529 
530   if (!success || !afsql_dd_commit_transaction(self))
531     {
532       afsql_dd_rollback_transaction(self);
533     }
534 
535   return success;
536 }
537 
538 static gboolean
_table_create(AFSqlDestDriver * self,const gchar * table)539 _table_create(AFSqlDestDriver *self, const gchar *table)
540 {
541   gint i;
542   GString *query_string = g_string_sized_new(32);
543   gboolean success = FALSE;
544   if (!afsql_dd_begin_new_transaction(self))
545     {
546       msg_error("Starting new transaction for table creation has failed",
547                 evt_tag_str("table", table));
548       return FALSE;
549     }
550 
551   g_string_printf(query_string, "CREATE TABLE %s (", table);
552   for (i = 0; i < self->fields_len; i++)
553     {
554       g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type);
555       if (i != self->fields_len - 1)
556         g_string_append(query_string, ", ");
557     }
558   g_string_append(query_string, ")");
559   if (self->create_statement_append)
560     g_string_append(query_string, self->create_statement_append);
561   if (afsql_dd_run_query(self, query_string->str, FALSE, NULL))
562     {
563       success = TRUE;
564     }
565   else
566     {
567       msg_error("Error creating table, giving up",
568                 evt_tag_str("table", table));
569     }
570 
571   if (!success || !afsql_dd_commit_transaction(self))
572     {
573       afsql_dd_rollback_transaction(self);
574     }
575 
576   g_string_free(query_string, TRUE);
577 
578   return success;
579 }
580 
581 /**
582  * afsql_dd_validate_table:
583  *
584  * Check if the given table exists in the database. If it doesn't
585  * create it, if it does, check if all the required fields are
586  * present and create them if they don't.
587  *
588  * NOTE: This function can only be called from the database thread.
589  **/
590 static gboolean
afsql_dd_ensure_table_is_syslogng_conform(AFSqlDestDriver * self,GString * table)591 afsql_dd_ensure_table_is_syslogng_conform(AFSqlDestDriver *self, GString *table)
592 {
593   dbi_result db_res = NULL;
594   gboolean success = FALSE;
595 
596   if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
597     return TRUE;
598 
599   _sanitize_sql_identifier(table->str);
600 
601   if (_is_table_syslogng_conform(self, table->str))
602     return TRUE;
603 
604   if (_is_table_present(self, table->str, &db_res))
605     {
606       /* table exists, check structure */
607       success = _ensure_table_is_syslogng_conform(self, db_res, table->str);
608       if (db_res)
609         dbi_result_free(db_res);
610     }
611   else
612     {
613       /* table does not exist, create it */
614       success = _table_create(self, table->str) && _table_create_indexes(self, table->str);
615     }
616 
617   if (success)
618     {
619       /* we have successfully created/altered the destination table, record this information */
620       _remember_table_as_syslogng_conform(self, table->str);
621     }
622 
623   return success;
624 }
625 
626 static void
afsql_dd_set_dbd_opt(gpointer key,gpointer value,gpointer user_data)627 afsql_dd_set_dbd_opt(gpointer key, gpointer value, gpointer user_data)
628 {
629   dbi_conn_set_option((dbi_conn)user_data, (gchar *)key, (gchar *)value);
630 }
631 
632 static void
afsql_dd_set_dbd_opt_numeric(gpointer key,gpointer value,gpointer user_data)633 afsql_dd_set_dbd_opt_numeric(gpointer key, gpointer value, gpointer user_data)
634 {
635   dbi_conn_set_option_numeric((dbi_conn)user_data, (gchar *)key,
636                               GPOINTER_TO_INT(value));
637 }
638 
639 /*
640  * NOTE: there's a bug in libdbd-sqlite3 and this function basically works
641  * that around.  The issue is that the database path cannot be an empty
642  * string as it causes an invalid read (it is using -1 as an index in that
643  * case).  What we do is that if database is a fully specified path to a
644  * filename (e.g.  starts with a slash), we use another slash, so it remains
645  * a root-relative filename.  Otherwise, we simply use the current
646  * directory.
647  */
648 static const gchar *
_sqlite_db_dir(const gchar * database,gchar * buf,gsize buflen)649 _sqlite_db_dir(const gchar *database, gchar *buf, gsize buflen)
650 {
651   if (database[0] == '/')
652     return strncpy(buf, "/", buflen);
653   else
654     return getcwd(buf, buflen);
655   return buf;
656 }
657 
658 static void
_enable_database_specific_hacks(AFSqlDestDriver * self)659 _enable_database_specific_hacks(AFSqlDestDriver *self)
660 {
661   gchar buf[1024];
662 
663   if (strcmp(self->type, "sqlite") == 0)
664     dbi_conn_set_option(self->dbi_ctx, "sqlite_dbdir", _sqlite_db_dir(self->database, buf, sizeof(buf)));
665   else if (strcmp(self->type, "sqlite3") == 0)
666     dbi_conn_set_option(self->dbi_ctx, "sqlite3_dbdir", _sqlite_db_dir(self->database, buf, sizeof(buf)));
667   else if (strcmp(self->type, s_oracle) == 0)
668     dbi_conn_set_option_numeric(self->dbi_ctx, "oracle_ignore_tns_config", self->ignore_tns_config);
669 }
670 
671 static gboolean
afsql_dd_connect(LogThreadedDestDriver * s)672 afsql_dd_connect(LogThreadedDestDriver *s)
673 {
674   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
675 
676   self->dbi_ctx = dbi_conn_new_r(self->type, dbi_instance);
677 
678   if (!self->dbi_ctx)
679     {
680       msg_error("No such DBI driver",
681                 evt_tag_str("type", self->type));
682       return FALSE;
683     }
684 
685   dbi_conn_set_option(self->dbi_ctx, "host", self->host);
686 
687   if (strcmp(self->type, "mysql"))
688     dbi_conn_set_option(self->dbi_ctx, "port", self->port);
689   else
690     dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
691 
692   dbi_conn_set_option(self->dbi_ctx, "username", self->user);
693   dbi_conn_set_option(self->dbi_ctx, "password", self->password);
694   dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
695   dbi_conn_set_option(self->dbi_ctx, "encoding", self->encoding);
696   dbi_conn_set_option(self->dbi_ctx, "auto-commit", self->flags & AFSQL_DDF_EXPLICIT_COMMITS ? "false" : "true");
697 
698   _enable_database_specific_hacks(self);
699 
700   /* Set user-specified options */
701   g_hash_table_foreach(self->dbd_options, afsql_dd_set_dbd_opt, self->dbi_ctx);
702   g_hash_table_foreach(self->dbd_options_numeric, afsql_dd_set_dbd_opt_numeric, self->dbi_ctx);
703 
704   if (dbi_conn_connect(self->dbi_ctx) < 0)
705     {
706       const gchar *dbi_error;
707 
708       dbi_conn_error(self->dbi_ctx, &dbi_error);
709 
710       msg_error("Error establishing SQL connection",
711                 evt_tag_str("type", self->type),
712                 evt_tag_str("host", self->host),
713                 evt_tag_str("port", self->port),
714                 evt_tag_str("username", self->user),
715                 evt_tag_str("database", self->database),
716                 evt_tag_str("error", dbi_error));
717 
718       return FALSE;
719     }
720 
721   if (self->session_statements != NULL)
722     {
723       GList *l;
724 
725       for (l = self->session_statements; l; l = l->next)
726         {
727           if (!afsql_dd_run_query(self, (gchar *) l->data, FALSE, NULL))
728             {
729               msg_error("Error executing SQL connection statement",
730                         evt_tag_str("statement", (gchar *) l->data));
731 
732               return FALSE;
733             }
734         }
735     }
736 
737   return TRUE;
738 }
739 
740 static void
afsql_dd_disconnect(LogThreadedDestDriver * s)741 afsql_dd_disconnect(LogThreadedDestDriver *s)
742 {
743   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
744 
745   dbi_conn_close(self->dbi_ctx);
746   self->dbi_ctx = NULL;
747 }
748 
749 static GString *
afsql_dd_ensure_accessible_database_table(AFSqlDestDriver * self,LogMessage * msg)750 afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg)
751 {
752   GString *table = g_string_sized_new(32);
753 
754   LogTemplateEvalOptions options = {&self->template_options, LTZ_LOCAL, 0, NULL};
755   log_template_format(self->table, msg, &options, table);
756 
757   if (!afsql_dd_ensure_table_is_syslogng_conform(self, table))
758     {
759       /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
760       msg_error("Error checking table, disconnecting from database, trying again shortly",
761                 evt_tag_int("time_reopen", self->super.time_reopen));
762       g_string_free(table, TRUE);
763       return NULL;
764     }
765 
766   return table;
767 }
768 
769 static GString *
afsql_dd_build_insert_command(AFSqlDestDriver * self,LogMessage * msg,GString * table)770 afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table)
771 {
772   GString *insert_command = g_string_sized_new(256);
773   GString *value = g_string_sized_new(512);
774   gint i, j;
775 
776   g_string_printf(insert_command, "INSERT INTO %s (", table->str);
777 
778   for (i = 0; i < self->fields_len; i++)
779     {
780       if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
781         {
782           g_string_append(insert_command, self->fields[i].name);
783 
784           j = i + 1;
785           while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
786             j++;
787 
788           if (j < self->fields_len)
789             g_string_append(insert_command, ", ");
790         }
791     }
792 
793   g_string_append(insert_command, ") VALUES (");
794 
795   for (i = 0; i < self->fields_len; i++)
796     {
797       gchar *quoted;
798 
799       if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
800         {
801           LogTemplateEvalOptions options = {&self->template_options, LTZ_SEND, self->super.worker.instance.seq_num, NULL};
802           log_template_format(self->fields[i].value, msg, &options, value);
803           if (self->null_value && strcmp(self->null_value, value->str) == 0)
804             {
805               g_string_append(insert_command, "NULL");
806             }
807           else
808             {
809               dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
810               if (quoted)
811                 {
812                   g_string_append(insert_command, quoted);
813                   free(quoted);
814                 }
815               else
816                 {
817                   g_string_append(insert_command, "''");
818                 }
819             }
820 
821           j = i + 1;
822           while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
823             j++;
824           if (j < self->fields_len)
825             g_string_append(insert_command, ", ");
826         }
827     }
828 
829   g_string_append(insert_command, ")");
830 
831   g_string_free(value, TRUE);
832 
833   return insert_command;
834 }
835 
836 static inline gboolean
afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver * self)837 afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self)
838 {
839   return !!(self->flags & AFSQL_DDF_EXPLICIT_COMMITS);
840 }
841 
842 static inline gboolean
afsql_dd_should_begin_new_transaction(const AFSqlDestDriver * self)843 afsql_dd_should_begin_new_transaction(const AFSqlDestDriver *self)
844 {
845   return afsql_dd_is_transaction_handling_enabled(self) && self->super.worker.instance.batch_size == 1;
846 }
847 
848 static gint
_batch_lines(const AFSqlDestDriver * self)849 _batch_lines(const AFSqlDestDriver *self)
850 {
851   if (self->super.batch_lines <= 0)
852     return DEFAULT_SQL_TX_SIZE;
853 
854   return self->super.batch_lines;
855 }
856 
857 static LogThreadedResult
afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver * self)858 afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self)
859 {
860   const gchar *dbi_error, *error_message;
861 
862   if (dbi_conn_ping(self->dbi_ctx) == 1)
863     {
864       return LTR_ERROR;
865     }
866 
867   if (afsql_dd_is_transaction_handling_enabled(self))
868     {
869       error_message = "SQL connection lost in the middle of a transaction,"
870                       " rewinding backlog and starting again";
871     }
872   else
873     {
874       error_message = "Error, no SQL connection after failed query attempt";
875     }
876 
877   dbi_conn_error(self->dbi_ctx, &dbi_error);
878   msg_error(error_message,
879             evt_tag_str("type", self->type),
880             evt_tag_str("host", self->host),
881             evt_tag_str("port", self->port),
882             evt_tag_str("username", self->user),
883             evt_tag_str("database", self->database),
884             evt_tag_str("error", dbi_error));
885 
886   return LTR_ERROR;
887 }
888 
889 static LogThreadedResult
afsql_dd_flush(LogThreadedDestDriver * s)890 afsql_dd_flush(LogThreadedDestDriver *s)
891 {
892   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
893 
894   if (!afsql_dd_is_transaction_handling_enabled(self))
895     return LTR_SUCCESS;
896 
897   if (!afsql_dd_commit_transaction(self))
898     {
899       /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_transaction() */
900       afsql_dd_rollback_transaction(self);
901       return LTR_ERROR;
902     }
903   return LTR_SUCCESS;
904 }
905 
906 static gboolean
afsql_dd_run_insert_query(AFSqlDestDriver * self,GString * table,LogMessage * msg)907 afsql_dd_run_insert_query(AFSqlDestDriver *self, GString *table, LogMessage *msg)
908 {
909   GString *insert_command;
910 
911   insert_command = afsql_dd_build_insert_command(self, msg, table);
912   gboolean success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL);
913   g_string_free(insert_command, TRUE);
914   return success;
915 }
916 
917 /**
918  * afsql_dd_insert_db:
919  *
920  * This function is running in the database thread
921  *
922  * Returns: FALSE to indicate that the connection should be closed and
923  * this destination suspended for time_reopen() time.
924  **/
925 static LogThreadedResult
afsql_dd_insert(LogThreadedDestDriver * s,LogMessage * msg)926 afsql_dd_insert(LogThreadedDestDriver *s, LogMessage *msg)
927 {
928   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
929   GString *table = NULL;
930   LogThreadedResult retval = LTR_ERROR;
931 
932   table = afsql_dd_ensure_accessible_database_table(self, msg);
933   if (!table)
934     goto error;
935 
936   if (afsql_dd_should_begin_new_transaction(self) && !afsql_dd_begin_transaction(self))
937     goto error;
938 
939   if (!afsql_dd_run_insert_query(self, table, msg))
940     {
941       retval = afsql_dd_handle_insert_row_error_depending_on_connection_availability(self);
942       goto error;
943     }
944 
945   retval = afsql_dd_is_transaction_handling_enabled(self)
946            ? LTR_QUEUED
947            : LTR_SUCCESS;
948 
949 error:
950 
951   if (table != NULL)
952     g_string_free(table, TRUE);
953 
954   return retval;
955 }
956 
957 static const gchar *
afsql_dd_format_stats_instance(LogThreadedDestDriver * s)958 afsql_dd_format_stats_instance(LogThreadedDestDriver *s)
959 {
960   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
961   static gchar persist_name[64];
962 
963   g_snprintf(persist_name, sizeof(persist_name),
964              "%s,%s,%s,%s,%s",
965              self->type, self->host, self->port, self->database, self->table->template);
966   return persist_name;
967 }
968 
969 static const gchar *
afsql_dd_format_persist_name(const LogPipe * s)970 afsql_dd_format_persist_name(const LogPipe *s)
971 {
972   AFSqlDestDriver *self = (AFSqlDestDriver *)s;
973   static gchar persist_name[256];
974 
975   if (s->persist_name)
976     g_snprintf(persist_name, sizeof(persist_name), "afsql_dd.%s", s->persist_name);
977   else
978     g_snprintf(persist_name, sizeof(persist_name), "afsql_dd(%s,%s,%s,%s,%s)", self->type,
979                self->host, self->port, self->database, self->table->template);
980 
981   return persist_name;
982 }
983 
984 static const gchar *
_afsql_dd_format_legacy_persist_name(const AFSqlDestDriver * self)985 _afsql_dd_format_legacy_persist_name(const AFSqlDestDriver *self)
986 {
987   static gchar legacy_persist_name[256];
988 
989   g_snprintf(legacy_persist_name, sizeof(legacy_persist_name),
990              "afsql_dd_qfile(%s,%s,%s,%s,%s)",
991              self->type, self->host, self->port, self->database, self->table->template);
992 
993   return legacy_persist_name;
994 }
995 
996 static gboolean
_update_legacy_persist_name_if_exists(AFSqlDestDriver * self)997 _update_legacy_persist_name_if_exists(AFSqlDestDriver *self)
998 {
999   GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
1000   const gchar *current_persist_name = afsql_dd_format_persist_name(&self->super.super.super.super);
1001   const gchar *legacy_persist_name = _afsql_dd_format_legacy_persist_name(self);
1002 
1003   if (persist_state_entry_exists(cfg->state, current_persist_name))
1004     return TRUE;
1005 
1006   if (!persist_state_entry_exists(cfg->state, legacy_persist_name))
1007     return TRUE;
1008 
1009   return persist_state_move_entry(cfg->state, legacy_persist_name, current_persist_name);
1010 }
1011 
1012 static gboolean
_init_fields_from_columns_and_values(AFSqlDestDriver * self)1013 _init_fields_from_columns_and_values(AFSqlDestDriver *self)
1014 {
1015   GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
1016   GList *col, *value;
1017   gint len_cols, len_values;
1018   gint i;
1019 
1020   if (self->fields)
1021     return TRUE;
1022 
1023   len_cols = g_list_length(self->columns);
1024   len_values = g_list_length(self->values);
1025   if (len_cols != len_values)
1026     {
1027       msg_error("The number of columns and values do not match",
1028                 evt_tag_int("len_columns", len_cols),
1029                 evt_tag_int("len_values", len_values));
1030       return FALSE;
1031     }
1032   self->fields_len = len_cols;
1033   self->fields = g_new0(AFSqlField, len_cols);
1034 
1035   for (i = 0, col = self->columns, value = self->values; col && value; i++, col = col->next, value = value->next)
1036     {
1037       gchar *space;
1038 
1039       space = strchr(col->data, ' ');
1040       if (space)
1041         {
1042           self->fields[i].name = g_strndup(col->data, space - (gchar *) col->data);
1043           while (*space == ' ')
1044             space++;
1045           if (*space != '\0')
1046             self->fields[i].type = g_strdup(space);
1047           else
1048             self->fields[i].type = g_strdup("text");
1049         }
1050       else
1051         {
1052           self->fields[i].name = g_strdup(col->data);
1053           self->fields[i].type = g_strdup("text");
1054         }
1055       if (!_is_sql_identifier_sanitized(self->fields[i].name))
1056         {
1057           msg_error("Column name is not a proper SQL name",
1058                     evt_tag_str("column", self->fields[i].name));
1059           return FALSE;
1060         }
1061 
1062       if (GPOINTER_TO_UINT(value->data) > 4096)
1063         {
1064           self->fields[i].value = log_template_new(cfg, NULL);
1065           log_template_compile(self->fields[i].value, (gchar *) value->data, NULL);
1066         }
1067       else
1068         {
1069           switch (GPOINTER_TO_UINT(value->data))
1070             {
1071             case AFSQL_COLUMN_DEFAULT:
1072               self->fields[i].flags |= AFSQL_FF_DEFAULT;
1073               break;
1074             default:
1075               g_assert_not_reached();
1076               break;
1077             }
1078         }
1079     }
1080   return TRUE;
1081 }
1082 
1083 static gboolean
_initialize_dbi(void)1084 _initialize_dbi(void)
1085 {
1086   if (!dbi_initialized)
1087     {
1088       errno = 0;
1089       gint rc = dbi_initialize_r(NULL, &dbi_instance);
1090 
1091       if (rc < 0)
1092         {
1093           /* NOTE: errno might be unreliable, but that's all we have */
1094           msg_error("Unable to initialize database access (DBI)",
1095                     evt_tag_int("rc", rc),
1096                     evt_tag_error("error"));
1097           return FALSE;
1098         }
1099       else if (rc == 0)
1100         {
1101           msg_error("The database access library (DBI) reports no usable SQL drivers, perhaps DBI drivers are not installed properly");
1102           return FALSE;
1103         }
1104       else
1105         {
1106           dbi_initialized = TRUE;
1107         }
1108     }
1109   return TRUE;
1110 }
1111 
1112 static gboolean
afsql_dd_init(LogPipe * s)1113 afsql_dd_init(LogPipe *s)
1114 {
1115   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
1116   GlobalConfig *cfg = log_pipe_get_config(s);
1117 
1118   if (!_update_legacy_persist_name_if_exists(self))
1119     return FALSE;
1120   if (!_initialize_dbi())
1121     return FALSE;
1122 
1123   if (!self->columns || !self->values)
1124     {
1125       msg_error("Default columns and values must be specified for database destinations",
1126                 evt_tag_str("type", self->type));
1127       return FALSE;
1128     }
1129 
1130   if (self->ignore_tns_config && strcmp(self->type, s_oracle) != 0)
1131     {
1132       msg_warning("WARNING: Option ignore_tns_config was skipped because database type is not Oracle",
1133                   evt_tag_str("type", self->type));
1134     }
1135 
1136   if (!_init_fields_from_columns_and_values(self))
1137     return FALSE;
1138 
1139   if (!log_threaded_dest_driver_init_method(s))
1140     return FALSE;
1141 
1142   log_template_options_init(&self->template_options, cfg);
1143 
1144   if (afsql_dd_is_transaction_handling_enabled(self))
1145     log_threaded_dest_driver_set_batch_lines((LogDriver *)self, _batch_lines(self));
1146 
1147   return TRUE;
1148 }
1149 
1150 static void
afsql_dd_free(LogPipe * s)1151 afsql_dd_free(LogPipe *s)
1152 {
1153   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
1154   gint i;
1155 
1156   log_template_options_destroy(&self->template_options);
1157   for (i = 0; i < self->fields_len; i++)
1158     {
1159       g_free(self->fields[i].name);
1160       g_free(self->fields[i].type);
1161       log_template_unref(self->fields[i].value);
1162     }
1163 
1164   g_free(self->fields);
1165   g_free(self->type);
1166   g_free(self->host);
1167   g_free(self->port);
1168   g_free(self->user);
1169   g_free(self->password);
1170   g_free(self->database);
1171   g_free(self->encoding);
1172   g_free(self->create_statement_append);
1173   if (self->null_value)
1174     g_free(self->null_value);
1175   string_list_free(self->columns);
1176   string_list_free(self->indexes);
1177   string_list_free(self->values);
1178   log_template_unref(self->table);
1179   g_hash_table_destroy(self->syslogng_conform_tables);
1180   g_hash_table_destroy(self->dbd_options);
1181   g_hash_table_destroy(self->dbd_options_numeric);
1182   if (self->session_statements)
1183     string_list_free(self->session_statements);
1184   log_threaded_dest_driver_free(s);
1185 }
1186 
1187 LogDriver *
afsql_dd_new(GlobalConfig * cfg)1188 afsql_dd_new(GlobalConfig *cfg)
1189 {
1190   AFSqlDestDriver *self = g_new0(AFSqlDestDriver, 1);
1191 
1192   log_threaded_dest_driver_init_instance(&self->super, cfg);
1193 
1194   self->super.super.super.super.init = afsql_dd_init;
1195   self->super.super.super.super.free_fn = afsql_dd_free;
1196   self->super.super.super.super.generate_persist_name = afsql_dd_format_persist_name;
1197   self->super.format_stats_instance = afsql_dd_format_stats_instance;
1198   self->super.worker.connect = afsql_dd_connect;
1199   self->super.worker.disconnect = afsql_dd_disconnect;
1200   self->super.worker.insert = afsql_dd_insert;
1201   self->super.worker.flush = afsql_dd_flush;
1202 
1203   self->type = g_strdup("mysql");
1204   self->host = g_strdup("");
1205   self->port = g_strdup("");
1206   self->user = g_strdup("syslog-ng");
1207   self->password = g_strdup("");
1208   self->database = g_strdup("logs");
1209   self->encoding = g_strdup("UTF-8");
1210   self->transaction_active = FALSE;
1211   self->ignore_tns_config = FALSE;
1212 
1213   self->table = log_template_new(configuration, NULL);
1214   log_template_compile_literal_string(self->table, "messages");
1215   self->failed_message_counter = 0;
1216 
1217   self->session_statements = NULL;
1218 
1219   self->syslogng_conform_tables = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
1220   self->dbd_options = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
1221   self->dbd_options_numeric = g_hash_table_new_full(g_str_hash, g_int_equal, g_free, NULL);
1222 
1223   log_template_options_defaults(&self->template_options);
1224   self->super.stats_source = stats_register_type("sql");
1225 
1226   return &self->super.super.super;
1227 }
1228 
1229 gint
afsql_dd_lookup_flag(const gchar * flag)1230 afsql_dd_lookup_flag(const gchar *flag)
1231 {
1232   if (strcmp(flag, "explicit-commits") == 0)
1233     return AFSQL_DDF_EXPLICIT_COMMITS;
1234   else if (strcmp(flag, "dont-create-tables") == 0)
1235     return AFSQL_DDF_DONT_CREATE_TABLES;
1236   else
1237     msg_warning("Unknown SQL flag",
1238                 evt_tag_str("flag", flag));
1239   return 0;
1240 }
1241