1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logreader.h"
26 #include "stats/stats-cluster-single.h"
27 #include "mainloop-call.h"
28 #include "ack-tracker/ack_tracker.h"
29 #include "ack-tracker/ack_tracker_factory.h"
30 
31 static void log_reader_io_handle_in(gpointer s);
32 static gboolean log_reader_fetch_log(LogReader *self);
33 static void log_reader_update_watches(LogReader *self);
34 
35 /*****************************************************************************
36  * LogReader setters
37  *****************************************************************************/
38 
39 
40 void
log_reader_set_peer_addr(LogReader * s,GSockAddr * peer_addr)41 log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr)
42 {
43   LogReader *self = (LogReader *) s;
44 
45   g_sockaddr_unref(self->peer_addr);
46   self->peer_addr = g_sockaddr_ref(peer_addr);
47 }
48 
49 void
log_reader_set_local_addr(LogReader * s,GSockAddr * local_addr)50 log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr)
51 {
52   LogReader *self = (LogReader *) s;
53 
54   g_sockaddr_unref(self->local_addr);
55   self->local_addr = g_sockaddr_ref(local_addr);
56 }
57 
58 void
log_reader_set_immediate_check(LogReader * s)59 log_reader_set_immediate_check(LogReader *s)
60 {
61   LogReader *self = (LogReader *) s;
62 
63   self->immediate_check = TRUE;
64 }
65 
66 void
log_reader_set_options(LogReader * s,LogPipe * control,LogReaderOptions * options,const gchar * stats_id,const gchar * stats_instance)67 log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options,
68                        const gchar *stats_id, const gchar *stats_instance)
69 {
70   LogReader *self = (LogReader *) s;
71 
72   /* log_reader_reopen() needs to be called prior to set_options.  This is
73    * an ugly hack, but at least it is more explicitly than what used to be
74    * here, which silently ignored if self->proto was NULL.
75    */
76 
77   g_assert(self->proto != NULL);
78 
79   log_source_set_options(&self->super, &options->super, stats_id, stats_instance,
80                          (options->flags & LR_THREADED), control->expr_node);
81   AckTrackerFactory *factory = log_proto_server_get_ack_tracker_factory(self->proto);
82   log_source_set_ack_tracker_factory(&self->super, ack_tracker_factory_ref(factory));
83 
84   log_pipe_unref(self->control);
85   self->control = log_pipe_ref(control);
86 
87   self->options = options;
88   log_proto_server_set_options(self->proto, &self->options->proto_options.super);
89 }
90 
91 void
log_reader_set_name(LogReader * self,const gchar * name)92 log_reader_set_name(LogReader *self, const gchar *name)
93 {
94   log_source_set_name(&self->super, name);
95 }
96 
97 void
log_reader_disable_bookmark_saving(LogReader * s)98 log_reader_disable_bookmark_saving(LogReader *s)
99 {
100   log_source_disable_bookmark_saving(&s->super);
101 }
102 
103 /****************************************************************************
104  * Watches: the poll_events instance and the idle timer
105  ***************************************************************************/
106 
107 static void
log_reader_idle_timeout(void * cookie)108 log_reader_idle_timeout(void *cookie)
109 {
110   LogReader *self = (LogReader *) cookie;
111 
112   g_assert(!self->io_job.working);
113   msg_notice("Source timeout has elapsed, closing connection",
114              evt_tag_int("fd", log_proto_server_get_fd(self->proto)));
115 
116   log_pipe_notify(self->control, NC_CLOSE, self);
117 }
118 
119 static void
log_reader_stop_idle_timer(LogReader * self)120 log_reader_stop_idle_timer(LogReader *self)
121 {
122   if (iv_timer_registered(&self->idle_timer))
123     iv_timer_unregister(&self->idle_timer);
124 }
125 
126 static void
log_reader_start_watches(LogReader * self)127 log_reader_start_watches(LogReader *self)
128 {
129   g_assert(!self->watches_running);
130   if (self->poll_events)
131     poll_events_start_watches(self->poll_events);
132   self->watches_running = TRUE;
133   log_reader_update_watches(self);
134 }
135 
136 static void
log_reader_stop_watches(LogReader * self)137 log_reader_stop_watches(LogReader *self)
138 {
139   g_assert(self->watches_running);
140   if (self->poll_events)
141     poll_events_stop_watches(self->poll_events);
142   log_reader_stop_idle_timer(self);
143 
144   self->watches_running = FALSE;
145 }
146 
147 static void
log_reader_disable_watches(LogReader * self)148 log_reader_disable_watches(LogReader *self)
149 {
150   g_assert(self->watches_running);
151   if (self->poll_events)
152     poll_events_suspend_watches(self->poll_events);
153   log_reader_stop_idle_timer(self);
154 }
155 
156 /****************************************************************************
157  * Suspend/wakeup
158  ****************************************************************************/
159 
160 static void
log_reader_suspend_until_awoken(LogReader * self)161 log_reader_suspend_until_awoken(LogReader *self)
162 {
163   self->immediate_check = FALSE;
164   log_reader_disable_watches(self);
165   self->suspended = TRUE;
166 }
167 
168 static void
log_reader_force_check_in_next_poll(LogReader * self)169 log_reader_force_check_in_next_poll(LogReader *self)
170 {
171   self->immediate_check = FALSE;
172   log_reader_disable_watches(self);
173   self->suspended = FALSE;
174 
175   if (!iv_task_registered(&self->restart_task))
176     {
177       iv_task_register(&self->restart_task);
178     }
179 }
180 
181 static void
log_reader_wakeup_triggered(gpointer s)182 log_reader_wakeup_triggered(gpointer s)
183 {
184   LogReader *self = (LogReader *) s;
185 
186   if (!self->io_job.working && self->suspended)
187     {
188       /* NOTE: by the time working is set to FALSE we're over an
189        * update_watches call.  So it is called either here (when
190        * work_finished has done its work) or from work_finished above. The
191        * two are not racing as both run in the main thread
192        */
193       log_reader_update_watches(self);
194     }
195 }
196 
197 /* NOTE: may be running in the destination's thread, thus proper locking must be used */
198 static void
log_reader_wakeup(LogSource * s)199 log_reader_wakeup(LogSource *s)
200 {
201   LogReader *self = (LogReader *) s;
202 
203   /*
204    * We might get called even after this LogReader has been
205    * deinitialized, in which case we must not do anything (since the
206    * iv_event triggered here is not registered).
207    *
208    * This happens when log_writer_deinit() flushes its output queue
209    * after the reader which produced the message has already been
210    * deinited. Since init/deinit calls are made in the main thread, no
211    * locking is needed.
212    *
213    */
214 
215   if (self->super.super.flags & PIF_INITIALIZED)
216     iv_event_post(&self->schedule_wakeup);
217 }
218 
219 /****************************************************************************
220  * Open/close/reopen
221  ***************************************************************************/
222 
223 static void
log_reader_apply_proto_and_poll_events(LogReader * self,LogProtoServer * proto,PollEvents * poll_events)224 log_reader_apply_proto_and_poll_events(LogReader *self, LogProtoServer *proto, PollEvents *poll_events)
225 {
226   if (self->proto)
227     log_proto_server_free(self->proto);
228   if (self->poll_events)
229     poll_events_free(self->poll_events);
230 
231   self->proto = proto;
232 
233   if (self->proto)
234     log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
235 
236   self->poll_events = poll_events;
237 }
238 
239 
240 /* run in the main thread in reaction to a log_reader_reopen to change
241  * the source LogProtoServer instance. It needs to be ran in the main
242  * thread as it reregisters the watches associated with the main
243  * thread. */
244 void
log_reader_close_proto_deferred(gpointer s)245 log_reader_close_proto_deferred(gpointer s)
246 {
247   LogReader *self = (LogReader *) s;
248 
249   if (self->io_job.working)
250     {
251       self->pending_close = TRUE;
252       return;
253     }
254 
255   log_reader_stop_watches(self);
256   log_reader_apply_proto_and_poll_events(self, NULL, NULL);
257   log_reader_start_watches(self);
258 }
259 
260 void
log_reader_close_proto(LogReader * self)261 log_reader_close_proto(LogReader *self)
262 {
263   g_assert(self->watches_running);
264   main_loop_call((MainLoopTaskFunc) log_reader_close_proto_deferred, self, TRUE);
265 
266   if (!main_loop_is_main_thread())
267     {
268       g_mutex_lock(&self->pending_close_lock);
269       while (self->pending_close)
270         {
271           g_cond_wait(&self->pending_close_cond, &self->pending_close_lock);
272         }
273       g_mutex_unlock(&self->pending_close_lock);
274     }
275 }
276 
277 void
log_reader_open(LogReader * self,LogProtoServer * proto,PollEvents * poll_events)278 log_reader_open(LogReader *self, LogProtoServer *proto, PollEvents *poll_events)
279 {
280   g_assert(!self->watches_running);
281   poll_events_set_callback(poll_events, log_reader_io_handle_in, self);
282 
283   log_reader_apply_proto_and_poll_events(self, proto, poll_events);
284 }
285 
286 static gboolean
log_reader_is_opened(LogReader * self)287 log_reader_is_opened(LogReader *self)
288 {
289   return self->proto && self->poll_events;
290 }
291 
292 /*****************************************************************************
293  * Set watches state so we are polling the event(s) that comes next.
294  *****************************************************************************/
295 
296 static void
log_reader_update_watches(LogReader * self)297 log_reader_update_watches(LogReader *self)
298 {
299   GIOCondition cond;
300   gint idle_timeout = -1;
301 
302   main_loop_assert_main_thread();
303   g_assert(self->watches_running);
304 
305   log_reader_disable_watches(self);
306 
307   if (!log_reader_is_opened(self))
308     return;
309 
310   gboolean free_to_send = log_source_free_to_send(&self->super);
311   if (!free_to_send)
312     {
313       log_reader_suspend_until_awoken(self);
314       return;
315     }
316 
317   LogProtoPrepareAction prepare_action = log_proto_server_prepare(self->proto, &cond, &idle_timeout);
318 
319   if (idle_timeout > 0)
320     {
321       iv_validate_now();
322 
323       self->idle_timer.expires = iv_now;
324       self->idle_timer.expires.tv_sec += idle_timeout;
325 
326       iv_timer_register(&self->idle_timer);
327     }
328 
329   if (self->immediate_check)
330     {
331       log_reader_force_check_in_next_poll(self);
332       return;
333     }
334 
335   switch (prepare_action)
336     {
337     case LPPA_POLL_IO:
338       poll_events_update_watches(self->poll_events, cond);
339       break;
340     case LPPA_FORCE_SCHEDULE_FETCH:
341       log_reader_force_check_in_next_poll(self);
342       break;
343     case LPPA_SUSPEND:
344       log_reader_suspend_until_awoken(self);
345       break;
346     default:
347       g_assert_not_reached();
348       break;
349     }
350 }
351 
352 /*****************************************************************************
353  * Glue into MainLoopIOWorker
354  *****************************************************************************/
355 
356 static void
log_reader_work_perform(void * s,GIOCondition cond)357 log_reader_work_perform(void *s, GIOCondition cond)
358 {
359   LogReader *self = (LogReader *) s;
360 
361   self->notify_code = log_reader_fetch_log(self);
362 }
363 
364 static void
log_reader_work_finished(void * s)365 log_reader_work_finished(void *s)
366 {
367   LogReader *self = (LogReader *) s;
368 
369   if (self->pending_close)
370     {
371       /* pending proto is only set in the main thread, so no need to
372        * lock it before coming here. After we're syncing with the
373        * log_writer_reopen() call, quite possibly coming from a
374        * non-main thread. */
375 
376       g_mutex_lock(&self->pending_close_lock);
377 
378       log_reader_apply_proto_and_poll_events(self, NULL, NULL);
379       self->pending_close = FALSE;
380 
381       g_cond_signal(&self->pending_close_cond);
382       g_mutex_unlock(&self->pending_close_lock);
383     }
384 
385   if (self->notify_code)
386     {
387       gint notify_code = self->notify_code;
388 
389       self->notify_code = 0;
390       log_pipe_notify(self->control, notify_code, self);
391     }
392   if (self->super.super.flags & PIF_INITIALIZED)
393     {
394       /* reenable polling the source assuming that we're still in
395        * business (e.g. the reader hasn't been uninitialized) */
396 
397       if (self->realloc_window_after_fetch)
398         {
399           self->realloc_window_after_fetch = FALSE;
400           log_source_dynamic_window_realloc(s);
401         }
402       log_proto_server_reset_error(self->proto);
403       log_reader_update_watches(self);
404     }
405 }
406 
407 /*****************************************************************************
408  * Input processing, the main function of LogReader
409  *****************************************************************************/
410 
411 static void
_add_aux_nvpair(const gchar * name,const gchar * value,gsize value_len,gpointer user_data)412 _add_aux_nvpair(const gchar *name, const gchar *value, gsize value_len, gpointer user_data)
413 {
414   LogMessage *msg = (LogMessage *) user_data;
415 
416   log_msg_set_value_by_name(msg, name, value, value_len);;
417 }
418 
419 static inline gint
log_reader_process_handshake(LogReader * self)420 log_reader_process_handshake(LogReader *self)
421 {
422   LogProtoStatus status = log_proto_server_handshake(self->proto);
423 
424   switch (status)
425     {
426     case LPS_EOF:
427     case LPS_ERROR:
428       return status == LPS_ERROR ? NC_READ_ERROR : NC_CLOSE;
429     case LPS_SUCCESS:
430       break;
431     case LPS_AGAIN:
432       break;
433     default:
434       g_assert_not_reached();
435       break;
436     }
437   return 0;
438 }
439 
440 static void
_log_reader_insert_msg_length_stats(LogReader * self,gsize len)441 _log_reader_insert_msg_length_stats(LogReader *self, gsize len)
442 {
443   stats_aggregator_insert_data(self->max_message_size, len);
444   stats_aggregator_insert_data(self->average_messages_size, len);
445 }
446 
447 static gboolean
log_reader_handle_line(LogReader * self,const guchar * line,gint length,LogTransportAuxData * aux)448 log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTransportAuxData *aux)
449 {
450   LogMessage *m;
451 
452   msg_debug("Incoming log entry",
453             evt_tag_printf("line", "%.*s", length, line));
454   /* use the current time to get the time zone offset */
455   m = log_msg_new((gchar *) line, length,
456                   &self->options->parse_options);
457 
458   _log_reader_insert_msg_length_stats(self, length);
459   if (aux)
460     {
461       log_msg_set_saddr(m, aux->peer_addr ? : self->peer_addr);
462       log_msg_set_daddr(m, aux->local_addr ? : self->local_addr);
463       m->proto = aux->proto;
464     }
465   log_msg_refcache_start_producer(m);
466 
467   log_transport_aux_data_foreach(aux, _add_aux_nvpair, m);
468 
469   log_source_post(&self->super, m);
470   log_msg_refcache_stop();
471   return log_source_free_to_send(&self->super);
472 }
473 
474 /* returns: notify_code (NC_XXXX) or 0 for success */
475 static gint
log_reader_fetch_log(LogReader * self)476 log_reader_fetch_log(LogReader *self)
477 {
478   gint msg_count = 0;
479   gboolean may_read = TRUE;
480   LogTransportAuxData aux_storage, *aux = &aux_storage;
481 
482   if ((self->options->flags & LR_IGNORE_AUX_DATA))
483     aux = NULL;
484 
485   log_transport_aux_data_init(aux);
486   if (log_proto_server_handshake_in_progress(self->proto))
487     {
488       return log_reader_process_handshake(self);
489     }
490 
491   /* NOTE: this loop is here to decrease the load on the main loop, we try
492    * to fetch a couple of messages in a single run (but only up to
493    * fetch_limit).
494    */
495   while (msg_count < self->options->fetch_limit && !main_loop_worker_job_quit())
496     {
497       Bookmark *bookmark;
498       const guchar *msg;
499       gsize msg_len;
500       LogProtoStatus status;
501 
502       msg = NULL;
503 
504       /* NOTE: may_read is used to implement multi-read checking. It
505        * is initialized to TRUE to indicate that the protocol is
506        * allowed to issue a read(). If multi-read is disallowed in the
507        * protocol, it resets may_read to FALSE after the first read was issued.
508        */
509 
510       log_transport_aux_data_reinit(aux);
511       bookmark = ack_tracker_request_bookmark(self->super.ack_tracker);
512       status = log_proto_server_fetch(self->proto, &msg, &msg_len, &may_read, aux, bookmark);
513       switch (status)
514         {
515         case LPS_EOF:
516           log_transport_aux_data_destroy(aux);
517           return NC_CLOSE;
518         case LPS_ERROR:
519           log_transport_aux_data_destroy(aux);
520           return NC_READ_ERROR;
521         case LPS_SUCCESS:
522           break;
523         case LPS_AGAIN:
524           break;
525         default:
526           g_assert_not_reached();
527           break;
528         }
529 
530       if (!msg)
531         {
532           /* no more messages for now */
533           break;
534         }
535       if (msg_len > 0 || (self->options->flags & LR_EMPTY_LINES))
536         {
537           msg_count++;
538 
539           if (!log_reader_handle_line(self, msg, msg_len, aux))
540             {
541               /* window is full, don't generate further messages */
542               break;
543             }
544         }
545     }
546   log_transport_aux_data_destroy(aux);
547 
548   if (msg_count == self->options->fetch_limit)
549     self->immediate_check = TRUE;
550   return 0;
551 }
552 
553 static void
log_reader_io_handle_in(gpointer s)554 log_reader_io_handle_in(gpointer s)
555 {
556   LogReader *self = (LogReader *) s;
557 
558   log_reader_disable_watches(self);
559   if ((self->options->flags & LR_THREADED))
560     {
561       main_loop_io_worker_job_submit(&self->io_job, G_IO_IN);
562     }
563   else
564     {
565       /* Checking main_loop_io_worker_job_quit() helps to speed up the
566        * reload process.  If reload/shutdown is requested we shouldn't do
567        * anything here, outstanding messages will be processed by the new
568        * configuration.
569        *
570        * Our current understanding is that it doesn't prevent race
571        * conditions of any kind.
572        */
573       if (!main_loop_worker_job_quit())
574         {
575           log_reader_work_perform(s, G_IO_IN);
576           log_reader_work_finished(s);
577         }
578     }
579 }
580 
581 static void
_register_aggregated_stats(LogReader * self)582 _register_aggregated_stats(LogReader *self)
583 {
584   StatsClusterKey sc_key_eps_input;
585   stats_cluster_logpipe_key_set(&sc_key_eps_input, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
586                                 self->super.stats_instance);
587 
588   stats_aggregator_lock();
589   StatsClusterKey sc_key;
590 
591   stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
592                                          self->super.stats_instance, "msg_size_max");
593   stats_register_aggregator_maximum(self->super.options->stats_level, &sc_key, &self->max_message_size);
594 
595   stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
596                                          self->super.stats_instance, "msg_size_avg");
597   stats_register_aggregator_average(self->super.options->stats_level, &sc_key, &self->average_messages_size);
598 
599   stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
600                                          self->super.stats_instance, "eps");
601   stats_register_aggregator_cps(self->super.options->stats_level, &sc_key, &sc_key_eps_input, SC_TYPE_PROCESSED,
602                                 &self->CPS);
603 
604   stats_aggregator_unlock();
605 }
606 
607 static void
_unregister_aggregated_stats(LogReader * self)608 _unregister_aggregated_stats(LogReader *self)
609 {
610   stats_aggregator_lock();
611 
612   stats_unregister_aggregator_maximum(&self->max_message_size);
613   stats_unregister_aggregator_average(&self->average_messages_size);
614   stats_unregister_aggregator_cps(&self->CPS);
615 
616   stats_aggregator_unlock();
617 }
618 
619 /*****************************************************************************
620  * LogReader->LogPipe interface implementation
621  *****************************************************************************/
622 
623 static gboolean
log_reader_init(LogPipe * s)624 log_reader_init(LogPipe *s)
625 {
626   LogReader *self = (LogReader *) s;
627 
628   if (!log_source_init(s))
629     return FALSE;
630 
631   log_proto_server_set_ack_tracker(self->proto, self->super.ack_tracker);
632 
633 
634   if (!log_proto_server_validate_options(self->proto))
635     return FALSE;
636 
637   if (!self->options->parse_options.format_handler)
638     {
639       msg_error("Unknown format plugin specified",
640                 evt_tag_str("format", self->options->parse_options.format));
641       return FALSE;
642     }
643 
644   iv_event_register(&self->schedule_wakeup);
645 
646   log_reader_start_watches(self);
647 
648   _register_aggregated_stats(self);
649 
650   return TRUE;
651 }
652 
653 static gboolean
log_reader_deinit(LogPipe * s)654 log_reader_deinit(LogPipe *s)
655 {
656   LogReader *self = (LogReader *) s;
657 
658   main_loop_assert_main_thread();
659 
660   iv_event_unregister(&self->schedule_wakeup);
661   if (iv_task_registered(&self->restart_task))
662     iv_task_unregister(&self->restart_task);
663 
664   log_reader_stop_watches(self);
665 
666   _unregister_aggregated_stats(self);
667   if (!log_source_deinit(s))
668     return FALSE;
669 
670   return TRUE;
671 }
672 
673 
674 static void
log_reader_init_watches(LogReader * self)675 log_reader_init_watches(LogReader *self)
676 {
677   IV_TASK_INIT(&self->restart_task);
678   self->restart_task.cookie = self;
679   self->restart_task.handler = log_reader_io_handle_in;
680 
681   IV_EVENT_INIT(&self->schedule_wakeup);
682   self->schedule_wakeup.cookie = self;
683   self->schedule_wakeup.handler = log_reader_wakeup_triggered;
684 
685   IV_TIMER_INIT(&self->idle_timer);
686   self->idle_timer.cookie = self;
687   self->idle_timer.handler = log_reader_idle_timeout;
688 
689   main_loop_io_worker_job_init(&self->io_job);
690   self->io_job.user_data = self;
691   self->io_job.work = (void (*)(void *, GIOCondition)) log_reader_work_perform;
692   self->io_job.completion = (void (*)(void *)) log_reader_work_finished;
693   self->io_job.engage = (void (*)(void *)) log_pipe_ref;
694   self->io_job.release = (void (*)(void *)) log_pipe_unref;
695 }
696 
697 static void
log_reader_free(LogPipe * s)698 log_reader_free(LogPipe *s)
699 {
700   LogReader *self = (LogReader *) s;
701 
702   if (self->proto)
703     {
704       log_proto_server_free(self->proto);
705       self->proto = NULL;
706     }
707   if (self->poll_events)
708     poll_events_free(self->poll_events);
709 
710   log_pipe_unref(self->control);
711   g_sockaddr_unref(self->peer_addr);
712   g_sockaddr_unref(self->local_addr);
713   g_mutex_clear(&self->pending_close_lock);
714   g_cond_clear(&self->pending_close_cond);
715   log_source_free(s);
716 }
717 
718 static void
_schedule_dynamic_window_realloc(LogSource * s)719 _schedule_dynamic_window_realloc(LogSource *s)
720 {
721   LogReader *self = (LogReader *)s;
722 
723   msg_trace("LogReader::dynamic_window_realloc called");
724 
725   if (self->io_job.working)
726     {
727       self->realloc_window_after_fetch = TRUE;
728       return;
729     }
730 
731   log_source_dynamic_window_realloc(s);
732 }
733 
734 
735 LogReader *
log_reader_new(GlobalConfig * cfg)736 log_reader_new(GlobalConfig *cfg)
737 {
738   LogReader *self = g_new0(LogReader, 1);
739 
740   log_source_init_instance(&self->super, cfg);
741   self->super.super.init = log_reader_init;
742   self->super.super.deinit = log_reader_deinit;
743   self->super.super.free_fn = log_reader_free;
744   self->super.wakeup = log_reader_wakeup;
745   self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
746   self->immediate_check = FALSE;
747   log_reader_init_watches(self);
748   g_mutex_init(&self->pending_close_lock);
749   g_cond_init(&self->pending_close_cond);
750   return self;
751 }
752 
753 /****************************************************************************
754  * LogReaderOptions defaults/init/destroy
755  ***************************************************************************/
756 
757 void
log_reader_options_defaults(LogReaderOptions * options)758 log_reader_options_defaults(LogReaderOptions *options)
759 {
760   log_source_options_defaults(&options->super);
761   log_proto_server_options_defaults(&options->proto_options.super);
762   msg_format_options_defaults(&options->parse_options);
763   options->fetch_limit = 10;
764 }
765 
766 /*
767  * NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy
768  *
769  * Rationale:
770  *   - init is called from driver init (e.g. affile_sd_init),
771  *   - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_sd_deinit)
772  *
773  * The reason:
774  *   - when initializing the reloaded configuration fails for some reason,
775  *     we have to fall back to the old configuration, thus we cannot dump
776  *     the information stored in the Options structure at deinit time, but
777  *     have to recover it when the old configuration is initialized.
778  *
779  * For the reasons above, init and destroy behave the following way:
780  *
781  *   - init is idempotent, it can be called multiple times without leaking
782  *     memory, and without loss of information
783  *   - destroy is only called once, when the options are indeed to be destroyed
784  *
785  * Also important to note is that when init is called multiple times, the
786  * GlobalConfig reference is the same, this means that it is enough to
787  * remember whether init was called already and return w/o doing anything in
788  * that case, which is actually how idempotency is implemented here.
789  */
790 void
log_reader_options_init(LogReaderOptions * options,GlobalConfig * cfg,const gchar * group_name)791 log_reader_options_init(LogReaderOptions *options, GlobalConfig *cfg, const gchar *group_name)
792 {
793   if (options->initialized)
794     return;
795 
796   log_source_options_init(&options->super, cfg, group_name);
797   log_proto_server_options_init(&options->proto_options.super, cfg);
798   msg_format_options_init(&options->parse_options, cfg);
799 
800   if (options->check_hostname == -1)
801     options->check_hostname = cfg->check_hostname;
802   if (options->check_hostname)
803     {
804       options->parse_options.flags |= LP_CHECK_HOSTNAME;
805     }
806   if (!options->super.keep_timestamp)
807     {
808       options->parse_options.flags |= LP_NO_PARSE_DATE;
809     }
810   if (options->parse_options.default_pri == 0xFFFF)
811     {
812       if (options->flags & LR_KERNEL)
813         options->parse_options.default_pri = LOG_KERN | LOG_NOTICE;
814       else
815         options->parse_options.default_pri = LOG_USER | LOG_NOTICE;
816     }
817   if (options->proto_options.super.encoding)
818     options->parse_options.flags |= LP_ASSUME_UTF8;
819   if (cfg->threaded)
820     options->flags |= LR_THREADED;
821   options->initialized = TRUE;
822 }
823 
824 void
log_reader_options_destroy(LogReaderOptions * options)825 log_reader_options_destroy(LogReaderOptions *options)
826 {
827   log_source_options_destroy(&options->super);
828   log_proto_server_options_destroy(&options->proto_options.super);
829   msg_format_options_destroy(&options->parse_options);
830   options->initialized = FALSE;
831 }
832 
833 CfgFlagHandler log_reader_flag_handlers[] =
834 {
835   /* NOTE: underscores are automatically converted to dashes */
836 
837   /* LogReaderOptions */
838   { "kernel",                     CFH_SET, offsetof(LogReaderOptions, flags),               LR_KERNEL },
839   { "empty-lines",                CFH_SET, offsetof(LogReaderOptions, flags),               LR_EMPTY_LINES },
840   { "threaded",                   CFH_SET, offsetof(LogReaderOptions, flags),               LR_THREADED },
841   { "ignore-aux-data",            CFH_SET, offsetof(LogReaderOptions, flags),               LR_IGNORE_AUX_DATA },
842   { NULL },
843 };
844 
845 gboolean
log_reader_options_process_flag(LogReaderOptions * options,const gchar * flag)846 log_reader_options_process_flag(LogReaderOptions *options, const gchar *flag)
847 {
848   if (!msg_format_options_process_flag(&options->parse_options, flag))
849     return cfg_process_flag(log_reader_flag_handlers, options, flag);
850   return TRUE;
851 }
852