1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * As an additional exemption you are allowed to compile & link against the
19 * OpenSSL libraries as published by the OpenSSL project. See the file
20 * COPYING for details.
21 *
22 */
23
24 #include "afsql.h"
25
26 #include "logqueue.h"
27 #include "template/templates.h"
28 #include "messages.h"
29 #include "string-list.h"
30 #include "str-format.h"
31 #include "seqnum.h"
32 #include "stats/stats-registry.h"
33 #include "apphook.h"
34 #include "mainloop-worker.h"
35 #include "str-utils.h"
36
37 #include <string.h>
38 #include <errno.h>
39 #include <openssl/md5.h>
40
41 static gboolean dbi_initialized = FALSE;
42 static const char *s_oracle = "oracle";
43 static const char *s_freetds = "freetds";
44 static dbi_inst dbi_instance;
45 static const gint DEFAULT_SQL_TX_SIZE = 100;
46
47 #define MAX_FAILED_ATTEMPTS 3
48
49 void
afsql_dd_add_dbd_option(LogDriver * s,const gchar * name,const gchar * value)50 afsql_dd_add_dbd_option(LogDriver *s, const gchar *name, const gchar *value)
51 {
52 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
53
54 g_hash_table_insert(self->dbd_options, g_strdup(name), g_strdup(value));
55 }
56
57 void
afsql_dd_add_dbd_option_numeric(LogDriver * s,const gchar * name,gint value)58 afsql_dd_add_dbd_option_numeric(LogDriver *s, const gchar *name, gint value)
59 {
60 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
61
62 g_hash_table_insert(self->dbd_options_numeric, g_strdup(name), GINT_TO_POINTER(value));
63 }
64
65 void
afsql_dd_set_type(LogDriver * s,const gchar * type)66 afsql_dd_set_type(LogDriver *s, const gchar *type)
67 {
68 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
69
70 g_free(self->type);
71 if (strcmp(type, "mssql") == 0)
72 type = s_freetds;
73 self->type = g_strdup(type);
74 }
75
76 void
afsql_dd_set_host(LogDriver * s,const gchar * host)77 afsql_dd_set_host(LogDriver *s, const gchar *host)
78 {
79 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
80
81 g_free(self->host);
82 self->host = g_strdup(host);
83 }
84
85 gboolean
afsql_dd_check_port(const gchar * port)86 afsql_dd_check_port(const gchar *port)
87 {
88 /* only digits (->numbers) are allowed */
89 int len = strlen(port);
90 for (int i = 0; i < len; ++i)
91 if (port[i] < '0' || port[i] > '9')
92 return FALSE;
93 return TRUE;
94 }
95
96 void
afsql_dd_set_port(LogDriver * s,const gchar * port)97 afsql_dd_set_port(LogDriver *s, const gchar *port)
98 {
99 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
100
101 g_free(self->port);
102 self->port = g_strdup(port);
103 }
104
105 void
afsql_dd_set_user(LogDriver * s,const gchar * user)106 afsql_dd_set_user(LogDriver *s, const gchar *user)
107 {
108 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
109
110 g_free(self->user);
111 self->user = g_strdup(user);
112 }
113
114 void
afsql_dd_set_password(LogDriver * s,const gchar * password)115 afsql_dd_set_password(LogDriver *s, const gchar *password)
116 {
117 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
118
119 g_free(self->password);
120 self->password = g_strdup(password);
121 }
122
123 void
afsql_dd_set_database(LogDriver * s,const gchar * database)124 afsql_dd_set_database(LogDriver *s, const gchar *database)
125 {
126 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
127
128 g_free(self->database);
129 self->database = g_strdup(database);
130 }
131
132 void
afsql_dd_set_table(LogDriver * s,LogTemplate * table_template)133 afsql_dd_set_table(LogDriver *s, LogTemplate *table_template)
134 {
135 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
136
137 log_template_unref(self->table);
138 self->table = table_template;
139 }
140
141 void
afsql_dd_set_columns(LogDriver * s,GList * columns)142 afsql_dd_set_columns(LogDriver *s, GList *columns)
143 {
144 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
145
146 string_list_free(self->columns);
147 self->columns = columns;
148 }
149
150 void
afsql_dd_set_indexes(LogDriver * s,GList * indexes)151 afsql_dd_set_indexes(LogDriver *s, GList *indexes)
152 {
153 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
154
155 string_list_free(self->indexes);
156 self->indexes = indexes;
157 }
158
159 void
afsql_dd_set_values(LogDriver * s,GList * values)160 afsql_dd_set_values(LogDriver *s, GList *values)
161 {
162 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
163
164 string_list_free(self->values);
165 self->values = values;
166 }
167
168 void
afsql_dd_set_null_value(LogDriver * s,const gchar * null)169 afsql_dd_set_null_value(LogDriver *s, const gchar *null)
170 {
171 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
172
173 if (self->null_value)
174 g_free(self->null_value);
175 self->null_value = g_strdup(null);
176 }
177
178 void
afsql_dd_set_ignore_tns_config(LogDriver * s,const gboolean ignore_tns_config)179 afsql_dd_set_ignore_tns_config(LogDriver *s, const gboolean ignore_tns_config)
180 {
181 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
182
183 self->ignore_tns_config = ignore_tns_config;
184 }
185
186 void
afsql_dd_set_session_statements(LogDriver * s,GList * session_statements)187 afsql_dd_set_session_statements(LogDriver *s, GList *session_statements)
188 {
189 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
190
191 self->session_statements = session_statements;
192 }
193
194 void
afsql_dd_set_flags(LogDriver * s,gint flags)195 afsql_dd_set_flags(LogDriver *s, gint flags)
196 {
197 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
198
199 self->flags = flags;
200 }
201
202 void
afsql_dd_set_create_statement_append(LogDriver * s,const gchar * create_statement_append)203 afsql_dd_set_create_statement_append(LogDriver *s, const gchar *create_statement_append)
204 {
205 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
206
207 g_free(self->create_statement_append);
208 self->create_statement_append = g_strdup(create_statement_append);
209 }
210
211 /**
212 * afsql_dd_run_query:
213 *
214 * Run an SQL query on the connected database.
215 *
216 * NOTE: This function can only be called from the database thread.
217 **/
218 static gboolean
afsql_dd_run_query(AFSqlDestDriver * self,const gchar * query,gboolean silent,dbi_result * result)219 afsql_dd_run_query(AFSqlDestDriver *self, const gchar *query, gboolean silent, dbi_result *result)
220 {
221 dbi_result db_res;
222
223 msg_debug("Running SQL query",
224 evt_tag_str("query", query));
225
226 db_res = dbi_conn_query(self->dbi_ctx, query);
227 if (!db_res)
228 {
229 const gchar *dbi_error;
230
231 if (!silent)
232 {
233 dbi_conn_error(self->dbi_ctx, &dbi_error);
234 msg_error("Error running SQL query",
235 evt_tag_str("type", self->type),
236 evt_tag_str("host", self->host),
237 evt_tag_str("port", self->port),
238 evt_tag_str("user", self->user),
239 evt_tag_str("database", self->database),
240 evt_tag_str("error", dbi_error),
241 evt_tag_str("query", query));
242 }
243 return FALSE;
244 }
245 if (result)
246 *result = db_res;
247 else
248 dbi_result_free(db_res);
249 return TRUE;
250 }
251
252 /**
253 * afsql_dd_commit_transaction:
254 *
255 * Commit SQL transaction.
256 *
257 * NOTE: This function can only be called from the database thread.
258 **/
259 static gboolean
afsql_dd_commit_transaction(AFSqlDestDriver * self)260 afsql_dd_commit_transaction(AFSqlDestDriver *self)
261 {
262 gboolean success;
263
264 if (!self->transaction_active)
265 return TRUE;
266
267 success = afsql_dd_run_query(self, "COMMIT", FALSE, NULL);
268 if (success)
269 {
270 self->transaction_active = FALSE;
271 }
272 else
273 {
274 msg_error("SQL transaction commit failed, rewinding backlog and starting again");
275 }
276 return success;
277 }
278
279 /**
280 * afsql_dd_begin_transaction:
281 *
282 * Begin SQL transaction.
283 *
284 * NOTE: This function can only be called from the database thread.
285 **/
286 static gboolean
afsql_dd_begin_transaction(AFSqlDestDriver * self)287 afsql_dd_begin_transaction(AFSqlDestDriver *self)
288 {
289 gboolean success = TRUE;
290 const char *s_begin = "BEGIN";
291 if (!strcmp(self->type, s_freetds))
292 {
293 /* the mssql requires this command */
294 s_begin = "BEGIN TRANSACTION";
295 }
296
297 if (strcmp(self->type, s_oracle) != 0)
298 {
299 /* oracle db has no BEGIN TRANSACTION command, it implicitly starts one, after every commit. */
300 success = afsql_dd_run_query(self, s_begin, FALSE, NULL);
301 }
302
303 self->transaction_active = success;
304
305 return success;
306 }
307
308 static gboolean
afsql_dd_rollback_transaction(AFSqlDestDriver * self)309 afsql_dd_rollback_transaction(AFSqlDestDriver *self)
310 {
311 if (!self->transaction_active)
312 return TRUE;
313
314 self->transaction_active = FALSE;
315
316 return afsql_dd_run_query(self, "ROLLBACK", FALSE, NULL);
317 }
318
319 static gboolean
afsql_dd_begin_new_transaction(AFSqlDestDriver * self)320 afsql_dd_begin_new_transaction(AFSqlDestDriver *self)
321 {
322 if (self->transaction_active)
323 {
324 if (!afsql_dd_commit_transaction(self))
325 {
326 afsql_dd_rollback_transaction(self);
327 return FALSE;
328 }
329 }
330
331 return afsql_dd_begin_transaction(self);
332 }
333
_sql_identifier_is_valid_char(gchar c)334 static gboolean _sql_identifier_is_valid_char(gchar c)
335 {
336 return ((c == '.') ||
337 (c == '_') ||
338 (c >= '0' && c <= '9') ||
339 (g_ascii_tolower(c) >= 'a' && g_ascii_tolower(c) <= 'z'));
340 }
341
342 static gboolean
_is_sql_identifier_sanitized(const gchar * token)343 _is_sql_identifier_sanitized(const gchar *token)
344 {
345 gint i;
346
347 for (i = 0; token[i]; i++)
348 {
349 if (!_sql_identifier_is_valid_char(token[i]))
350 return FALSE;
351 }
352
353 return TRUE;
354 }
355
356 static void
_sanitize_sql_identifier(gchar * token)357 _sanitize_sql_identifier(gchar *token)
358 {
359 gint i;
360
361 for (i = 0; token[i]; i++)
362 {
363 if (!_sql_identifier_is_valid_char(token[i]))
364 token[i] = '_';
365 }
366 }
367
368 /**
369 * afsql_dd_create_index:
370 *
371 * This function creates an index for the column specified and returns
372 * TRUE to indicate success.
373 *
374 * NOTE: This function can only be called from the database thread.
375 **/
376 static gboolean
afsql_dd_create_index(AFSqlDestDriver * self,const gchar * table,const gchar * column)377 afsql_dd_create_index(AFSqlDestDriver *self, const gchar *table, const gchar *column)
378 {
379 GString *query_string;
380 gboolean success = TRUE;
381
382 query_string = g_string_sized_new(64);
383
384 if (strcmp(self->type, s_oracle) == 0)
385 {
386 /* NOTE: oracle index indentifier length is max 30 characters
387 * so we use the first 30 characters of the table_column md5 hash */
388 if ((strlen(table) + strlen(column)) > 25)
389 {
390
391 guchar hash[MD5_DIGEST_LENGTH];
392 gchar hash_str[31];
393 gchar *cat = g_strjoin("_", table, column, NULL);
394
395 MD5((guchar *)cat, strlen(cat), hash);
396 g_free(cat);
397
398 format_hex_string(hash, sizeof(hash), hash_str, sizeof(hash_str));
399 hash_str[0] = 'i';
400 g_string_printf(query_string, "CREATE INDEX %s ON %s (%s)",
401 hash_str, table, column);
402 }
403 else
404 g_string_printf(query_string, "CREATE INDEX %s_%s_idx ON %s (%s)",
405 table, column, table, column);
406 }
407 else
408 g_string_printf(query_string, "CREATE INDEX %s_%s_idx ON %s (%s)",
409 table, column, table, column);
410 if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
411 {
412 msg_error("Error adding missing index",
413 evt_tag_str("table", table),
414 evt_tag_str("column", column));
415 success = FALSE;
416 }
417 g_string_free(query_string, TRUE);
418 return success;
419 }
420
421 static inline gboolean
_is_table_syslogng_conform(AFSqlDestDriver * self,const gchar * table)422 _is_table_syslogng_conform(AFSqlDestDriver *self, const gchar *table)
423 {
424 return (g_hash_table_lookup(self->syslogng_conform_tables, table) != NULL);
425 }
426
427 static inline void
_remember_table_as_syslogng_conform(AFSqlDestDriver * self,const gchar * table)428 _remember_table_as_syslogng_conform(AFSqlDestDriver *self, const gchar *table)
429 {
430 g_hash_table_insert(self->syslogng_conform_tables, g_strdup(table), GUINT_TO_POINTER(TRUE));
431 }
432
433 static gboolean
_is_table_present(AFSqlDestDriver * self,const gchar * table,dbi_result * metadata)434 _is_table_present(AFSqlDestDriver *self, const gchar *table, dbi_result *metadata)
435 {
436 gboolean res = FALSE;
437 GString *query_string;
438
439 if (!afsql_dd_begin_new_transaction(self))
440 {
441 msg_error("Starting new transaction has failed");
442
443 return FALSE;
444 }
445
446 query_string = g_string_sized_new(32);
447 g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table);
448 res = afsql_dd_run_query(self, query_string->str, TRUE, metadata);
449 g_string_free(query_string, TRUE);
450
451 afsql_dd_commit_transaction(self);
452
453 return res;
454 }
455
456 static gboolean
_ensure_table_is_syslogng_conform(AFSqlDestDriver * self,dbi_result db_res,const gchar * table)457 _ensure_table_is_syslogng_conform(AFSqlDestDriver *self, dbi_result db_res, const gchar *table)
458 {
459 gboolean success = TRUE;
460 gboolean new_transaction_started = FALSE;
461 gint i;
462 GString *query_string = g_string_sized_new(32);
463
464 for (i = 0; success && (i < self->fields_len); i++)
465 {
466 if (dbi_result_get_field_idx(db_res, self->fields[i].name) == 0)
467 {
468 GList *l;
469 if (!new_transaction_started)
470 {
471 if (!afsql_dd_begin_new_transaction(self))
472 {
473 msg_error("Starting new transaction for modifying(ALTER) table has failed",
474 evt_tag_str("table", table));
475 success = FALSE;
476 break;
477 }
478 new_transaction_started = TRUE;
479 }
480 /* field does not exist, add this column */
481 g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type);
482 if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
483 {
484 msg_error("Error adding missing column, giving up",
485 evt_tag_str("table", table),
486 evt_tag_str("column", self->fields[i].name));
487 success = FALSE;
488 break;
489 }
490 for (l = self->indexes; l; l = l->next)
491 {
492 if (strcmp((gchar *) l->data, self->fields[i].name) == 0)
493 {
494 /* this is an indexed column, create index */
495 afsql_dd_create_index(self, table, self->fields[i].name);
496 }
497 }
498 }
499 }
500
501 if (new_transaction_started && ( !success || !afsql_dd_commit_transaction(self)))
502 {
503 afsql_dd_rollback_transaction(self);
504 success = FALSE;
505 }
506
507 g_string_free(query_string, TRUE);
508
509 return success;
510 }
511
512 static gboolean
_table_create_indexes(AFSqlDestDriver * self,const gchar * table)513 _table_create_indexes(AFSqlDestDriver *self, const gchar *table)
514 {
515 gboolean success = TRUE;
516 GList *l;
517
518 if (!afsql_dd_begin_new_transaction(self))
519 {
520 msg_error("Starting new transaction for table creation has failed",
521 evt_tag_str("table", table));
522 return FALSE;
523 }
524
525 for (l = self->indexes; l && success; l = l->next)
526 {
527 success = afsql_dd_create_index(self, table, (gchar *) l->data);
528 }
529
530 if (!success || !afsql_dd_commit_transaction(self))
531 {
532 afsql_dd_rollback_transaction(self);
533 }
534
535 return success;
536 }
537
538 static gboolean
_table_create(AFSqlDestDriver * self,const gchar * table)539 _table_create(AFSqlDestDriver *self, const gchar *table)
540 {
541 gint i;
542 GString *query_string = g_string_sized_new(32);
543 gboolean success = FALSE;
544 if (!afsql_dd_begin_new_transaction(self))
545 {
546 msg_error("Starting new transaction for table creation has failed",
547 evt_tag_str("table", table));
548 return FALSE;
549 }
550
551 g_string_printf(query_string, "CREATE TABLE %s (", table);
552 for (i = 0; i < self->fields_len; i++)
553 {
554 g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type);
555 if (i != self->fields_len - 1)
556 g_string_append(query_string, ", ");
557 }
558 g_string_append(query_string, ")");
559 if (self->create_statement_append)
560 g_string_append(query_string, self->create_statement_append);
561 if (afsql_dd_run_query(self, query_string->str, FALSE, NULL))
562 {
563 success = TRUE;
564 }
565 else
566 {
567 msg_error("Error creating table, giving up",
568 evt_tag_str("table", table));
569 }
570
571 if (!success || !afsql_dd_commit_transaction(self))
572 {
573 afsql_dd_rollback_transaction(self);
574 }
575
576 g_string_free(query_string, TRUE);
577
578 return success;
579 }
580
581 /**
582 * afsql_dd_validate_table:
583 *
584 * Check if the given table exists in the database. If it doesn't
585 * create it, if it does, check if all the required fields are
586 * present and create them if they don't.
587 *
588 * NOTE: This function can only be called from the database thread.
589 **/
590 static gboolean
afsql_dd_ensure_table_is_syslogng_conform(AFSqlDestDriver * self,GString * table)591 afsql_dd_ensure_table_is_syslogng_conform(AFSqlDestDriver *self, GString *table)
592 {
593 dbi_result db_res = NULL;
594 gboolean success = FALSE;
595
596 if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
597 return TRUE;
598
599 _sanitize_sql_identifier(table->str);
600
601 if (_is_table_syslogng_conform(self, table->str))
602 return TRUE;
603
604 if (_is_table_present(self, table->str, &db_res))
605 {
606 /* table exists, check structure */
607 success = _ensure_table_is_syslogng_conform(self, db_res, table->str);
608 if (db_res)
609 dbi_result_free(db_res);
610 }
611 else
612 {
613 /* table does not exist, create it */
614 success = _table_create(self, table->str) && _table_create_indexes(self, table->str);
615 }
616
617 if (success)
618 {
619 /* we have successfully created/altered the destination table, record this information */
620 _remember_table_as_syslogng_conform(self, table->str);
621 }
622
623 return success;
624 }
625
626 static void
afsql_dd_set_dbd_opt(gpointer key,gpointer value,gpointer user_data)627 afsql_dd_set_dbd_opt(gpointer key, gpointer value, gpointer user_data)
628 {
629 dbi_conn_set_option((dbi_conn)user_data, (gchar *)key, (gchar *)value);
630 }
631
632 static void
afsql_dd_set_dbd_opt_numeric(gpointer key,gpointer value,gpointer user_data)633 afsql_dd_set_dbd_opt_numeric(gpointer key, gpointer value, gpointer user_data)
634 {
635 dbi_conn_set_option_numeric((dbi_conn)user_data, (gchar *)key,
636 GPOINTER_TO_INT(value));
637 }
638
639 /*
640 * NOTE: there's a bug in libdbd-sqlite3 and this function basically works
641 * that around. The issue is that the database path cannot be an empty
642 * string as it causes an invalid read (it is using -1 as an index in that
643 * case). What we do is that if database is a fully specified path to a
644 * filename (e.g. starts with a slash), we use another slash, so it remains
645 * a root-relative filename. Otherwise, we simply use the current
646 * directory.
647 */
648 static const gchar *
_sqlite_db_dir(const gchar * database,gchar * buf,gsize buflen)649 _sqlite_db_dir(const gchar *database, gchar *buf, gsize buflen)
650 {
651 if (database[0] == '/')
652 return strncpy(buf, "/", buflen);
653 else
654 return getcwd(buf, buflen);
655 return buf;
656 }
657
658 static void
_enable_database_specific_hacks(AFSqlDestDriver * self)659 _enable_database_specific_hacks(AFSqlDestDriver *self)
660 {
661 gchar buf[1024];
662
663 if (strcmp(self->type, "sqlite") == 0)
664 dbi_conn_set_option(self->dbi_ctx, "sqlite_dbdir", _sqlite_db_dir(self->database, buf, sizeof(buf)));
665 else if (strcmp(self->type, "sqlite3") == 0)
666 dbi_conn_set_option(self->dbi_ctx, "sqlite3_dbdir", _sqlite_db_dir(self->database, buf, sizeof(buf)));
667 else if (strcmp(self->type, s_oracle) == 0)
668 dbi_conn_set_option_numeric(self->dbi_ctx, "oracle_ignore_tns_config", self->ignore_tns_config);
669 }
670
671 static gboolean
afsql_dd_connect(LogThreadedDestDriver * s)672 afsql_dd_connect(LogThreadedDestDriver *s)
673 {
674 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
675
676 self->dbi_ctx = dbi_conn_new_r(self->type, dbi_instance);
677
678 if (!self->dbi_ctx)
679 {
680 msg_error("No such DBI driver",
681 evt_tag_str("type", self->type));
682 return FALSE;
683 }
684
685 dbi_conn_set_option(self->dbi_ctx, "host", self->host);
686
687 if (strcmp(self->type, "mysql"))
688 dbi_conn_set_option(self->dbi_ctx, "port", self->port);
689 else
690 dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
691
692 dbi_conn_set_option(self->dbi_ctx, "username", self->user);
693 dbi_conn_set_option(self->dbi_ctx, "password", self->password);
694 dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
695 dbi_conn_set_option(self->dbi_ctx, "encoding", self->encoding);
696 dbi_conn_set_option(self->dbi_ctx, "auto-commit", self->flags & AFSQL_DDF_EXPLICIT_COMMITS ? "false" : "true");
697
698 _enable_database_specific_hacks(self);
699
700 /* Set user-specified options */
701 g_hash_table_foreach(self->dbd_options, afsql_dd_set_dbd_opt, self->dbi_ctx);
702 g_hash_table_foreach(self->dbd_options_numeric, afsql_dd_set_dbd_opt_numeric, self->dbi_ctx);
703
704 if (dbi_conn_connect(self->dbi_ctx) < 0)
705 {
706 const gchar *dbi_error;
707
708 dbi_conn_error(self->dbi_ctx, &dbi_error);
709
710 msg_error("Error establishing SQL connection",
711 evt_tag_str("type", self->type),
712 evt_tag_str("host", self->host),
713 evt_tag_str("port", self->port),
714 evt_tag_str("username", self->user),
715 evt_tag_str("database", self->database),
716 evt_tag_str("error", dbi_error));
717
718 return FALSE;
719 }
720
721 if (self->session_statements != NULL)
722 {
723 GList *l;
724
725 for (l = self->session_statements; l; l = l->next)
726 {
727 if (!afsql_dd_run_query(self, (gchar *) l->data, FALSE, NULL))
728 {
729 msg_error("Error executing SQL connection statement",
730 evt_tag_str("statement", (gchar *) l->data));
731
732 return FALSE;
733 }
734 }
735 }
736
737 return TRUE;
738 }
739
740 static void
afsql_dd_disconnect(LogThreadedDestDriver * s)741 afsql_dd_disconnect(LogThreadedDestDriver *s)
742 {
743 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
744
745 dbi_conn_close(self->dbi_ctx);
746 self->dbi_ctx = NULL;
747 }
748
749 static GString *
afsql_dd_ensure_accessible_database_table(AFSqlDestDriver * self,LogMessage * msg)750 afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg)
751 {
752 GString *table = g_string_sized_new(32);
753
754 LogTemplateEvalOptions options = {&self->template_options, LTZ_LOCAL, 0, NULL};
755 log_template_format(self->table, msg, &options, table);
756
757 if (!afsql_dd_ensure_table_is_syslogng_conform(self, table))
758 {
759 /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
760 msg_error("Error checking table, disconnecting from database, trying again shortly",
761 evt_tag_int("time_reopen", self->super.time_reopen));
762 g_string_free(table, TRUE);
763 return NULL;
764 }
765
766 return table;
767 }
768
769 static GString *
afsql_dd_build_insert_command(AFSqlDestDriver * self,LogMessage * msg,GString * table)770 afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table)
771 {
772 GString *insert_command = g_string_sized_new(256);
773 GString *value = g_string_sized_new(512);
774 gint i, j;
775
776 g_string_printf(insert_command, "INSERT INTO %s (", table->str);
777
778 for (i = 0; i < self->fields_len; i++)
779 {
780 if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
781 {
782 g_string_append(insert_command, self->fields[i].name);
783
784 j = i + 1;
785 while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
786 j++;
787
788 if (j < self->fields_len)
789 g_string_append(insert_command, ", ");
790 }
791 }
792
793 g_string_append(insert_command, ") VALUES (");
794
795 for (i = 0; i < self->fields_len; i++)
796 {
797 gchar *quoted;
798
799 if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
800 {
801 LogTemplateEvalOptions options = {&self->template_options, LTZ_SEND, self->super.worker.instance.seq_num, NULL};
802 log_template_format(self->fields[i].value, msg, &options, value);
803 if (self->null_value && strcmp(self->null_value, value->str) == 0)
804 {
805 g_string_append(insert_command, "NULL");
806 }
807 else
808 {
809 dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed);
810 if (quoted)
811 {
812 g_string_append(insert_command, quoted);
813 free(quoted);
814 }
815 else
816 {
817 g_string_append(insert_command, "''");
818 }
819 }
820
821 j = i + 1;
822 while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
823 j++;
824 if (j < self->fields_len)
825 g_string_append(insert_command, ", ");
826 }
827 }
828
829 g_string_append(insert_command, ")");
830
831 g_string_free(value, TRUE);
832
833 return insert_command;
834 }
835
836 static inline gboolean
afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver * self)837 afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self)
838 {
839 return !!(self->flags & AFSQL_DDF_EXPLICIT_COMMITS);
840 }
841
842 static inline gboolean
afsql_dd_should_begin_new_transaction(const AFSqlDestDriver * self)843 afsql_dd_should_begin_new_transaction(const AFSqlDestDriver *self)
844 {
845 return afsql_dd_is_transaction_handling_enabled(self) && self->super.worker.instance.batch_size == 1;
846 }
847
848 static gint
_batch_lines(const AFSqlDestDriver * self)849 _batch_lines(const AFSqlDestDriver *self)
850 {
851 if (self->super.batch_lines <= 0)
852 return DEFAULT_SQL_TX_SIZE;
853
854 return self->super.batch_lines;
855 }
856
857 static LogThreadedResult
afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver * self)858 afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self)
859 {
860 const gchar *dbi_error, *error_message;
861
862 if (dbi_conn_ping(self->dbi_ctx) == 1)
863 {
864 return LTR_ERROR;
865 }
866
867 if (afsql_dd_is_transaction_handling_enabled(self))
868 {
869 error_message = "SQL connection lost in the middle of a transaction,"
870 " rewinding backlog and starting again";
871 }
872 else
873 {
874 error_message = "Error, no SQL connection after failed query attempt";
875 }
876
877 dbi_conn_error(self->dbi_ctx, &dbi_error);
878 msg_error(error_message,
879 evt_tag_str("type", self->type),
880 evt_tag_str("host", self->host),
881 evt_tag_str("port", self->port),
882 evt_tag_str("username", self->user),
883 evt_tag_str("database", self->database),
884 evt_tag_str("error", dbi_error));
885
886 return LTR_ERROR;
887 }
888
889 static LogThreadedResult
afsql_dd_flush(LogThreadedDestDriver * s)890 afsql_dd_flush(LogThreadedDestDriver *s)
891 {
892 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
893
894 if (!afsql_dd_is_transaction_handling_enabled(self))
895 return LTR_SUCCESS;
896
897 if (!afsql_dd_commit_transaction(self))
898 {
899 /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_transaction() */
900 afsql_dd_rollback_transaction(self);
901 return LTR_ERROR;
902 }
903 return LTR_SUCCESS;
904 }
905
906 static gboolean
afsql_dd_run_insert_query(AFSqlDestDriver * self,GString * table,LogMessage * msg)907 afsql_dd_run_insert_query(AFSqlDestDriver *self, GString *table, LogMessage *msg)
908 {
909 GString *insert_command;
910
911 insert_command = afsql_dd_build_insert_command(self, msg, table);
912 gboolean success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL);
913 g_string_free(insert_command, TRUE);
914 return success;
915 }
916
917 /**
918 * afsql_dd_insert_db:
919 *
920 * This function is running in the database thread
921 *
922 * Returns: FALSE to indicate that the connection should be closed and
923 * this destination suspended for time_reopen() time.
924 **/
925 static LogThreadedResult
afsql_dd_insert(LogThreadedDestDriver * s,LogMessage * msg)926 afsql_dd_insert(LogThreadedDestDriver *s, LogMessage *msg)
927 {
928 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
929 GString *table = NULL;
930 LogThreadedResult retval = LTR_ERROR;
931
932 table = afsql_dd_ensure_accessible_database_table(self, msg);
933 if (!table)
934 goto error;
935
936 if (afsql_dd_should_begin_new_transaction(self) && !afsql_dd_begin_transaction(self))
937 goto error;
938
939 if (!afsql_dd_run_insert_query(self, table, msg))
940 {
941 retval = afsql_dd_handle_insert_row_error_depending_on_connection_availability(self);
942 goto error;
943 }
944
945 retval = afsql_dd_is_transaction_handling_enabled(self)
946 ? LTR_QUEUED
947 : LTR_SUCCESS;
948
949 error:
950
951 if (table != NULL)
952 g_string_free(table, TRUE);
953
954 return retval;
955 }
956
957 static const gchar *
afsql_dd_format_stats_instance(LogThreadedDestDriver * s)958 afsql_dd_format_stats_instance(LogThreadedDestDriver *s)
959 {
960 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
961 static gchar persist_name[64];
962
963 g_snprintf(persist_name, sizeof(persist_name),
964 "%s,%s,%s,%s,%s",
965 self->type, self->host, self->port, self->database, self->table->template);
966 return persist_name;
967 }
968
969 static const gchar *
afsql_dd_format_persist_name(const LogPipe * s)970 afsql_dd_format_persist_name(const LogPipe *s)
971 {
972 AFSqlDestDriver *self = (AFSqlDestDriver *)s;
973 static gchar persist_name[256];
974
975 if (s->persist_name)
976 g_snprintf(persist_name, sizeof(persist_name), "afsql_dd.%s", s->persist_name);
977 else
978 g_snprintf(persist_name, sizeof(persist_name), "afsql_dd(%s,%s,%s,%s,%s)", self->type,
979 self->host, self->port, self->database, self->table->template);
980
981 return persist_name;
982 }
983
984 static const gchar *
_afsql_dd_format_legacy_persist_name(const AFSqlDestDriver * self)985 _afsql_dd_format_legacy_persist_name(const AFSqlDestDriver *self)
986 {
987 static gchar legacy_persist_name[256];
988
989 g_snprintf(legacy_persist_name, sizeof(legacy_persist_name),
990 "afsql_dd_qfile(%s,%s,%s,%s,%s)",
991 self->type, self->host, self->port, self->database, self->table->template);
992
993 return legacy_persist_name;
994 }
995
996 static gboolean
_update_legacy_persist_name_if_exists(AFSqlDestDriver * self)997 _update_legacy_persist_name_if_exists(AFSqlDestDriver *self)
998 {
999 GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
1000 const gchar *current_persist_name = afsql_dd_format_persist_name(&self->super.super.super.super);
1001 const gchar *legacy_persist_name = _afsql_dd_format_legacy_persist_name(self);
1002
1003 if (persist_state_entry_exists(cfg->state, current_persist_name))
1004 return TRUE;
1005
1006 if (!persist_state_entry_exists(cfg->state, legacy_persist_name))
1007 return TRUE;
1008
1009 return persist_state_move_entry(cfg->state, legacy_persist_name, current_persist_name);
1010 }
1011
1012 static gboolean
_init_fields_from_columns_and_values(AFSqlDestDriver * self)1013 _init_fields_from_columns_and_values(AFSqlDestDriver *self)
1014 {
1015 GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
1016 GList *col, *value;
1017 gint len_cols, len_values;
1018 gint i;
1019
1020 if (self->fields)
1021 return TRUE;
1022
1023 len_cols = g_list_length(self->columns);
1024 len_values = g_list_length(self->values);
1025 if (len_cols != len_values)
1026 {
1027 msg_error("The number of columns and values do not match",
1028 evt_tag_int("len_columns", len_cols),
1029 evt_tag_int("len_values", len_values));
1030 return FALSE;
1031 }
1032 self->fields_len = len_cols;
1033 self->fields = g_new0(AFSqlField, len_cols);
1034
1035 for (i = 0, col = self->columns, value = self->values; col && value; i++, col = col->next, value = value->next)
1036 {
1037 gchar *space;
1038
1039 space = strchr(col->data, ' ');
1040 if (space)
1041 {
1042 self->fields[i].name = g_strndup(col->data, space - (gchar *) col->data);
1043 while (*space == ' ')
1044 space++;
1045 if (*space != '\0')
1046 self->fields[i].type = g_strdup(space);
1047 else
1048 self->fields[i].type = g_strdup("text");
1049 }
1050 else
1051 {
1052 self->fields[i].name = g_strdup(col->data);
1053 self->fields[i].type = g_strdup("text");
1054 }
1055 if (!_is_sql_identifier_sanitized(self->fields[i].name))
1056 {
1057 msg_error("Column name is not a proper SQL name",
1058 evt_tag_str("column", self->fields[i].name));
1059 return FALSE;
1060 }
1061
1062 if (GPOINTER_TO_UINT(value->data) > 4096)
1063 {
1064 self->fields[i].value = log_template_new(cfg, NULL);
1065 log_template_compile(self->fields[i].value, (gchar *) value->data, NULL);
1066 }
1067 else
1068 {
1069 switch (GPOINTER_TO_UINT(value->data))
1070 {
1071 case AFSQL_COLUMN_DEFAULT:
1072 self->fields[i].flags |= AFSQL_FF_DEFAULT;
1073 break;
1074 default:
1075 g_assert_not_reached();
1076 break;
1077 }
1078 }
1079 }
1080 return TRUE;
1081 }
1082
1083 static gboolean
_initialize_dbi(void)1084 _initialize_dbi(void)
1085 {
1086 if (!dbi_initialized)
1087 {
1088 errno = 0;
1089 gint rc = dbi_initialize_r(NULL, &dbi_instance);
1090
1091 if (rc < 0)
1092 {
1093 /* NOTE: errno might be unreliable, but that's all we have */
1094 msg_error("Unable to initialize database access (DBI)",
1095 evt_tag_int("rc", rc),
1096 evt_tag_error("error"));
1097 return FALSE;
1098 }
1099 else if (rc == 0)
1100 {
1101 msg_error("The database access library (DBI) reports no usable SQL drivers, perhaps DBI drivers are not installed properly");
1102 return FALSE;
1103 }
1104 else
1105 {
1106 dbi_initialized = TRUE;
1107 }
1108 }
1109 return TRUE;
1110 }
1111
1112 static gboolean
afsql_dd_init(LogPipe * s)1113 afsql_dd_init(LogPipe *s)
1114 {
1115 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
1116 GlobalConfig *cfg = log_pipe_get_config(s);
1117
1118 if (!_update_legacy_persist_name_if_exists(self))
1119 return FALSE;
1120 if (!_initialize_dbi())
1121 return FALSE;
1122
1123 if (!self->columns || !self->values)
1124 {
1125 msg_error("Default columns and values must be specified for database destinations",
1126 evt_tag_str("type", self->type));
1127 return FALSE;
1128 }
1129
1130 if (self->ignore_tns_config && strcmp(self->type, s_oracle) != 0)
1131 {
1132 msg_warning("WARNING: Option ignore_tns_config was skipped because database type is not Oracle",
1133 evt_tag_str("type", self->type));
1134 }
1135
1136 if (!_init_fields_from_columns_and_values(self))
1137 return FALSE;
1138
1139 if (!log_threaded_dest_driver_init_method(s))
1140 return FALSE;
1141
1142 log_template_options_init(&self->template_options, cfg);
1143
1144 if (afsql_dd_is_transaction_handling_enabled(self))
1145 log_threaded_dest_driver_set_batch_lines((LogDriver *)self, _batch_lines(self));
1146
1147 return TRUE;
1148 }
1149
1150 static void
afsql_dd_free(LogPipe * s)1151 afsql_dd_free(LogPipe *s)
1152 {
1153 AFSqlDestDriver *self = (AFSqlDestDriver *) s;
1154 gint i;
1155
1156 log_template_options_destroy(&self->template_options);
1157 for (i = 0; i < self->fields_len; i++)
1158 {
1159 g_free(self->fields[i].name);
1160 g_free(self->fields[i].type);
1161 log_template_unref(self->fields[i].value);
1162 }
1163
1164 g_free(self->fields);
1165 g_free(self->type);
1166 g_free(self->host);
1167 g_free(self->port);
1168 g_free(self->user);
1169 g_free(self->password);
1170 g_free(self->database);
1171 g_free(self->encoding);
1172 g_free(self->create_statement_append);
1173 if (self->null_value)
1174 g_free(self->null_value);
1175 string_list_free(self->columns);
1176 string_list_free(self->indexes);
1177 string_list_free(self->values);
1178 log_template_unref(self->table);
1179 g_hash_table_destroy(self->syslogng_conform_tables);
1180 g_hash_table_destroy(self->dbd_options);
1181 g_hash_table_destroy(self->dbd_options_numeric);
1182 if (self->session_statements)
1183 string_list_free(self->session_statements);
1184 log_threaded_dest_driver_free(s);
1185 }
1186
1187 LogDriver *
afsql_dd_new(GlobalConfig * cfg)1188 afsql_dd_new(GlobalConfig *cfg)
1189 {
1190 AFSqlDestDriver *self = g_new0(AFSqlDestDriver, 1);
1191
1192 log_threaded_dest_driver_init_instance(&self->super, cfg);
1193
1194 self->super.super.super.super.init = afsql_dd_init;
1195 self->super.super.super.super.free_fn = afsql_dd_free;
1196 self->super.super.super.super.generate_persist_name = afsql_dd_format_persist_name;
1197 self->super.format_stats_instance = afsql_dd_format_stats_instance;
1198 self->super.worker.connect = afsql_dd_connect;
1199 self->super.worker.disconnect = afsql_dd_disconnect;
1200 self->super.worker.insert = afsql_dd_insert;
1201 self->super.worker.flush = afsql_dd_flush;
1202
1203 self->type = g_strdup("mysql");
1204 self->host = g_strdup("");
1205 self->port = g_strdup("");
1206 self->user = g_strdup("syslog-ng");
1207 self->password = g_strdup("");
1208 self->database = g_strdup("logs");
1209 self->encoding = g_strdup("UTF-8");
1210 self->transaction_active = FALSE;
1211 self->ignore_tns_config = FALSE;
1212
1213 self->table = log_template_new(configuration, NULL);
1214 log_template_compile_literal_string(self->table, "messages");
1215 self->failed_message_counter = 0;
1216
1217 self->session_statements = NULL;
1218
1219 self->syslogng_conform_tables = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
1220 self->dbd_options = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
1221 self->dbd_options_numeric = g_hash_table_new_full(g_str_hash, g_int_equal, g_free, NULL);
1222
1223 log_template_options_defaults(&self->template_options);
1224 self->super.stats_source = stats_register_type("sql");
1225
1226 return &self->super.super.super;
1227 }
1228
1229 gint
afsql_dd_lookup_flag(const gchar * flag)1230 afsql_dd_lookup_flag(const gchar *flag)
1231 {
1232 if (strcmp(flag, "explicit-commits") == 0)
1233 return AFSQL_DDF_EXPLICIT_COMMITS;
1234 else if (strcmp(flag, "dont-create-tables") == 0)
1235 return AFSQL_DDF_DONT_CREATE_TABLES;
1236 else
1237 msg_warning("Unknown SQL flag",
1238 evt_tag_str("flag", flag));
1239 return 0;
1240 }
1241