1 /*
2 * Copyright (c) 2010-2021 One Identity
3 * Copyright (c) 2010-2014 Gergely Nagy <algernon@balabit.hu>
4 * Copyright (c) 2021 László Várady
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 as published
8 * by the Free Software Foundation, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "afmongodb-worker.h"
26 #include "afmongodb-private.h"
27 #include "messages.h"
28 #include "value-pairs/evttag.h"
29 #include "value-pairs/value-pairs.h"
30
31 static void
_worker_disconnect(LogThreadedDestWorker * s)32 _worker_disconnect(LogThreadedDestWorker *s)
33 {
34 MongoDBDestWorker *self = (MongoDBDestWorker *)s;
35 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
36
37 if (self->coll_obj)
38 mongoc_collection_destroy(self->coll_obj);
39 self->coll_obj = NULL;
40
41 if (self->client)
42 {
43 mongoc_client_pool_push(owner->pool, self->client);
44 self->client = NULL;
45 }
46 }
47
48 static const gchar *
_format_collection_template(MongoDBDestWorker * self,LogMessage * msg)49 _format_collection_template(MongoDBDestWorker *self, LogMessage *msg)
50 {
51 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
52
53 LogTemplateEvalOptions options = { &owner->template_options, LTZ_SEND, self->super.seq_num, NULL };
54 log_template_format(owner->collection_template, msg, &options, self->collection);
55
56 return self->collection->str;
57 }
58
59 static gboolean
_switch_collection(MongoDBDestWorker * self,const gchar * collection)60 _switch_collection(MongoDBDestWorker *self, const gchar *collection)
61 {
62 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
63
64 if (!self->client)
65 return FALSE;
66
67 if (self->coll_obj)
68 mongoc_collection_destroy(self->coll_obj);
69
70 self->coll_obj = mongoc_client_get_collection(self->client, owner->const_db, collection);
71
72 if (!self->coll_obj)
73 {
74 msg_error("Error getting specified MongoDB collection",
75 evt_tag_str("collection", collection),
76 evt_tag_str("driver", owner->super.super.super.id));
77
78 return FALSE;
79 }
80
81 msg_debug("Switching MongoDB collection", evt_tag_str("new_collection", collection));
82 return TRUE;
83 }
84
85 static gboolean
_check_server_status(MongoDBDestWorker * self,const mongoc_read_prefs_t * read_prefs)86 _check_server_status(MongoDBDestWorker *self, const mongoc_read_prefs_t *read_prefs)
87 {
88 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
89
90 bson_t reply;
91 bson_error_t error;
92
93 if (!self->client)
94 return FALSE;
95
96 bson_t *cmd = BCON_NEW("serverStatus", "1");
97 gboolean ok = mongoc_client_command_simple(self->client, owner->const_db ? : "", cmd, read_prefs, &reply, &error);
98 bson_destroy(&reply);
99 bson_destroy(cmd);
100
101 if (!ok)
102 {
103 msg_error("Error connecting to MongoDB",
104 evt_tag_str("driver", owner->super.super.super.id),
105 evt_tag_str("reason", error.message));
106 return FALSE;
107 }
108
109 return TRUE;
110 }
111
112 static gboolean
_worker_connect(LogThreadedDestWorker * s)113 _worker_connect(LogThreadedDestWorker *s)
114 {
115 MongoDBDestWorker *self = (MongoDBDestWorker *)s;
116 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
117
118 if (!self->client)
119 {
120 self->client = mongoc_client_pool_pop(owner->pool);
121
122 if (!self->client)
123 {
124 msg_error("Error creating MongoDB URI",
125 evt_tag_str("driver", owner->super.super.super.id));
126 return FALSE;
127 }
128 }
129
130 const mongoc_read_prefs_t *read_prefs = NULL;
131
132 if (owner->collection_is_literal_string && !self->coll_obj)
133 {
134 const gchar *collection = log_template_get_literal_value(owner->collection_template, NULL);
135
136 if (!_switch_collection(self, collection))
137 {
138 mongoc_client_pool_push(owner->pool, self->client);
139 self->client = NULL;
140 return FALSE;
141 }
142
143 g_string_assign(self->collection, collection);
144
145 read_prefs = mongoc_collection_get_read_prefs(self->coll_obj);
146 }
147
148
149 if (!_check_server_status(self, read_prefs))
150 {
151 mongoc_collection_destroy(self->coll_obj);
152 self->coll_obj = NULL;
153 mongoc_client_pool_push(owner->pool, self->client);
154 self->client = NULL;
155 return FALSE;
156 }
157
158 return TRUE;
159 }
160
161 /*
162 * Worker thread
163 */
164 static gboolean
_vp_obj_start(const gchar * name,const gchar * prefix,gpointer * prefix_data,const gchar * prev,gpointer * prev_data,gpointer user_data)165 _vp_obj_start(const gchar *name,
166 const gchar *prefix, gpointer *prefix_data,
167 const gchar *prev, gpointer *prev_data,
168 gpointer user_data)
169 {
170 bson_t *o;
171
172 if (prefix_data)
173 {
174 o = bson_new();
175 *prefix_data = o;
176 }
177 return FALSE;
178 }
179
180 static gboolean
_vp_obj_end(const gchar * name,const gchar * prefix,gpointer * prefix_data,const gchar * prev,gpointer * prev_data,gpointer user_data)181 _vp_obj_end(const gchar *name,
182 const gchar *prefix, gpointer *prefix_data,
183 const gchar *prev, gpointer *prev_data,
184 gpointer user_data)
185 {
186 MongoDBDestWorker *self = (MongoDBDestWorker *) user_data;
187 bson_t *root;
188
189 if (prev_data)
190 root = (bson_t *)*prev_data;
191 else
192 root = self->bson;
193
194 if (prefix_data)
195 {
196 bson_t *d = (bson_t *)*prefix_data;
197
198 bson_append_document(root, name, -1, d);
199 bson_destroy(d);
200 }
201 return FALSE;
202 }
203
204 static gboolean
_vp_process_value(const gchar * name,const gchar * prefix,TypeHint type,const gchar * value,gsize value_len,gpointer * prefix_data,gpointer user_data)205 _vp_process_value(const gchar *name, const gchar *prefix, TypeHint type,
206 const gchar *value, gsize value_len, gpointer *prefix_data, gpointer user_data)
207 {
208 bson_t *o;
209 MongoDBDestWorker *self = (MongoDBDestWorker *) user_data;
210 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
211
212 gboolean fallback = owner->template_options.on_error & ON_ERROR_FALLBACK_TO_STRING;
213
214 if (prefix_data)
215 o = (bson_t *)*prefix_data;
216 else
217 o = self->bson;
218
219 switch (type)
220 {
221 case TYPE_HINT_BOOLEAN:
222 {
223 gboolean b;
224
225 if (type_cast_to_boolean(value, &b, NULL))
226 bson_append_bool(o, name, -1, b);
227 else
228 {
229 gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "boolean");
230
231 if (fallback)
232 bson_append_utf8(o, name, -1, value, value_len);
233 else
234 return r;
235 }
236 break;
237 }
238 case TYPE_HINT_INT32:
239 {
240 gint32 i;
241
242 if (type_cast_to_int32(value, &i, NULL))
243 bson_append_int32(o, name, -1, i);
244 else
245 {
246 gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "int32");
247
248 if (fallback)
249 bson_append_utf8(o, name, -1, value, value_len);
250 else
251 return r;
252 }
253 break;
254 }
255 case TYPE_HINT_INT64:
256 {
257 gint64 i;
258
259 if (type_cast_to_int64(value, &i, NULL))
260 bson_append_int64(o, name, -1, i);
261 else
262 {
263 gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "int64");
264
265 if (fallback)
266 bson_append_utf8(o, name, -1, value, value_len);
267 else
268 return r;
269 }
270
271 break;
272 }
273 case TYPE_HINT_DOUBLE:
274 {
275 gdouble d;
276
277 if (type_cast_to_double(value, &d, NULL))
278 bson_append_double(o, name, -1, d);
279 else
280 {
281 gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "double");
282 if (fallback)
283 bson_append_utf8(o, name, -1, value, value_len);
284 else
285 return r;
286 }
287
288 break;
289 }
290 case TYPE_HINT_DATETIME:
291 {
292 guint64 i;
293
294 if (type_cast_to_datetime_int(value, &i, NULL))
295 bson_append_date_time(o, name, -1, (gint64)i);
296 else
297 {
298 gboolean r = type_cast_drop_helper(owner->template_options.on_error, value, "datetime");
299
300 if (fallback)
301 bson_append_utf8(o, name, -1, value, value_len);
302 else
303 return r;
304 }
305
306 break;
307 }
308 case TYPE_HINT_STRING:
309 case TYPE_HINT_LITERAL:
310 bson_append_utf8(o, name, -1, value, value_len);
311 break;
312 default:
313 return TRUE;
314 }
315
316 return FALSE;
317 }
318
319 static LogThreadedResult
_worker_insert(LogThreadedDestWorker * s,LogMessage * msg)320 _worker_insert(LogThreadedDestWorker *s, LogMessage *msg)
321 {
322 MongoDBDestWorker *self = (MongoDBDestWorker *) s;
323 MongoDBDestDriver *owner = (MongoDBDestDriver *) self->super.owner;
324
325 gboolean success;
326 gboolean drop_silently = owner->template_options.on_error & ON_ERROR_SILENT;
327
328 bson_reinit(self->bson);
329
330 LogTemplateEvalOptions options = {&owner->template_options, LTZ_SEND, self->super.seq_num, NULL};
331 success = value_pairs_walk(owner->vp,
332 _vp_obj_start,
333 _vp_process_value,
334 _vp_obj_end,
335 msg, &options,
336 self);
337
338 if (!success)
339 {
340 if (!drop_silently)
341 {
342 msg_error("Failed to format message for MongoDB, dropping message",
343 evt_tag_value_pairs("message", owner->vp, msg, &options),
344 evt_tag_str("driver", owner->super.super.super.id));
345 }
346 return LTR_DROP;
347 }
348
349 msg_debug("Outgoing message to MongoDB destination",
350 evt_tag_value_pairs("message", owner->vp, msg, &options),
351 evt_tag_str("driver", owner->super.super.super.id));
352
353
354 if (!owner->collection_is_literal_string)
355 {
356 const gchar *new_collection = _format_collection_template(self, msg);
357 if (!_switch_collection(self, new_collection))
358 return LTR_ERROR;
359 }
360
361 bson_error_t error;
362 success = mongoc_collection_insert(self->coll_obj, MONGOC_INSERT_NONE,
363 (const bson_t *)self->bson, NULL, &error);
364 if (!success)
365 {
366 if (error.domain == MONGOC_ERROR_STREAM)
367 {
368 msg_error("Network error while inserting into MongoDB",
369 evt_tag_int("time_reopen", self->super.time_reopen),
370 evt_tag_str("reason", error.message),
371 evt_tag_str("driver", owner->super.super.super.id));
372 return LTR_NOT_CONNECTED;
373 }
374 else
375 {
376 msg_error("Failed to insert into MongoDB",
377 evt_tag_int("time_reopen", self->super.time_reopen),
378 evt_tag_str("reason", error.message),
379 evt_tag_str("driver", owner->super.super.super.id));
380 return LTR_ERROR;
381 }
382 }
383
384 return LTR_SUCCESS;
385 }
386
387 static gboolean
_worker_thread_init(LogThreadedDestWorker * s)388 _worker_thread_init(LogThreadedDestWorker *s)
389 {
390 MongoDBDestWorker *self = (MongoDBDestWorker *) s;
391
392 self->collection = g_string_sized_new(64);
393 self->bson = bson_sized_new(4096);
394
395 return log_threaded_dest_worker_init_method(s);
396 }
397
398 static void
_worker_thread_deinit(LogThreadedDestWorker * s)399 _worker_thread_deinit(LogThreadedDestWorker *s)
400 {
401 MongoDBDestWorker *self = (MongoDBDestWorker *) s;
402
403 if (self->bson)
404 bson_destroy(self->bson);
405 self->bson = NULL;
406
407 g_string_free(self->collection, TRUE);
408 self->collection = NULL;
409
410 log_threaded_dest_worker_deinit_method(s);
411 }
412
413 LogThreadedDestWorker *
afmongodb_dw_new(LogThreadedDestDriver * owner,gint worker_index)414 afmongodb_dw_new(LogThreadedDestDriver *owner, gint worker_index)
415 {
416 MongoDBDestWorker *self = g_new0(MongoDBDestWorker, 1);
417
418 log_threaded_dest_worker_init_instance(&self->super, owner, worker_index);
419
420 self->super.thread_init = _worker_thread_init;
421 self->super.thread_deinit = _worker_thread_deinit;
422 self->super.connect = _worker_connect;
423 self->super.disconnect = _worker_disconnect;
424 self->super.insert = _worker_insert;
425
426 return &self->super;
427 }
428