1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "afinter.h"
26 #include "logreader.h"
27 #include "stats/stats-registry.h"
28 #include "messages.h"
29 #include "apphook.h"
30 #include "mainloop.h"
31 
32 #include <iv_event.h>
33 
34 typedef struct _AFInterSource AFInterSource;
35 
36 static GMutex internal_msg_lock;
37 static GQueue *internal_msg_queue;
38 static AFInterSource *current_internal_source;
39 static StatsCounterItem *internal_queue_length;
40 static StatsCounterItem *internal_queue_dropped;
41 
42 /* the expiration timer of the next MARK message */
43 static struct timespec next_mark_target = { -1, 0 };
44 /* as different sources from different threads can call afinter_postpone_mark,
45    and we use the value in the init thread, we need to synchronize the value references
46 */
47 static GMutex internal_mark_target_lock;
48 
49 /*
50  * This class is parallel to LogReader, e.g. it hangs on the
51  * AFInterSourceDriver on the left (e.g.  feeds messages to
52  * AFInterSourceDriver).
53  *
54  * Threading:
55  * ==========
56  *
57  * syslog-ng can generate internal() messages in any of its threads, thus
58  * some care must be taken to make the internal() source multithreaded.
59  * This is how it works:
60  *
61  * Whenever a thread decides to send a message using the msg_() API, it puts
62  * an entry into the internal_msg_queue under the protection of
63  * "internal_msg_lock".
64  *
65  * The receiving side of this queue is in the main thread, where the
66  * internal() source is operating.  This object will publish a pointer to
67  * itself into current_internal_source.  This pointer will be set under the
68  * protection of the internal_msg_lock.  The internal source will define an
69  * ivykis event, a post is submitted to this event whenever a new message is
70  * added to the queue.
71  *
72  * Once the event arrives to the main loop, it wakes up and feeds all
73  * internal messages into the log path.
74  *
75  * If the window is depleted (e.g. flow control is enabled and the
76  * destination is unable to process any more messages),
77  * free_to_send will be set to FALSE, which means that messages
78  * will be added to the queue, but the wakeup will not be done.
79  *
80  * When the window becomes free, log_source_wakeup() is called, which
81  * restores the free_to_send flag (e.g.  further messages will
82  * wake up the source) and also starts emptying the messages accumulated in the queue.
83  *
84  * Possible races:
85  * ===============
86  *
87  * The biggest offenders are when a client thread submits a message and
88  * we're in the process of getting asleep (no window), or waking up.  There
89  * are notes in the code how we handle these cases (search for "Possible race").
90  *
91  */
92 struct _AFInterSource
93 {
94   LogSource super;
95   gint mark_freq;
96   const AFInterSourceOptions *options;
97   struct iv_event post;
98   struct iv_event schedule_wakeup;
99   struct iv_event exit;
100   struct iv_timer mark_timer;
101   struct iv_task restart_task;
102   gboolean watches_running:1, free_to_send:1;
103 };
104 
105 static void afinter_source_update_watches(AFInterSource *self);
106 void afinter_message_posted(LogMessage *msg);
107 
108 static void
afinter_source_post(gpointer s)109 afinter_source_post(gpointer s)
110 {
111   AFInterSource *self = (AFInterSource *) s;
112   LogMessage *msg;
113 
114   while (log_source_free_to_send(&self->super))
115     {
116       g_mutex_lock(&internal_msg_lock);
117       msg = g_queue_pop_head(internal_msg_queue);
118       g_mutex_unlock(&internal_msg_lock);
119       if (!msg)
120         break;
121 
122       stats_counter_dec(internal_queue_length);
123       log_source_post(&self->super, msg);
124     }
125   afinter_source_update_watches(self);
126 }
127 
128 static void afinter_source_start_watches(AFInterSource *self);
129 static void afinter_source_stop_watches(AFInterSource *self);
130 
131 static void
afinter_source_run(gpointer s)132 afinter_source_run(gpointer s)
133 {
134   AFInterSource *self = (AFInterSource *) s;
135 
136   iv_init();
137 
138   /* post event is used by other threads and can only be unregistered if
139    * current_afinter_source is set to NULL in a thread safe manner */
140   iv_event_register(&self->post);
141   iv_event_register(&self->schedule_wakeup);
142   iv_event_register(&self->exit);
143 
144   g_mutex_lock(&internal_msg_lock);
145   self->free_to_send = TRUE;
146   g_mutex_unlock(&internal_msg_lock);
147 
148   afinter_source_start_watches(self);
149   afinter_source_update_watches(self);
150 
151   iv_main();
152 
153   g_mutex_lock(&internal_msg_lock);
154   current_internal_source = NULL;
155   g_mutex_unlock(&internal_msg_lock);
156 
157   iv_event_unregister(&self->exit);
158   iv_event_unregister(&self->post);
159   iv_event_unregister(&self->schedule_wakeup);
160 
161   afinter_source_stop_watches(self);
162 
163   iv_deinit();
164 }
165 
166 static void
afinter_source_request_exit(gpointer s)167 afinter_source_request_exit(gpointer s)
168 {
169   AFInterSource *self = (AFInterSource *) s;
170 
171   iv_event_post(&self->exit);
172 }
173 
174 
175 static gboolean
afinter_sd_start_thread(LogPipe * s)176 afinter_sd_start_thread(LogPipe *s)
177 {
178   AFInterSourceDriver *self = (AFInterSourceDriver *) s;
179 
180   self->worker_options.is_external_input = TRUE;
181 
182   main_loop_create_worker_thread((WorkerThreadFunc) afinter_source_run,
183                                  (WorkerExitNotificationFunc) afinter_source_request_exit,
184                                  self->source, &self->worker_options);
185 
186   return TRUE;
187 }
188 
189 
190 static void
afinter_source_mark(gpointer s)191 afinter_source_mark(gpointer s)
192 {
193   AFInterSource *self = (AFInterSource *) s;
194   struct timespec nmt;
195 
196   g_mutex_lock(&internal_mark_target_lock);
197   nmt = next_mark_target;
198   g_mutex_unlock(&internal_mark_target_lock);
199 
200   if (nmt.tv_sec <= self->mark_timer.expires.tv_sec)
201     {
202       /* the internal_mark_target has not been overwritten by an incoming message in afinter_postpone_mark
203          (there was no msg in the meantime) -> the mark msg can be sent */
204       afinter_message_posted(log_msg_new_mark());
205 
206       /* the next_mark_target will be increased in afinter_postpone_mark */
207     }
208 }
209 
210 static void
afinter_source_wakeup(LogSource * s)211 afinter_source_wakeup(LogSource *s)
212 {
213   AFInterSource *self = (AFInterSource *) s;
214 
215   /*
216    * We might get called even after this AFInterSource has been
217    * deinitialized, in which case we must not do anything (since the
218    * iv_event triggered here is not registered).
219    *
220    * This happens when log_writer_deinit() flushes its output queue
221    * after the internal source which produced the message has already been
222    * deinited. Since init/deinit calls are made in the main thread, no
223    * locking is needed.
224    *
225    */
226   if (self->super.super.flags & PIF_INITIALIZED)
227     iv_event_post(&self->schedule_wakeup);
228 }
229 
230 static void
afinter_source_init_watches(AFInterSource * self)231 afinter_source_init_watches(AFInterSource *self)
232 {
233   IV_EVENT_INIT(&self->post);
234   self->post.cookie = self;
235   self->post.handler = afinter_source_post;
236   IV_TIMER_INIT(&self->mark_timer);
237   self->mark_timer.cookie = self;
238   self->mark_timer.handler = afinter_source_mark;
239   IV_EVENT_INIT(&self->schedule_wakeup);
240   self->schedule_wakeup.cookie = self;
241   self->schedule_wakeup.handler = (void (*)(void *)) afinter_source_update_watches;
242   IV_EVENT_INIT(&self->exit);
243   self->exit.cookie = NULL;
244   self->exit.handler = (void (*)(void *)) iv_quit;
245   IV_TASK_INIT(&self->restart_task);
246   self->restart_task.cookie = self;
247   self->restart_task.handler = afinter_source_post;
248 }
249 
250 static void
afinter_source_start_watches(AFInterSource * self)251 afinter_source_start_watches(AFInterSource *self)
252 {
253   if (!self->watches_running)
254     {
255       if (self->mark_timer.expires.tv_sec >= 0)
256         iv_timer_register(&self->mark_timer);
257       self->watches_running = TRUE;
258     }
259 }
260 
261 static void
afinter_source_stop_watches(AFInterSource * self)262 afinter_source_stop_watches(AFInterSource *self)
263 {
264   if (self->watches_running)
265     {
266       if (iv_task_registered(&self->restart_task))
267         iv_task_unregister(&self->restart_task);
268       if (iv_timer_registered(&self->mark_timer))
269         iv_timer_unregister(&self->mark_timer);
270       self->watches_running = FALSE;
271     }
272 }
273 
274 static void
afinter_source_update_watches(AFInterSource * self)275 afinter_source_update_watches(AFInterSource *self)
276 {
277   if (!log_source_free_to_send(&self->super))
278     {
279       /* ok, we go to sleep now. let's disable the post event.
280        * Messages get accumulated into internal_msg_queue.  */
281       g_mutex_lock(&internal_msg_lock);
282       self->free_to_send = FALSE;
283       g_mutex_unlock(&internal_msg_lock);
284 
285       /* Possible race:
286        *
287        * After the check log_source_free_to_send() above, the destination
288        * may actually write out a message, thus by the time we get here, the
289        * window may have space again.  This is taken care of by the fact
290        * that the wakeup is running in the main thread, which we do too.  So
291        * the wakeup is either completely performed before we entered this
292        * function, or after we return.
293        *
294        * In case it happened earlier, the check above will find that we have
295        * window space, in case it's going to be happening afterwards, we
296        * will be woken up by the schedule_wakeup event (which calls
297        * update_watches again).
298        */
299 
300       /* MARK events also get disabled */
301       afinter_source_stop_watches(self);
302     }
303   else
304     {
305       /* ok we can send our stuff. make sure we wake up */
306       afinter_source_stop_watches(self);
307       self->mark_timer.expires = next_mark_target;
308       afinter_source_start_watches(self);
309 
310       /* Possible race:
311        *
312        * The free_to_send flag is set to FALSE here (in case
313        * we're just waking up).  In case the sender submits a message, it'll
314        * not trigger the self->post (since free_to_send is FALSE).  This is
315        * taken care of by the queue-length check in the locked region below.
316        * If the queue has elements, we need to wake up, because we may have
317        * lost a wakeup call.  If it happens after the locked region, that
318        * doesn't matter, in that case we already set
319        * free_to_send = TRUE to ourselves, thus the post event will also
320        * be triggered.
321        */
322 
323       g_mutex_lock(&internal_msg_lock);
324       if (internal_msg_queue && g_queue_get_length(internal_msg_queue) > 0)
325         iv_task_register(&self->restart_task);
326       self->free_to_send = TRUE;
327       g_mutex_unlock(&internal_msg_lock);
328     }
329 }
330 
331 static gboolean
afinter_source_init(LogPipe * s)332 afinter_source_init(LogPipe *s)
333 {
334   AFInterSource *self = (AFInterSource *) s;
335   GlobalConfig *cfg = log_pipe_get_config(s);
336 
337   if (!log_source_init(s))
338     return FALSE;
339 
340   self->mark_freq = cfg->mark_freq;
341   afinter_postpone_mark(self->mark_freq);
342   self->mark_timer.expires = next_mark_target;
343 
344   g_mutex_lock(&internal_msg_lock);
345   current_internal_source = self;
346   g_mutex_unlock(&internal_msg_lock);
347 
348   return TRUE;
349 }
350 
351 static gboolean
afinter_source_deinit(LogPipe * s)352 afinter_source_deinit(LogPipe *s)
353 {
354   AFInterSource *self = (AFInterSource *) s;
355 
356   g_mutex_lock(&internal_msg_lock);
357   current_internal_source = NULL;
358   g_mutex_unlock(&internal_msg_lock);
359 
360   return log_source_deinit(&self->super.super);
361 }
362 
363 static LogSource *
afinter_source_new(AFInterSourceDriver * owner,AFInterSourceOptions * options)364 afinter_source_new(AFInterSourceDriver *owner, AFInterSourceOptions *options)
365 {
366   AFInterSource *self = g_new0(AFInterSource, 1);
367 
368   log_source_init_instance(&self->super, owner->super.super.super.cfg);
369   log_source_set_options(&self->super, &options->super, owner->super.super.id, NULL, FALSE,
370                          owner->super.super.super.expr_node);
371   afinter_source_init_watches(self);
372   self->super.super.init = afinter_source_init;
373   self->super.super.deinit = afinter_source_deinit;
374   self->super.wakeup = afinter_source_wakeup;
375 
376   self->options = options;
377 
378   return &self->super;
379 }
380 
381 
382 void
afinter_source_options_defaults(AFInterSourceOptions * options)383 afinter_source_options_defaults(AFInterSourceOptions *options)
384 {
385   log_source_options_defaults(&options->super);
386   options->queue_capacity = 10000;
387 }
388 
389 static gboolean
afinter_sd_init(LogPipe * s)390 afinter_sd_init(LogPipe *s)
391 {
392   AFInterSourceDriver *self = (AFInterSourceDriver *) s;
393   GlobalConfig *cfg = log_pipe_get_config(s);
394 
395   if (cfg_is_config_version_older(cfg, VERSION_VALUE_3_29))
396     {
397       msg_warning_once("WARNING: The internal_queue_length stat counter has been renamed to internal_source.queued. "
398                        "The old name will be removed in future versions", cfg_format_config_version_tag(cfg));
399     }
400 
401   if (!log_src_driver_init_method(s))
402     return FALSE;
403 
404   if (current_internal_source != NULL)
405     {
406       msg_error("Multiple internal() sources were detected, this is not possible");
407       return FALSE;
408     }
409 
410   log_source_options_init(&self->source_options.super, cfg, self->super.super.group);
411   self->source_options.super.stats_level = STATS_LEVEL0;
412   self->source_options.super.stats_source = stats_register_type("internal");
413   self->source = afinter_source_new(self, &self->source_options);
414   log_pipe_append(&self->source->super, s);
415 
416   if (!log_pipe_init(&self->source->super))
417     {
418       log_pipe_unref(&self->source->super);
419       self->source = NULL;
420       return FALSE;
421     }
422 
423   return TRUE;
424 }
425 
426 static gboolean
afinter_sd_deinit(LogPipe * s)427 afinter_sd_deinit(LogPipe *s)
428 {
429   AFInterSourceDriver *self = (AFInterSourceDriver *) s;
430 
431   if (self->source)
432     {
433       log_pipe_deinit(&self->source->super);
434       /* break circular reference created during _init */
435       log_pipe_unref(&self->source->super);
436       self->source = NULL;
437     }
438 
439   if (!log_src_driver_deinit_method(s))
440     return FALSE;
441 
442   return TRUE;
443 }
444 
445 static void
afinter_sd_free(LogPipe * s)446 afinter_sd_free(LogPipe *s)
447 {
448   AFInterSourceDriver *self = (AFInterSourceDriver *) s;
449 
450   g_assert(!self->source);
451   log_src_driver_free(s);
452 }
453 
454 
455 LogDriver *
afinter_sd_new(GlobalConfig * cfg)456 afinter_sd_new(GlobalConfig *cfg)
457 {
458   AFInterSourceDriver *self = g_new0(AFInterSourceDriver, 1);
459 
460   log_src_driver_init_instance((LogSrcDriver *)&self->super, cfg);
461   self->super.super.super.init = afinter_sd_init;
462   self->super.super.super.deinit = afinter_sd_deinit;
463   self->super.super.super.free_fn = afinter_sd_free;
464   self->super.super.super.on_config_inited = afinter_sd_start_thread;
465 
466   afinter_source_options_defaults(&self->source_options);
467 
468   return (LogDriver *)&self->super.super;
469 }
470 
471 /****************************************************************************
472  * Global entry points, without an AFInterSourceDriver instance.
473  ****************************************************************************/
474 
475 void
afinter_postpone_mark(gint mark_freq)476 afinter_postpone_mark(gint mark_freq)
477 {
478   if (mark_freq > 0)
479     {
480       iv_validate_now();
481       g_mutex_lock(&internal_mark_target_lock);
482       next_mark_target = iv_now;
483       next_mark_target.tv_sec += mark_freq;
484       g_mutex_unlock(&internal_mark_target_lock);
485     }
486   else
487     {
488       g_mutex_lock(&internal_mark_target_lock);
489       next_mark_target.tv_sec = -1;
490       g_mutex_unlock(&internal_mark_target_lock);
491     }
492 }
493 
494 static void
_release_internal_msg_queue(void)495 _release_internal_msg_queue(void)
496 {
497   LogMessage *internal_message = g_queue_pop_head(internal_msg_queue);
498   while (internal_message)
499     {
500       stats_counter_dec(internal_queue_length);
501       log_msg_unref(internal_message);
502 
503       internal_message = g_queue_pop_head(internal_msg_queue);
504     }
505   g_queue_free(internal_msg_queue);
506   internal_msg_queue = NULL;
507 }
508 
509 static inline void
_register_obsolete_stats_alias(StatsCounterItem * internal_queued_ctr)510 _register_obsolete_stats_alias(StatsCounterItem *internal_queued_ctr)
511 {
512   stats_lock();
513   StatsClusterKey sc_key;
514   stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_queue_length", NULL);
515   stats_register_alias_counter(0, &sc_key, SC_TYPE_PROCESSED, internal_queued_ctr);
516   stats_unlock();
517 }
518 
519 static inline void
_unregister_obsolete_stats_alias(StatsCounterItem * internal_queued_ctr)520 _unregister_obsolete_stats_alias(StatsCounterItem *internal_queued_ctr)
521 {
522   stats_lock();
523   StatsClusterKey sc_key;
524   stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_queue_length", NULL);
525   stats_unregister_alias_counter(&sc_key, SC_TYPE_PROCESSED, internal_queued_ctr);
526   stats_unlock();
527 }
528 
529 void
afinter_message_posted(LogMessage * msg)530 afinter_message_posted(LogMessage *msg)
531 {
532   g_mutex_lock(&internal_msg_lock);
533   if (!current_internal_source)
534     {
535       if (internal_msg_queue)
536         {
537           _release_internal_msg_queue();
538         }
539       log_msg_unref(msg);
540       goto exit;
541     }
542 
543   if (!internal_msg_queue)
544     {
545       internal_msg_queue = g_queue_new();
546 
547       stats_lock();
548       StatsClusterKey sc_key;
549       stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_source", NULL );
550       stats_register_counter(0, &sc_key, SC_TYPE_QUEUED, &internal_queue_length);
551       stats_register_counter(0, &sc_key, SC_TYPE_DROPPED, &internal_queue_dropped);
552       stats_unlock();
553 
554       _register_obsolete_stats_alias(internal_queue_length);
555     }
556 
557   if (g_queue_get_length(internal_msg_queue) >= current_internal_source->options->queue_capacity)
558     {
559       stats_counter_inc(internal_queue_dropped);
560       log_msg_unref(msg);
561       goto exit;
562     }
563 
564   g_queue_push_tail(internal_msg_queue, msg);
565   stats_counter_inc(internal_queue_length);
566 
567   if (current_internal_source->free_to_send)
568     iv_event_post(&current_internal_source->post);
569 exit:
570   g_mutex_unlock(&internal_msg_lock);
571 }
572 
573 static void
afinter_register_posted_hook(gint hook_type,gpointer user_data)574 afinter_register_posted_hook(gint hook_type, gpointer user_data)
575 {
576   msg_set_post_func(afinter_message_posted);
577 }
578 
579 void
afinter_global_init(void)580 afinter_global_init(void)
581 {
582   register_application_hook(AH_CONFIG_CHANGED, afinter_register_posted_hook, NULL, AHM_RUN_ONCE);
583 }
584 
585 void
afinter_global_deinit(void)586 afinter_global_deinit(void)
587 {
588   if (internal_msg_queue)
589     {
590       _unregister_obsolete_stats_alias(internal_queue_length);
591 
592       stats_lock();
593       StatsClusterKey sc_key;
594       stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "internal_source", NULL );
595       stats_unregister_counter(&sc_key, SC_TYPE_QUEUED, &internal_queue_length);
596       stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &internal_queue_dropped);
597       stats_unlock();
598       g_queue_free_full(internal_msg_queue, (GDestroyNotify)log_msg_unref);
599       internal_msg_queue = NULL;
600     }
601   current_internal_source = NULL;
602 }
603