1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "driver.h"
26 #include "logqueue-fifo.h"
27 #include "afinter.h"
28 #include "cfg-tree.h"
29 #include "messages.h"
30
31 #include <string.h>
32
33 /* LogDriverPlugin */
34
35 void
log_driver_plugin_free_method(LogDriverPlugin * self)36 log_driver_plugin_free_method(LogDriverPlugin *self)
37 {
38 g_free(self);
39 }
40
41 void
log_driver_plugin_init_instance(LogDriverPlugin * self,const gchar * name)42 log_driver_plugin_init_instance(LogDriverPlugin *self, const gchar *name)
43 {
44 self->name = name;
45 self->free_fn = log_driver_plugin_free_method;
46 }
47
48 /* LogDriver */
49
50 gboolean
log_driver_add_plugin(LogDriver * self,LogDriverPlugin * plugin)51 log_driver_add_plugin(LogDriver *self, LogDriverPlugin *plugin)
52 {
53 g_assert(plugin->name);
54 if (log_driver_lookup_plugin(self, plugin->name))
55 {
56 msg_error("Another instance of this plugin is registered in this driver, unable to register plugin again",
57 evt_tag_str("driver", self->id),
58 evt_tag_str("plugin", plugin->name));
59 return FALSE;
60
61 }
62 self->plugins = g_list_append(self->plugins, plugin);
63 return TRUE;
64 }
65
66 LogDriverPlugin *
log_driver_lookup_plugin(LogDriver * self,const gchar * plugin_name)67 log_driver_lookup_plugin(LogDriver *self, const gchar *plugin_name)
68 {
69 for (GList *l = self->plugins; l; l = l->next)
70 {
71 LogDriverPlugin *plugin = (LogDriverPlugin *) l->data;
72
73 if (strcmp(plugin->name, plugin_name) == 0)
74 return plugin;
75 }
76 return NULL;
77 }
78
79 static gboolean
log_driver_pre_init_method(LogPipe * s)80 log_driver_pre_init_method(LogPipe *s)
81 {
82 LogDriver *self = (LogDriver *) s;
83 gboolean success = TRUE;
84 GList *l;
85
86 for (l = self->plugins; l; l = l->next)
87 {
88 if (!log_driver_plugin_attach((LogDriverPlugin *) l->data, self))
89 success = FALSE;
90 }
91 return success;
92 }
93
94 static void
log_driver_post_deinit_method(LogPipe * s)95 log_driver_post_deinit_method(LogPipe *s)
96 {
97 LogDriver *self = (LogDriver *) s;
98 GList *l;
99
100 for (l = self->plugins; l; l = l->next)
101 {
102 log_driver_plugin_detach((LogDriverPlugin *) l->data, self);
103 }
104 }
105
106 gboolean
log_driver_init_method(LogPipe * s)107 log_driver_init_method(LogPipe *s)
108 {
109 return TRUE;
110 }
111
112 gboolean
log_driver_deinit_method(LogPipe * s)113 log_driver_deinit_method(LogPipe *s)
114 {
115 return TRUE;
116 }
117
118 /* NOTE: intentionally static, as only cDriver or LogDestDriver will derive from LogDriver */
119 static void
log_driver_free(LogPipe * s)120 log_driver_free(LogPipe *s)
121 {
122 LogDriver *self = (LogDriver *) s;
123 GList *l;
124
125 for (l = self->plugins; l; l = l->next)
126 {
127 log_driver_plugin_free((LogDriverPlugin *) l->data);
128 }
129 if (self->plugins)
130 {
131 g_list_free(self->plugins);
132 }
133 if (self->group)
134 g_free(self->group);
135 if (self->id)
136 g_free(self->id);
137 log_pipe_free_method(s);
138 }
139
140 /* NOTE: intentionally static, as only LogSrcDriver or LogDestDriver will derive from LogDriver */
141 static void
log_driver_init_instance(LogDriver * self,GlobalConfig * cfg)142 log_driver_init_instance(LogDriver *self, GlobalConfig *cfg)
143 {
144 log_pipe_init_instance(&self->super, cfg);
145 self->super.free_fn = log_driver_free;
146 self->super.pre_init = log_driver_pre_init_method;
147 self->super.init = log_driver_init_method;
148 self->super.deinit = log_driver_deinit_method;
149 self->super.post_deinit = log_driver_post_deinit_method;
150 }
151
152 /* LogSrcDriver */
153
154 static gboolean
log_src_driver_pre_init_method(LogPipe * s)155 log_src_driver_pre_init_method(LogPipe *s)
156 {
157 LogSrcDriver *self = (LogSrcDriver *) s;
158 GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
159
160 if (!self->super.group)
161 {
162 self->super.group = cfg_tree_get_rule_name(&cfg->tree, ENC_SOURCE, s->expr_node);
163 self->group_len = strlen(self->super.group);
164 self->super.id = cfg_tree_get_child_id(&cfg->tree, ENC_SOURCE, s->expr_node);
165 }
166 return log_driver_pre_init_method(s);
167 }
168
169 gboolean
log_src_driver_init_method(LogPipe * s)170 log_src_driver_init_method(LogPipe *s)
171 {
172 LogSrcDriver *self = (LogSrcDriver *) s;
173
174 if (!log_driver_init_method(s))
175 return FALSE;
176
177 stats_lock();
178 StatsClusterKey sc_key;
179 stats_cluster_logpipe_key_set(&sc_key, SCS_SOURCE | SCS_GROUP, self->super.group, NULL );
180 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED,
181 &self->super.processed_group_messages);
182 stats_cluster_logpipe_key_set(&sc_key, SCS_CENTER, NULL, "received" );
183 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &self->received_global_messages);
184 stats_unlock();
185
186 return TRUE;
187 }
188
189 gboolean
log_src_driver_deinit_method(LogPipe * s)190 log_src_driver_deinit_method(LogPipe *s)
191 {
192 LogSrcDriver *self = (LogSrcDriver *) s;
193
194 if (!log_driver_deinit_method(s))
195 return FALSE;
196
197 stats_lock();
198 StatsClusterKey sc_key;
199 stats_cluster_logpipe_key_set(&sc_key, SCS_SOURCE | SCS_GROUP, self->super.group, NULL );
200 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED,
201 &self->super.processed_group_messages);
202 stats_cluster_logpipe_key_set(&sc_key, SCS_CENTER, NULL, "received" );
203 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->received_global_messages);
204 stats_unlock();
205 return TRUE;
206 }
207
208 void
log_src_driver_queue_method(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)209 log_src_driver_queue_method(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
210 {
211 LogSrcDriver *self = (LogSrcDriver *) s;
212 GlobalConfig *cfg = log_pipe_get_config(s);
213
214 /* $SOURCE */
215
216 if (msg->flags & LF_LOCAL)
217 afinter_postpone_mark(cfg->mark_freq);
218
219 log_msg_set_value(msg, LM_V_SOURCE, self->super.group, self->group_len);
220 stats_counter_inc(self->super.processed_group_messages);
221 stats_counter_inc(self->received_global_messages);
222 log_pipe_forward_msg(s, msg, path_options);
223 }
224
225 void
log_src_driver_init_instance(LogSrcDriver * self,GlobalConfig * cfg)226 log_src_driver_init_instance(LogSrcDriver *self, GlobalConfig *cfg)
227 {
228 log_driver_init_instance(&self->super, cfg);
229 self->super.super.pre_init = log_src_driver_pre_init_method;
230 self->super.super.init = log_src_driver_init_method;
231 self->super.super.deinit = log_src_driver_deinit_method;
232 self->super.super.queue = log_src_driver_queue_method;
233 self->super.super.flags |= PIF_SOURCE;
234 }
235
236 void
log_src_driver_free(LogPipe * s)237 log_src_driver_free(LogPipe *s)
238 {
239 log_driver_free(s);
240 }
241
242 /* LogDestDriver */
243
244 static LogQueue *
_create_memory_queue(LogDestDriver * self,const gchar * persist_name)245 _create_memory_queue(LogDestDriver *self, const gchar *persist_name)
246 {
247 GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
248
249 gint log_fifo_size = self->log_fifo_size < 0 ? cfg->log_fifo_size : self->log_fifo_size;
250
251 if (cfg_is_config_version_older(cfg, VERSION_VALUE_3_22))
252 {
253 msg_warning_once("WARNING: log-fifo-size() works differently starting with " VERSION_3_22 " to avoid dropping "
254 "flow-controlled messages when log-fifo-size() is misconfigured. From now on, log-fifo-size() "
255 "only affects messages that are not flow-controlled. (Flow-controlled log paths have the "
256 "flags(flow-control) option set.) To enable the new behaviour, update the @version string in "
257 "your configuration and consider lowering the value of log-fifo-size().");
258
259 return log_queue_fifo_legacy_new(log_fifo_size, persist_name);
260 }
261
262 return log_queue_fifo_new(log_fifo_size, persist_name);
263 }
264
265 /* returns a reference */
266 static LogQueue *
log_dest_driver_acquire_memory_queue(LogDestDriver * self,const gchar * persist_name)267 log_dest_driver_acquire_memory_queue(LogDestDriver *self, const gchar *persist_name)
268 {
269 GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
270 LogQueue *queue = NULL;
271
272 if (persist_name)
273 queue = cfg_persist_config_fetch(cfg, persist_name);
274
275 if (queue && !log_queue_has_type(queue, log_queue_fifo_get_type()))
276 {
277 log_queue_unref(queue);
278 queue = NULL;
279 }
280
281 if (!queue)
282 {
283 queue = _create_memory_queue(self, persist_name);
284 log_queue_set_throttle(queue, self->throttle);
285 }
286 return queue;
287 }
288
289 /* consumes the reference in @q */
290 static void
log_dest_driver_release_queue_method(LogDestDriver * self,LogQueue * q)291 log_dest_driver_release_queue_method(LogDestDriver *self, LogQueue *q)
292 {
293 GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
294
295 /* we only save the LogQueue instance if it contains data */
296 if (q->persist_name && log_queue_keep_on_reload(q) > 0)
297 {
298 cfg_persist_config_add(cfg, q->persist_name, q, (GDestroyNotify) log_queue_unref, FALSE);
299 }
300 else
301 {
302 log_queue_unref(q);
303 }
304 }
305
306 void
log_dest_driver_queue_method(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)307 log_dest_driver_queue_method(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
308 {
309 LogDestDriver *self = (LogDestDriver *) s;
310
311 stats_counter_inc(self->super.processed_group_messages);
312 stats_counter_inc(self->queued_global_messages);
313 log_pipe_forward_msg(s, msg, path_options);
314 }
315
316 static gboolean
log_dest_driver_pre_init_method(LogPipe * s)317 log_dest_driver_pre_init_method(LogPipe *s)
318 {
319 LogDestDriver *self = (LogDestDriver *) s;
320 GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
321
322 if (!self->super.group)
323 {
324 self->super.group = cfg_tree_get_rule_name(&cfg->tree, ENC_DESTINATION, s->expr_node);
325 self->super.id = cfg_tree_get_child_id(&cfg->tree, ENC_DESTINATION, s->expr_node);
326 }
327 return log_driver_pre_init_method(s);
328 }
329
330 gboolean
log_dest_driver_init_method(LogPipe * s)331 log_dest_driver_init_method(LogPipe *s)
332 {
333 LogDestDriver *self = (LogDestDriver *) s;
334
335 if (!log_driver_init_method(s))
336 return FALSE;
337
338 stats_lock();
339 StatsClusterKey sc_key;
340 stats_cluster_logpipe_key_set(&sc_key, SCS_DESTINATION | SCS_GROUP, self->super.group, NULL );
341 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED,
342 &self->super.processed_group_messages);
343 stats_cluster_logpipe_key_set(&sc_key, SCS_CENTER, NULL, "queued" );
344 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &self->queued_global_messages);
345 stats_unlock();
346
347 return TRUE;
348 }
349
350 gboolean
log_dest_driver_deinit_method(LogPipe * s)351 log_dest_driver_deinit_method(LogPipe *s)
352 {
353 LogDestDriver *self = (LogDestDriver *) s;
354 GList *l, *l_next;
355
356 for (l = self->queues; l; l = l_next)
357 {
358 LogQueue *q = (LogQueue *) l->data;
359
360 /* the GList struct will be freed by log_dest_driver_release_queue */
361 l_next = l->next;
362
363 /* we have to pass a reference to log_dest_driver_release_queue(),
364 * which automatically frees the ref on the list too */
365 log_dest_driver_release_queue(self, log_queue_ref(q));
366 }
367 g_assert(self->queues == NULL);
368
369 stats_lock();
370 StatsClusterKey sc_key;
371 stats_cluster_logpipe_key_set(&sc_key, SCS_DESTINATION | SCS_GROUP, self->super.group, NULL );
372 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED,
373 &self->super.processed_group_messages);
374 stats_cluster_logpipe_key_set(&sc_key, SCS_CENTER, NULL, "queued" );
375 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->queued_global_messages);
376 stats_unlock();
377
378 if (!log_driver_deinit_method(s))
379 return FALSE;
380 return TRUE;
381 }
382
383 void
log_dest_driver_init_instance(LogDestDriver * self,GlobalConfig * cfg)384 log_dest_driver_init_instance(LogDestDriver *self, GlobalConfig *cfg)
385 {
386 log_driver_init_instance(&self->super, cfg);
387 self->super.super.pre_init = log_dest_driver_pre_init_method;
388 self->super.super.init = log_dest_driver_init_method;
389 self->super.super.deinit = log_dest_driver_deinit_method;
390 self->super.super.queue = log_dest_driver_queue_method;
391 self->acquire_queue = log_dest_driver_acquire_memory_queue;
392 self->release_queue = log_dest_driver_release_queue_method;
393 self->log_fifo_size = -1;
394 self->throttle = 0;
395 }
396
397 void
log_dest_driver_free(LogPipe * s)398 log_dest_driver_free(LogPipe *s)
399 {
400 LogDestDriver *self = (LogDestDriver *) s;
401 GList *l;
402
403 for (l = self->queues; l; l = l->next)
404 {
405 log_queue_unref((LogQueue *) l->data);
406 }
407 g_list_free(self->queues);
408 log_driver_free(s);
409 }
410