1 /*
2  * Copyright (c) 2010-2021 One Identity
3  * Copyright (c) 2010-2014 Gergely Nagy <algernon@balabit.hu>
4  * Copyright (c) 2021 László Várady
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License version 2 as published
8  * by the Free Software Foundation, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "afmongodb-worker.h"
26 #include "afmongodb-private.h"
27 #include "messages.h"
28 #include "value-pairs/evttag.h"
29 #include "value-pairs/value-pairs.h"
30 
31 static void
_worker_disconnect(LogThreadedDestWorker * s)32 _worker_disconnect(LogThreadedDestWorker *s)
33 {
34   MongoDBDestWorker *self = (MongoDBDestWorker *)s;
35   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
36 
37   if (self->coll_obj)
38     mongoc_collection_destroy(self->coll_obj);
39   self->coll_obj = NULL;
40 
41   if (self->client)
42     {
43       mongoc_client_pool_push(owner->pool, self->client);
44       self->client = NULL;
45     }
46 }
47 
48 static const gchar *
_format_collection_template(MongoDBDestWorker * self,LogMessage * msg)49 _format_collection_template(MongoDBDestWorker *self, LogMessage *msg)
50 {
51   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
52 
53   LogTemplateEvalOptions options = { &owner->template_options, LTZ_SEND, self->super.seq_num, NULL };
54   log_template_format(owner->collection_template, msg, &options, self->collection);
55 
56   return self->collection->str;
57 }
58 
59 static gboolean
_switch_collection(MongoDBDestWorker * self,const gchar * collection)60 _switch_collection(MongoDBDestWorker *self, const gchar *collection)
61 {
62   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
63 
64   if (!self->client)
65     return FALSE;
66 
67   if (self->coll_obj)
68     mongoc_collection_destroy(self->coll_obj);
69 
70   self->coll_obj = mongoc_client_get_collection(self->client, owner->const_db, collection);
71 
72   if (!self->coll_obj)
73     {
74       msg_error("Error getting specified MongoDB collection",
75                 evt_tag_str("collection", collection),
76                 evt_tag_str("driver", owner->super.super.super.id));
77 
78       return FALSE;
79     }
80 
81   msg_debug("Switching MongoDB collection", evt_tag_str("new_collection", collection));
82   return TRUE;
83 }
84 
85 static gboolean
_check_server_status(MongoDBDestWorker * self,const mongoc_read_prefs_t * read_prefs)86 _check_server_status(MongoDBDestWorker *self, const mongoc_read_prefs_t *read_prefs)
87 {
88   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
89 
90   bson_t reply;
91   bson_error_t error;
92 
93   if (!self->client)
94     return FALSE;
95 
96   bson_t *cmd = BCON_NEW("serverStatus", "1");
97   gboolean ok = mongoc_client_command_simple(self->client, owner->const_db ? : "", cmd, read_prefs, &reply, &error);
98   bson_destroy(&reply);
99   bson_destroy(cmd);
100 
101   if (!ok)
102     {
103       msg_error("Error connecting to MongoDB",
104                 evt_tag_str("driver", owner->super.super.super.id),
105                 evt_tag_str("reason", error.message));
106       return FALSE;
107     }
108 
109   return TRUE;
110 }
111 
112 static gboolean
_worker_connect(LogThreadedDestWorker * s)113 _worker_connect(LogThreadedDestWorker *s)
114 {
115   MongoDBDestWorker *self = (MongoDBDestWorker *)s;
116   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
117 
118   if (!self->client)
119     {
120       self->client = mongoc_client_pool_pop(owner->pool);
121 
122       if (!self->client)
123         {
124           msg_error("Error creating MongoDB URI",
125                     evt_tag_str("driver", owner->super.super.super.id));
126           return FALSE;
127         }
128     }
129 
130   const mongoc_read_prefs_t *read_prefs = NULL;
131 
132   if (owner->collection_is_literal_string && !self->coll_obj)
133     {
134       const gchar *collection = log_template_get_literal_value(owner->collection_template, NULL);
135 
136       if (!_switch_collection(self, collection))
137         {
138           mongoc_client_pool_push(owner->pool, self->client);
139           self->client = NULL;
140           return FALSE;
141         }
142 
143       g_string_assign(self->collection, collection);
144 
145       read_prefs = mongoc_collection_get_read_prefs(self->coll_obj);
146     }
147 
148 
149   if (!_check_server_status(self, read_prefs))
150     {
151       mongoc_collection_destroy(self->coll_obj);
152       self->coll_obj = NULL;
153       mongoc_client_pool_push(owner->pool, self->client);
154       self->client = NULL;
155       return FALSE;
156     }
157 
158   return TRUE;
159 }
160 
161 /*
162  * Worker thread
163  */
164 static gboolean
_vp_obj_start(const gchar * name,const gchar * prefix,gpointer * prefix_data,const gchar * prev,gpointer * prev_data,gpointer user_data)165 _vp_obj_start(const gchar *name,
166               const gchar *prefix, gpointer *prefix_data,
167               const gchar *prev, gpointer *prev_data,
168               gpointer user_data)
169 {
170   bson_t *o;
171 
172   if (prefix_data)
173     {
174       o = bson_new();
175       *prefix_data = o;
176     }
177   return FALSE;
178 }
179 
180 static gboolean
_vp_obj_end(const gchar * name,const gchar * prefix,gpointer * prefix_data,const gchar * prev,gpointer * prev_data,gpointer user_data)181 _vp_obj_end(const gchar *name,
182             const gchar *prefix, gpointer *prefix_data,
183             const gchar *prev, gpointer *prev_data,
184             gpointer user_data)
185 {
186   MongoDBDestWorker *self = (MongoDBDestWorker *) user_data;
187   bson_t *root;
188 
189   if (prev_data)
190     root = (bson_t *)*prev_data;
191   else
192     root = self->bson;
193 
194   if (prefix_data)
195     {
196       bson_t *d = (bson_t *)*prefix_data;
197 
198       bson_append_document(root, name, -1, d);
199       bson_destroy(d);
200     }
201   return FALSE;
202 }
203 
204 static gboolean
_vp_process_value(const gchar * name,const gchar * prefix,TypeHint type,const gchar * value,gsize value_len,gpointer * prefix_data,gpointer user_data)205 _vp_process_value(const gchar *name, const gchar *prefix, TypeHint type,
206                   const gchar *value, gsize value_len, gpointer *prefix_data, gpointer user_data)
207 {
208   bson_t *o;
209   MongoDBDestWorker *self = (MongoDBDestWorker *) user_data;
210   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
211 
212   gboolean fallback = owner->template_options.on_error & ON_ERROR_FALLBACK_TO_STRING;
213 
214   if (prefix_data)
215     o = (bson_t *)*prefix_data;
216   else
217     o = self->bson;
218 
219   switch (type)
220     {
221     case TYPE_HINT_BOOLEAN:
222     {
223       gboolean b;
224 
225       if (type_cast_to_boolean(value, &b, NULL))
226         bson_append_bool(o, name, -1, b);
227       else
228         {
229           gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "boolean");
230 
231           if (fallback)
232             bson_append_utf8(o, name, -1, value, value_len);
233           else
234             return r;
235         }
236       break;
237     }
238     case TYPE_HINT_INT32:
239     {
240       gint32 i;
241 
242       if (type_cast_to_int32(value, &i, NULL))
243         bson_append_int32(o, name, -1, i);
244       else
245         {
246           gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "int32");
247 
248           if (fallback)
249             bson_append_utf8(o, name, -1, value, value_len);
250           else
251             return r;
252         }
253       break;
254     }
255     case TYPE_HINT_INT64:
256     {
257       gint64 i;
258 
259       if (type_cast_to_int64(value, &i, NULL))
260         bson_append_int64(o, name, -1, i);
261       else
262         {
263           gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "int64");
264 
265           if (fallback)
266             bson_append_utf8(o, name, -1, value, value_len);
267           else
268             return r;
269         }
270 
271       break;
272     }
273     case TYPE_HINT_DOUBLE:
274     {
275       gdouble d;
276 
277       if (type_cast_to_double(value, &d, NULL))
278         bson_append_double(o, name, -1, d);
279       else
280         {
281           gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "double");
282           if (fallback)
283             bson_append_utf8(o, name, -1, value, value_len);
284           else
285             return r;
286         }
287 
288       break;
289     }
290     case TYPE_HINT_DATETIME:
291     {
292       guint64 i;
293 
294       if (type_cast_to_datetime_int(value, &i, NULL))
295         bson_append_date_time(o, name, -1, (gint64)i);
296       else
297         {
298           gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "datetime");
299 
300           if (fallback)
301             bson_append_utf8(o, name, -1, value, value_len);
302           else
303             return r;
304         }
305 
306       break;
307     }
308     case TYPE_HINT_STRING:
309     case TYPE_HINT_LITERAL:
310       bson_append_utf8(o, name, -1, value, value_len);
311       break;
312     default:
313       return TRUE;
314     }
315 
316   return FALSE;
317 }
318 
319 static LogThreadedResult
_worker_insert(LogThreadedDestWorker * s,LogMessage * msg)320 _worker_insert(LogThreadedDestWorker *s, LogMessage *msg)
321 {
322   MongoDBDestWorker *self = (MongoDBDestWorker *) s;
323   MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
324 
325   gboolean success;
326   gboolean drop_silently = owner->template_options.on_error & ON_ERROR_SILENT;
327 
328   bson_reinit(self->bson);
329 
330   LogTemplateEvalOptions options = {&owner->template_options, LTZ_SEND, self->super.seq_num, NULL};
331   success = value_pairs_walk(owner->vp,
332                              _vp_obj_start,
333                              _vp_process_value,
334                              _vp_obj_end,
335                              msg, &options,
336                              self);
337 
338   if (!success)
339     {
340       if (!drop_silently)
341         {
342           msg_error("Failed to format message for MongoDB, dropping message",
343                     evt_tag_value_pairs("message", owner->vp, msg, &options),
344                     evt_tag_str("driver", owner->super.super.super.id));
345         }
346       return LTR_DROP;
347     }
348 
349   msg_debug("Outgoing message to MongoDB destination",
350             evt_tag_value_pairs("message", owner->vp, msg, &options),
351             evt_tag_str("driver", owner->super.super.super.id));
352 
353 
354   if (!owner->collection_is_literal_string)
355     {
356       const gchar *new_collection = _format_collection_template(self, msg);
357       if (!_switch_collection(self, new_collection))
358         return LTR_ERROR;
359     }
360 
361   bson_error_t error;
362   success = mongoc_collection_insert(self->coll_obj, MONGOC_INSERT_NONE,
363                                      (const bson_t *)self->bson, NULL, &error);
364   if (!success)
365     {
366       if (error.domain == MONGOC_ERROR_STREAM)
367         {
368           msg_error("Network error while inserting into MongoDB",
369                     evt_tag_int("time_reopen", self->super.time_reopen),
370                     evt_tag_str("reason", error.message),
371                     evt_tag_str("driver", owner->super.super.super.id));
372           return LTR_NOT_CONNECTED;
373         }
374       else
375         {
376           msg_error("Failed to insert into MongoDB",
377                     evt_tag_int("time_reopen", self->super.time_reopen),
378                     evt_tag_str("reason", error.message),
379                     evt_tag_str("driver", owner->super.super.super.id));
380           return LTR_ERROR;
381         }
382     }
383 
384   return LTR_SUCCESS;
385 }
386 
387 static gboolean
_worker_thread_init(LogThreadedDestWorker * s)388 _worker_thread_init(LogThreadedDestWorker *s)
389 {
390   MongoDBDestWorker *self = (MongoDBDestWorker *) s;
391 
392   self->collection = g_string_sized_new(64);
393   self->bson = bson_sized_new(4096);
394 
395   return log_threaded_dest_worker_init_method(s);
396 }
397 
398 static void
_worker_thread_deinit(LogThreadedDestWorker * s)399 _worker_thread_deinit(LogThreadedDestWorker *s)
400 {
401   MongoDBDestWorker *self = (MongoDBDestWorker *) s;
402 
403   if (self->bson)
404     bson_destroy(self->bson);
405   self->bson = NULL;
406 
407   g_string_free(self->collection, TRUE);
408   self->collection = NULL;
409 
410   log_threaded_dest_worker_deinit_method(s);
411 }
412 
413 LogThreadedDestWorker *
afmongodb_dw_new(LogThreadedDestDriver * owner,gint worker_index)414 afmongodb_dw_new(LogThreadedDestDriver *owner, gint worker_index)
415 {
416   MongoDBDestWorker *self = g_new0(MongoDBDestWorker, 1);
417 
418   log_threaded_dest_worker_init_instance(&self->super, owner, worker_index);
419 
420   self->super.thread_init = _worker_thread_init;
421   self->super.thread_deinit = _worker_thread_deinit;
422   self->super.connect = _worker_connect;
423   self->super.disconnect = _worker_disconnect;
424   self->super.insert = _worker_insert;
425 
426   return &self->super;
427 }
428