1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "afinter.h"
26 #include "logreader.h"
27 #include "stats/stats-registry.h"
28 #include "messages.h"
29 #include "apphook.h"
30 #include "mainloop.h"
31
32 #include <iv_event.h>
33
/* forward typedef; the struct itself is defined further down in this file */
typedef struct _AFInterSource AFInterSource;

/* taken around accesses to internal_msg_queue, to the published
 * current_internal_source pointer and to that source's free_to_send flag */
static GMutex internal_msg_lock;
/* FIFO of pending internal messages; allocated lazily in afinter_message_posted() */
static GQueue *internal_msg_queue;
/* the AFInterSource currently consuming the queue, NULL when there is none */
static AFInterSource *current_internal_source;
/* stats counters registered under the "internal_source" key: number of
 * queued messages and number of messages dropped because the queue was full */
static StatsCounterItem *internal_queue_length;
static StatsCounterItem *internal_queue_dropped;

/* the expiration timer of the next MARK message */
static struct timespec next_mark_target = { -1, 0 };
/* as different sources from different threads can call afinter_postpone_mark,
   and we use the value in the init thread, we need to synchronize the value references
*/
static GMutex internal_mark_target_lock;
48
49 /*
50 * This class is parallel to LogReader, e.g. it hangs on the
51 * AFInterSourceDriver on the left (e.g. feeds messages to
52 * AFInterSourceDriver).
53 *
54 * Threading:
55 * ==========
56 *
57 * syslog-ng can generate internal() messages in any of its threads, thus
58 * some care must be taken to make the internal() source multithreaded.
59 * This is how it works:
60 *
61 * Whenever a thread decides to send a message using the msg_() API, it puts
62 * an entry into the internal_msg_queue under the protection of
63 * "internal_msg_lock".
64 *
 * The receiving side of this queue is the thread that runs the ivykis
 * loop of the internal() source (see afinter_source_run()). This object
 * will publish a pointer to itself into current_internal_source. This
 * pointer will be set under the protection of the internal_msg_lock. The
 * internal source will define an ivykis event, a post is submitted to this
 * event whenever a new message is added to the queue.
 *
 * Once the event arrives at that loop, it wakes up and feeds all
 * internal messages into the log path.
74 *
75 * If the window is depleted (e.g. flow control is enabled and the
76 * destination is unable to process any more messages),
77 * free_to_send will be set to FALSE, which means that messages
78 * will be added to the queue, but the wakeup will not be done.
79 *
80 * When the window becomes free, log_source_wakeup() is called, which
81 * restores the free_to_send flag (e.g. further messages will
82 * wake up the source) and also starts emptying the messages accumulated in the queue.
83 *
84 * Possible races:
85 * ===============
86 *
 * The biggest offenders are when a client thread submits a message while
 * we're in the process of going to sleep (no window) or waking up. There
 * are notes in the code on how we handle these cases (search for "Possible race").
90 *
91 */
92 struct _AFInterSource
93 {
94 LogSource super;
95 gint mark_freq;
96 const AFInterSourceOptions *options;
97 struct iv_event post;
98 struct iv_event schedule_wakeup;
99 struct iv_event exit;
100 struct iv_timer mark_timer;
101 struct iv_task restart_task;
102 gboolean watches_running:1, free_to_send:1;
103 };
104
/* forward declarations: both functions are called before their definitions
 * further down in this file */
static void afinter_source_update_watches(AFInterSource *self);
void afinter_message_posted(LogMessage *msg);
107
108 static void
afinter_source_post(gpointer s)109 afinter_source_post(gpointer s)
110 {
111 AFInterSource *self = (AFInterSource *) s;
112 LogMessage *msg;
113
114 while (log_source_free_to_send(&self->super))
115 {
116 g_mutex_lock(&internal_msg_lock);
117 msg = g_queue_pop_head(internal_msg_queue);
118 g_mutex_unlock(&internal_msg_lock);
119 if (!msg)
120 break;
121
122 stats_counter_dec(internal_queue_length);
123 log_source_post(&self->super, msg);
124 }
125 afinter_source_update_watches(self);
126 }
127
/* forward declarations: afinter_source_run() below uses both before they
 * are defined */
static void afinter_source_start_watches(AFInterSource *self);
static void afinter_source_stop_watches(AFInterSource *self);
130
/*
 * Thread function of the internal() source; it is handed to
 * main_loop_create_worker_thread() by afinter_sd_start_thread().
 *
 * Sets up an ivykis loop for this thread, registers the source's events,
 * marks the source as free to send, then blocks in iv_main() until the
 * exit event calls iv_quit().  On the way out it unpublishes the source
 * and tears everything down again.
 *
 * @s: the AFInterSource instance, passed as an untyped pointer.
 */
static void
afinter_source_run(gpointer s)
{
  AFInterSource *self = (AFInterSource *) s;

  iv_init();

  /* post event is used by other threads and can only be unregistered if
   * current_internal_source is set to NULL in a thread safe manner */
  iv_event_register(&self->post);
  iv_event_register(&self->schedule_wakeup);
  iv_event_register(&self->exit);

  /* from now on afinter_message_posted() may trigger the post event */
  g_mutex_lock(&internal_msg_lock);
  self->free_to_send = TRUE;
  g_mutex_unlock(&internal_msg_lock);

  afinter_source_start_watches(self);
  afinter_source_update_watches(self);

  iv_main();

  /* unpublish ourselves under the lock first, so that no other thread can
   * post to the events that are unregistered right below */
  g_mutex_lock(&internal_msg_lock);
  current_internal_source = NULL;
  g_mutex_unlock(&internal_msg_lock);

  iv_event_unregister(&self->exit);
  iv_event_unregister(&self->post);
  iv_event_unregister(&self->schedule_wakeup);

  afinter_source_stop_watches(self);

  iv_deinit();
}
165
166 static void
afinter_source_request_exit(gpointer s)167 afinter_source_request_exit(gpointer s)
168 {
169 AFInterSource *self = (AFInterSource *) s;
170
171 iv_event_post(&self->exit);
172 }
173
174
175 static gboolean
afinter_sd_start_thread(LogPipe * s)176 afinter_sd_start_thread(LogPipe *s)
177 {
178 AFInterSourceDriver *self = (AFInterSourceDriver *) s;
179
180 self->worker_options.is_external_input = TRUE;
181
182 main_loop_create_worker_thread((WorkerThreadFunc) afinter_source_run,
183 (WorkerExitNotificationFunc) afinter_source_request_exit,
184 self->source, &self->worker_options);
185
186 return TRUE;
187 }
188
189
190 static void
afinter_source_mark(gpointer s)191 afinter_source_mark(gpointer s)
192 {
193 AFInterSource *self = (AFInterSource *) s;
194 struct timespec nmt;
195
196 g_mutex_lock(&internal_mark_target_lock);
197 nmt = next_mark_target;
198 g_mutex_unlock(&internal_mark_target_lock);
199
200 if (nmt.tv_sec <= self->mark_timer.expires.tv_sec)
201 {
202 /* the internal_mark_target has not been overwritten by an incoming message in afinter_postpone_mark
203 (there was no msg in the meantime) -> the mark msg can be sent */
204 afinter_message_posted(log_msg_new_mark());
205
206 /* the next_mark_target will be increased in afinter_postpone_mark */
207 }
208 }
209
210 static void
afinter_source_wakeup(LogSource * s)211 afinter_source_wakeup(LogSource *s)
212 {
213 AFInterSource *self = (AFInterSource *) s;
214
215 /*
216 * We might get called even after this AFInterSource has been
217 * deinitialized, in which case we must not do anything (since the
218 * iv_event triggered here is not registered).
219 *
220 * This happens when log_writer_deinit() flushes its output queue
221 * after the internal source which produced the message has already been
222 * deinited. Since init/deinit calls are made in the main thread, no
223 * locking is needed.
224 *
225 */
226 if (self->super.super.flags & PIF_INITIALIZED)
227 iv_event_post(&self->schedule_wakeup);
228 }
229
230 static void
afinter_source_init_watches(AFInterSource * self)231 afinter_source_init_watches(AFInterSource *self)
232 {
233 IV_EVENT_INIT(&self->post);
234 self->post.cookie = self;
235 self->post.handler = afinter_source_post;
236 IV_TIMER_INIT(&self->mark_timer);
237 self->mark_timer.cookie = self;
238 self->mark_timer.handler = afinter_source_mark;
239 IV_EVENT_INIT(&self->schedule_wakeup);
240 self->schedule_wakeup.cookie = self;
241 self->schedule_wakeup.handler = (void (*)(void *)) afinter_source_update_watches;
242 IV_EVENT_INIT(&self->exit);
243 self->exit.cookie = NULL;
244 self->exit.handler = (void (*)(void *)) iv_quit;
245 IV_TASK_INIT(&self->restart_task);
246 self->restart_task.cookie = self;
247 self->restart_task.handler = afinter_source_post;
248 }
249
250 static void
afinter_source_start_watches(AFInterSource * self)251 afinter_source_start_watches(AFInterSource *self)
252 {
253 if (!self->watches_running)
254 {
255 if (self->mark_timer.expires.tv_sec >= 0)
256 iv_timer_register(&self->mark_timer);
257 self->watches_running = TRUE;
258 }
259 }
260
261 static void
afinter_source_stop_watches(AFInterSource * self)262 afinter_source_stop_watches(AFInterSource *self)
263 {
264 if (self->watches_running)
265 {
266 if (iv_task_registered(&self->restart_task))
267 iv_task_unregister(&self->restart_task);
268 if (iv_timer_registered(&self->mark_timer))
269 iv_timer_unregister(&self->mark_timer);
270 self->watches_running = FALSE;
271 }
272 }
273
274 static void
afinter_source_update_watches(AFInterSource * self)275 afinter_source_update_watches(AFInterSource *self)
276 {
277 if (!log_source_free_to_send(&self->super))
278 {
279 /* ok, we go to sleep now. let's disable the post event.
280 * Messages get accumulated into internal_msg_queue. */
281 g_mutex_lock(&internal_msg_lock);
282 self->free_to_send = FALSE;
283 g_mutex_unlock(&internal_msg_lock);
284
285 /* Possible race:
286 *
287 * After the check log_source_free_to_send() above, the destination
288 * may actually write out a message, thus by the time we get here, the
289 * window may have space again. This is taken care of by the fact
290 * that the wakeup is running in the main thread, which we do too. So
291 * the wakeup is either completely performed before we entered this
292 * function, or after we return.
293 *
294 * In case it happened earlier, the check above will find that we have
295 * window space, in case it's going to be happening afterwards, we
296 * will be woken up by the schedule_wakeup event (which calls
297 * update_watches again).
298 */
299
300 /* MARK events also get disabled */
301 afinter_source_stop_watches(self);
302 }
303 else
304 {
305 /* ok we can send our stuff. make sure we wake up */
306 afinter_source_stop_watches(self);
307 self->mark_timer.expires = next_mark_target;
308 afinter_source_start_watches(self);
309
310 /* Possible race:
311 *
312 * The free_to_send flag is set to FALSE here (in case
313 * we're just waking up). In case the sender submits a message, it'll
314 * not trigger the self->post (since free_to_send is FALSE). This is
315 * taken care of by the queue-length check in the locked region below.
316 * If the queue has elements, we need to wake up, because we may have
317 * lost a wakeup call. If it happens after the locked region, that
318 * doesn't matter, in that case we already set
319 * free_to_send = TRUE to ourselves, thus the post event will also
320 * be triggered.
321 */
322
323 g_mutex_lock(&internal_msg_lock);
324 if (internal_msg_queue && g_queue_get_length(internal_msg_queue) > 0)
325 iv_task_register(&self->restart_task);
326 self->free_to_send = TRUE;
327 g_mutex_unlock(&internal_msg_lock);
328 }
329 }
330
331 static gboolean
afinter_source_init(LogPipe * s)332 afinter_source_init(LogPipe *s)
333 {
334 AFInterSource *self = (AFInterSource *) s;
335 GlobalConfig *cfg = log_pipe_get_config(s);
336
337 if (!log_source_init(s))
338 return FALSE;
339
340 self->mark_freq = cfg->mark_freq;
341 afinter_postpone_mark(self->mark_freq);
342 self->mark_timer.expires = next_mark_target;
343
344 g_mutex_lock(&internal_msg_lock);
345 current_internal_source = self;
346 g_mutex_unlock(&internal_msg_lock);
347
348 return TRUE;
349 }
350
351 static gboolean
afinter_source_deinit(LogPipe * s)352 afinter_source_deinit(LogPipe *s)
353 {
354 AFInterSource *self = (AFInterSource *) s;
355
356 g_mutex_lock(&internal_msg_lock);
357 current_internal_source = NULL;
358 g_mutex_unlock(&internal_msg_lock);
359
360 return log_source_deinit(&self->super.super);
361 }
362
363 static LogSource *
afinter_source_new(AFInterSourceDriver * owner,AFInterSourceOptions * options)364 afinter_source_new(AFInterSourceDriver *owner, AFInterSourceOptions *options)
365 {
366 AFInterSource *self = g_new0(AFInterSource, 1);
367
368 log_source_init_instance(&self->super, owner->super.super.super.cfg);
369 log_source_set_options(&self->super, &options->super, owner->super.super.id, NULL, FALSE,
370 owner->super.super.super.expr_node);
371 afinter_source_init_watches(self);
372 self->super.super.init = afinter_source_init;
373 self->super.super.deinit = afinter_source_deinit;
374 self->super.wakeup = afinter_source_wakeup;
375
376 self->options = options;
377
378 return &self->super;
379 }
380
381
/*
 * Fills in the default values of the internal() source options: the
 * generic LogSource defaults plus a queue capacity of 10000 messages
 * (the limit checked in afinter_message_posted() before queueing).
 */
void
afinter_source_options_defaults(AFInterSourceOptions *options)
{
  log_source_options_defaults(&options->super);
  options->queue_capacity = 10000;
}
388
389 static gboolean
afinter_sd_init(LogPipe * s)390 afinter_sd_init(LogPipe *s)
391 {
392 AFInterSourceDriver *self = (AFInterSourceDriver *) s;
393 GlobalConfig *cfg = log_pipe_get_config(s);
394
395 if (cfg_is_config_version_older(cfg, VERSION_VALUE_3_29))
396 {
397 msg_warning_once("WARNING: The internal_queue_length stat counter has been renamed to internal_source.queued. "
398 "The old name will be removed in future versions", cfg_format_config_version_tag(cfg));
399 }
400
401 if (!log_src_driver_init_method(s))
402 return FALSE;
403
404 if (current_internal_source != NULL)
405 {
406 msg_error("Multiple internal() sources were detected, this is not possible");
407 return FALSE;
408 }
409
410 log_source_options_init(&self->source_options.super, cfg, self->super.super.group);
411 self->source_options.super.stats_level = STATS_LEVEL0;
412 self->source_options.super.stats_source = stats_register_type("internal");
413 self->source = afinter_source_new(self, &self->source_options);
414 log_pipe_append(&self->source->super, s);
415
416 if (!log_pipe_init(&self->source->super))
417 {
418 log_pipe_unref(&self->source->super);
419 self->source = NULL;
420 return FALSE;
421 }
422
423 return TRUE;
424 }
425
426 static gboolean
afinter_sd_deinit(LogPipe * s)427 afinter_sd_deinit(LogPipe *s)
428 {
429 AFInterSourceDriver *self = (AFInterSourceDriver *) s;
430
431 if (self->source)
432 {
433 log_pipe_deinit(&self->source->super);
434 /* break circular reference created during _init */
435 log_pipe_unref(&self->source->super);
436 self->source = NULL;
437 }
438
439 if (!log_src_driver_deinit_method(s))
440 return FALSE;
441
442 return TRUE;
443 }
444
/*
 * free_fn of the driver.  The source must already have been released by
 * afinter_sd_deinit() (asserted), the rest is left to the driver base.
 */
static void
afinter_sd_free(LogPipe *s)
{
  AFInterSourceDriver *self = (AFInterSourceDriver *) s;

  /* afinter_sd_deinit() resets the pointer after dropping its reference */
  g_assert(!self->source);
  log_src_driver_free(s);
}
453
454
455 LogDriver *
afinter_sd_new(GlobalConfig * cfg)456 afinter_sd_new(GlobalConfig *cfg)
457 {
458 AFInterSourceDriver *self = g_new0(AFInterSourceDriver, 1);
459
460 log_src_driver_init_instance((LogSrcDriver *)&self->super, cfg);
461 self->super.super.super.init = afinter_sd_init;
462 self->super.super.super.deinit = afinter_sd_deinit;
463 self->super.super.super.free_fn = afinter_sd_free;
464 self->super.super.super.on_config_inited = afinter_sd_start_thread;
465
466 afinter_source_options_defaults(&self->source_options);
467
468 return (LogDriver *)&self->super.super;
469 }
470
471 /****************************************************************************
472 * Global entry points, without an AFInterSourceDriver instance.
473 ****************************************************************************/
474
475 void
afinter_postpone_mark(gint mark_freq)476 afinter_postpone_mark(gint mark_freq)
477 {
478 if (mark_freq > 0)
479 {
480 iv_validate_now();
481 g_mutex_lock(&internal_mark_target_lock);
482 next_mark_target = iv_now;
483 next_mark_target.tv_sec += mark_freq;
484 g_mutex_unlock(&internal_mark_target_lock);
485 }
486 else
487 {
488 g_mutex_lock(&internal_mark_target_lock);
489 next_mark_target.tv_sec = -1;
490 g_mutex_unlock(&internal_mark_target_lock);
491 }
492 }
493
494 static void
_release_internal_msg_queue(void)495 _release_internal_msg_queue(void)
496 {
497 LogMessage *internal_message = g_queue_pop_head(internal_msg_queue);
498 while (internal_message)
499 {
500 stats_counter_dec(internal_queue_length);
501 log_msg_unref(internal_message);
502
503 internal_message = g_queue_pop_head(internal_msg_queue);
504 }
505 g_queue_free(internal_msg_queue);
506 internal_msg_queue = NULL;
507 }
508
509 static inline void
_register_obsolete_stats_alias(StatsCounterItem * internal_queued_ctr)510 _register_obsolete_stats_alias(StatsCounterItem *internal_queued_ctr)
511 {
512 stats_lock();
513 StatsClusterKey sc_key;
514 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_queue_length", NULL);
515 stats_register_alias_counter(0, &sc_key, SC_TYPE_PROCESSED, internal_queued_ctr);
516 stats_unlock();
517 }
518
519 static inline void
_unregister_obsolete_stats_alias(StatsCounterItem * internal_queued_ctr)520 _unregister_obsolete_stats_alias(StatsCounterItem *internal_queued_ctr)
521 {
522 stats_lock();
523 StatsClusterKey sc_key;
524 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_queue_length", NULL);
525 stats_unregister_alias_counter(&sc_key, SC_TYPE_PROCESSED, internal_queued_ctr);
526 stats_unlock();
527 }
528
529 void
afinter_message_posted(LogMessage * msg)530 afinter_message_posted(LogMessage *msg)
531 {
532 g_mutex_lock(&internal_msg_lock);
533 if (!current_internal_source)
534 {
535 if (internal_msg_queue)
536 {
537 _release_internal_msg_queue();
538 }
539 log_msg_unref(msg);
540 goto exit;
541 }
542
543 if (!internal_msg_queue)
544 {
545 internal_msg_queue = g_queue_new();
546
547 stats_lock();
548 StatsClusterKey sc_key;
549 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_source", NULL );
550 stats_register_counter(0, &sc_key, SC_TYPE_QUEUED, &internal_queue_length);
551 stats_register_counter(0, &sc_key, SC_TYPE_DROPPED, &internal_queue_dropped);
552 stats_unlock();
553
554 _register_obsolete_stats_alias(internal_queue_length);
555 }
556
557 if (g_queue_get_length(internal_msg_queue) >= current_internal_source->options->queue_capacity)
558 {
559 stats_counter_inc(internal_queue_dropped);
560 log_msg_unref(msg);
561 goto exit;
562 }
563
564 g_queue_push_tail(internal_msg_queue, msg);
565 stats_counter_inc(internal_queue_length);
566
567 if (current_internal_source->free_to_send)
568 iv_event_post(¤t_internal_source->post);
569 exit:
570 g_mutex_unlock(&internal_msg_lock);
571 }
572
/*
 * Application hook callback (registered in afinter_global_init()): installs
 * afinter_message_posted() as the post function of the messages subsystem.
 * Neither argument is used.
 */
static void
afinter_register_posted_hook(gint hook_type, gpointer user_data)
{
  msg_set_post_func(afinter_message_posted);
}
578
/*
 * Global initialization: registers a run-once AH_CONFIG_CHANGED
 * application hook that installs afinter_message_posted() as the message
 * post function.
 */
void
afinter_global_init(void)
{
  register_application_hook(AH_CONFIG_CHANGED, afinter_register_posted_hook, NULL, AHM_RUN_ONCE);
}
584
585 void
afinter_global_deinit(void)586 afinter_global_deinit(void)
587 {
588 if (internal_msg_queue)
589 {
590 _unregister_obsolete_stats_alias(internal_queue_length);
591
592 stats_lock();
593 StatsClusterKey sc_key;
594 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_source", NULL );
595 stats_unregister_counter(&sc_key, SC_TYPE_QUEUED, &internal_queue_length);
596 stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &internal_queue_dropped);
597 stats_unlock();
598 g_queue_free_full(internal_msg_queue, (GDestroyNotify)log_msg_unref);
599 internal_msg_queue = NULL;
600 }
601 current_internal_source = NULL;
602 }
603