1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logreader.h"
26 #include "stats/stats-cluster-single.h"
27 #include "mainloop-call.h"
28 #include "ack-tracker/ack_tracker.h"
29 #include "ack-tracker/ack_tracker_factory.h"
30
31 static void log_reader_io_handle_in(gpointer s);
32 static gboolean log_reader_fetch_log(LogReader *self);
33 static void log_reader_update_watches(LogReader *self);
34
35 /*****************************************************************************
36 * LogReader setters
37 *****************************************************************************/
38
39
40 void
log_reader_set_peer_addr(LogReader * s,GSockAddr * peer_addr)41 log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr)
42 {
43 LogReader *self = (LogReader *) s;
44
45 g_sockaddr_unref(self->peer_addr);
46 self->peer_addr = g_sockaddr_ref(peer_addr);
47 }
48
49 void
log_reader_set_local_addr(LogReader * s,GSockAddr * local_addr)50 log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr)
51 {
52 LogReader *self = (LogReader *) s;
53
54 g_sockaddr_unref(self->local_addr);
55 self->local_addr = g_sockaddr_ref(local_addr);
56 }
57
58 void
log_reader_set_immediate_check(LogReader * s)59 log_reader_set_immediate_check(LogReader *s)
60 {
61 LogReader *self = (LogReader *) s;
62
63 self->immediate_check = TRUE;
64 }
65
66 void
log_reader_set_options(LogReader * s,LogPipe * control,LogReaderOptions * options,const gchar * stats_id,const gchar * stats_instance)67 log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options,
68 const gchar *stats_id, const gchar *stats_instance)
69 {
70 LogReader *self = (LogReader *) s;
71
72 /* log_reader_reopen() needs to be called prior to set_options. This is
73 * an ugly hack, but at least it is more explicitly than what used to be
74 * here, which silently ignored if self->proto was NULL.
75 */
76
77 g_assert(self->proto != NULL);
78
79 log_source_set_options(&self->super, &options->super, stats_id, stats_instance,
80 (options->flags & LR_THREADED), control->expr_node);
81 AckTrackerFactory *factory = log_proto_server_get_ack_tracker_factory(self->proto);
82 log_source_set_ack_tracker_factory(&self->super, ack_tracker_factory_ref(factory));
83
84 log_pipe_unref(self->control);
85 self->control = log_pipe_ref(control);
86
87 self->options = options;
88 log_proto_server_set_options(self->proto, &self->options->proto_options.super);
89 }
90
91 void
log_reader_set_name(LogReader * self,const gchar * name)92 log_reader_set_name(LogReader *self, const gchar *name)
93 {
94 log_source_set_name(&self->super, name);
95 }
96
97 void
log_reader_disable_bookmark_saving(LogReader * s)98 log_reader_disable_bookmark_saving(LogReader *s)
99 {
100 log_source_disable_bookmark_saving(&s->super);
101 }
102
103 /****************************************************************************
104 * Watches: the poll_events instance and the idle timer
105 ***************************************************************************/
106
107 static void
log_reader_idle_timeout(void * cookie)108 log_reader_idle_timeout(void *cookie)
109 {
110 LogReader *self = (LogReader *) cookie;
111
112 g_assert(!self->io_job.working);
113 msg_notice("Source timeout has elapsed, closing connection",
114 evt_tag_int("fd", log_proto_server_get_fd(self->proto)));
115
116 log_pipe_notify(self->control, NC_CLOSE, self);
117 }
118
119 static void
log_reader_stop_idle_timer(LogReader * self)120 log_reader_stop_idle_timer(LogReader *self)
121 {
122 if (iv_timer_registered(&self->idle_timer))
123 iv_timer_unregister(&self->idle_timer);
124 }
125
126 static void
log_reader_start_watches(LogReader * self)127 log_reader_start_watches(LogReader *self)
128 {
129 g_assert(!self->watches_running);
130 if (self->poll_events)
131 poll_events_start_watches(self->poll_events);
132 self->watches_running = TRUE;
133 log_reader_update_watches(self);
134 }
135
136 static void
log_reader_stop_watches(LogReader * self)137 log_reader_stop_watches(LogReader *self)
138 {
139 g_assert(self->watches_running);
140 if (self->poll_events)
141 poll_events_stop_watches(self->poll_events);
142 log_reader_stop_idle_timer(self);
143
144 self->watches_running = FALSE;
145 }
146
147 static void
log_reader_disable_watches(LogReader * self)148 log_reader_disable_watches(LogReader *self)
149 {
150 g_assert(self->watches_running);
151 if (self->poll_events)
152 poll_events_suspend_watches(self->poll_events);
153 log_reader_stop_idle_timer(self);
154 }
155
156 /****************************************************************************
157 * Suspend/wakeup
158 ****************************************************************************/
159
160 static void
log_reader_suspend_until_awoken(LogReader * self)161 log_reader_suspend_until_awoken(LogReader *self)
162 {
163 self->immediate_check = FALSE;
164 log_reader_disable_watches(self);
165 self->suspended = TRUE;
166 }
167
168 static void
log_reader_force_check_in_next_poll(LogReader * self)169 log_reader_force_check_in_next_poll(LogReader *self)
170 {
171 self->immediate_check = FALSE;
172 log_reader_disable_watches(self);
173 self->suspended = FALSE;
174
175 if (!iv_task_registered(&self->restart_task))
176 {
177 iv_task_register(&self->restart_task);
178 }
179 }
180
181 static void
log_reader_wakeup_triggered(gpointer s)182 log_reader_wakeup_triggered(gpointer s)
183 {
184 LogReader *self = (LogReader *) s;
185
186 if (!self->io_job.working && self->suspended)
187 {
188 /* NOTE: by the time working is set to FALSE we're over an
189 * update_watches call. So it is called either here (when
190 * work_finished has done its work) or from work_finished above. The
191 * two are not racing as both run in the main thread
192 */
193 log_reader_update_watches(self);
194 }
195 }
196
197 /* NOTE: may be running in the destination's thread, thus proper locking must be used */
198 static void
log_reader_wakeup(LogSource * s)199 log_reader_wakeup(LogSource *s)
200 {
201 LogReader *self = (LogReader *) s;
202
203 /*
204 * We might get called even after this LogReader has been
205 * deinitialized, in which case we must not do anything (since the
206 * iv_event triggered here is not registered).
207 *
208 * This happens when log_writer_deinit() flushes its output queue
209 * after the reader which produced the message has already been
210 * deinited. Since init/deinit calls are made in the main thread, no
211 * locking is needed.
212 *
213 */
214
215 if (self->super.super.flags & PIF_INITIALIZED)
216 iv_event_post(&self->schedule_wakeup);
217 }
218
219 /****************************************************************************
220 * Open/close/reopen
221 ***************************************************************************/
222
223 static void
log_reader_apply_proto_and_poll_events(LogReader * self,LogProtoServer * proto,PollEvents * poll_events)224 log_reader_apply_proto_and_poll_events(LogReader *self, LogProtoServer *proto, PollEvents *poll_events)
225 {
226 if (self->proto)
227 log_proto_server_free(self->proto);
228 if (self->poll_events)
229 poll_events_free(self->poll_events);
230
231 self->proto = proto;
232
233 if (self->proto)
234 log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
235
236 self->poll_events = poll_events;
237 }
238
239
240 /* run in the main thread in reaction to a log_reader_reopen to change
241 * the source LogProtoServer instance. It needs to be ran in the main
242 * thread as it reregisters the watches associated with the main
243 * thread. */
244 void
log_reader_close_proto_deferred(gpointer s)245 log_reader_close_proto_deferred(gpointer s)
246 {
247 LogReader *self = (LogReader *) s;
248
249 if (self->io_job.working)
250 {
251 self->pending_close = TRUE;
252 return;
253 }
254
255 log_reader_stop_watches(self);
256 log_reader_apply_proto_and_poll_events(self, NULL, NULL);
257 log_reader_start_watches(self);
258 }
259
260 void
log_reader_close_proto(LogReader * self)261 log_reader_close_proto(LogReader *self)
262 {
263 g_assert(self->watches_running);
264 main_loop_call((MainLoopTaskFunc) log_reader_close_proto_deferred, self, TRUE);
265
266 if (!main_loop_is_main_thread())
267 {
268 g_mutex_lock(&self->pending_close_lock);
269 while (self->pending_close)
270 {
271 g_cond_wait(&self->pending_close_cond, &self->pending_close_lock);
272 }
273 g_mutex_unlock(&self->pending_close_lock);
274 }
275 }
276
277 void
log_reader_open(LogReader * self,LogProtoServer * proto,PollEvents * poll_events)278 log_reader_open(LogReader *self, LogProtoServer *proto, PollEvents *poll_events)
279 {
280 g_assert(!self->watches_running);
281 poll_events_set_callback(poll_events, log_reader_io_handle_in, self);
282
283 log_reader_apply_proto_and_poll_events(self, proto, poll_events);
284 }
285
286 static gboolean
log_reader_is_opened(LogReader * self)287 log_reader_is_opened(LogReader *self)
288 {
289 return self->proto && self->poll_events;
290 }
291
292 /*****************************************************************************
293 * Set watches state so we are polling the event(s) that comes next.
294 *****************************************************************************/
295
296 static void
log_reader_update_watches(LogReader * self)297 log_reader_update_watches(LogReader *self)
298 {
299 GIOCondition cond;
300 gint idle_timeout = -1;
301
302 main_loop_assert_main_thread();
303 g_assert(self->watches_running);
304
305 log_reader_disable_watches(self);
306
307 if (!log_reader_is_opened(self))
308 return;
309
310 gboolean free_to_send = log_source_free_to_send(&self->super);
311 if (!free_to_send)
312 {
313 log_reader_suspend_until_awoken(self);
314 return;
315 }
316
317 LogProtoPrepareAction prepare_action = log_proto_server_prepare(self->proto, &cond, &idle_timeout);
318
319 if (idle_timeout > 0)
320 {
321 iv_validate_now();
322
323 self->idle_timer.expires = iv_now;
324 self->idle_timer.expires.tv_sec += idle_timeout;
325
326 iv_timer_register(&self->idle_timer);
327 }
328
329 if (self->immediate_check)
330 {
331 log_reader_force_check_in_next_poll(self);
332 return;
333 }
334
335 switch (prepare_action)
336 {
337 case LPPA_POLL_IO:
338 poll_events_update_watches(self->poll_events, cond);
339 break;
340 case LPPA_FORCE_SCHEDULE_FETCH:
341 log_reader_force_check_in_next_poll(self);
342 break;
343 case LPPA_SUSPEND:
344 log_reader_suspend_until_awoken(self);
345 break;
346 default:
347 g_assert_not_reached();
348 break;
349 }
350 }
351
352 /*****************************************************************************
353 * Glue into MainLoopIOWorker
354 *****************************************************************************/
355
356 static void
log_reader_work_perform(void * s,GIOCondition cond)357 log_reader_work_perform(void *s, GIOCondition cond)
358 {
359 LogReader *self = (LogReader *) s;
360
361 self->notify_code = log_reader_fetch_log(self);
362 }
363
364 static void
log_reader_work_finished(void * s)365 log_reader_work_finished(void *s)
366 {
367 LogReader *self = (LogReader *) s;
368
369 if (self->pending_close)
370 {
371 /* pending proto is only set in the main thread, so no need to
372 * lock it before coming here. After we're syncing with the
373 * log_writer_reopen() call, quite possibly coming from a
374 * non-main thread. */
375
376 g_mutex_lock(&self->pending_close_lock);
377
378 log_reader_apply_proto_and_poll_events(self, NULL, NULL);
379 self->pending_close = FALSE;
380
381 g_cond_signal(&self->pending_close_cond);
382 g_mutex_unlock(&self->pending_close_lock);
383 }
384
385 if (self->notify_code)
386 {
387 gint notify_code = self->notify_code;
388
389 self->notify_code = 0;
390 log_pipe_notify(self->control, notify_code, self);
391 }
392 if (self->super.super.flags & PIF_INITIALIZED)
393 {
394 /* reenable polling the source assuming that we're still in
395 * business (e.g. the reader hasn't been uninitialized) */
396
397 if (self->realloc_window_after_fetch)
398 {
399 self->realloc_window_after_fetch = FALSE;
400 log_source_dynamic_window_realloc(s);
401 }
402 log_proto_server_reset_error(self->proto);
403 log_reader_update_watches(self);
404 }
405 }
406
407 /*****************************************************************************
408 * Input processing, the main function of LogReader
409 *****************************************************************************/
410
411 static void
_add_aux_nvpair(const gchar * name,const gchar * value,gsize value_len,gpointer user_data)412 _add_aux_nvpair(const gchar *name, const gchar *value, gsize value_len, gpointer user_data)
413 {
414 LogMessage *msg = (LogMessage *) user_data;
415
416 log_msg_set_value_by_name(msg, name, value, value_len);;
417 }
418
419 static inline gint
log_reader_process_handshake(LogReader * self)420 log_reader_process_handshake(LogReader *self)
421 {
422 LogProtoStatus status = log_proto_server_handshake(self->proto);
423
424 switch (status)
425 {
426 case LPS_EOF:
427 case LPS_ERROR:
428 return status == LPS_ERROR ? NC_READ_ERROR : NC_CLOSE;
429 case LPS_SUCCESS:
430 break;
431 case LPS_AGAIN:
432 break;
433 default:
434 g_assert_not_reached();
435 break;
436 }
437 return 0;
438 }
439
440 static void
_log_reader_insert_msg_length_stats(LogReader * self,gsize len)441 _log_reader_insert_msg_length_stats(LogReader *self, gsize len)
442 {
443 stats_aggregator_insert_data(self->max_message_size, len);
444 stats_aggregator_insert_data(self->average_messages_size, len);
445 }
446
447 static gboolean
log_reader_handle_line(LogReader * self,const guchar * line,gint length,LogTransportAuxData * aux)448 log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTransportAuxData *aux)
449 {
450 LogMessage *m;
451
452 msg_debug("Incoming log entry",
453 evt_tag_printf("line", "%.*s", length, line));
454 /* use the current time to get the time zone offset */
455 m = log_msg_new((gchar *) line, length,
456 &self->options->parse_options);
457
458 _log_reader_insert_msg_length_stats(self, length);
459 if (aux)
460 {
461 log_msg_set_saddr(m, aux->peer_addr ? : self->peer_addr);
462 log_msg_set_daddr(m, aux->local_addr ? : self->local_addr);
463 m->proto = aux->proto;
464 }
465 log_msg_refcache_start_producer(m);
466
467 log_transport_aux_data_foreach(aux, _add_aux_nvpair, m);
468
469 log_source_post(&self->super, m);
470 log_msg_refcache_stop();
471 return log_source_free_to_send(&self->super);
472 }
473
474 /* returns: notify_code (NC_XXXX) or 0 for success */
475 static gint
log_reader_fetch_log(LogReader * self)476 log_reader_fetch_log(LogReader *self)
477 {
478 gint msg_count = 0;
479 gboolean may_read = TRUE;
480 LogTransportAuxData aux_storage, *aux = &aux_storage;
481
482 if ((self->options->flags & LR_IGNORE_AUX_DATA))
483 aux = NULL;
484
485 log_transport_aux_data_init(aux);
486 if (log_proto_server_handshake_in_progress(self->proto))
487 {
488 return log_reader_process_handshake(self);
489 }
490
491 /* NOTE: this loop is here to decrease the load on the main loop, we try
492 * to fetch a couple of messages in a single run (but only up to
493 * fetch_limit).
494 */
495 while (msg_count < self->options->fetch_limit && !main_loop_worker_job_quit())
496 {
497 Bookmark *bookmark;
498 const guchar *msg;
499 gsize msg_len;
500 LogProtoStatus status;
501
502 msg = NULL;
503
504 /* NOTE: may_read is used to implement multi-read checking. It
505 * is initialized to TRUE to indicate that the protocol is
506 * allowed to issue a read(). If multi-read is disallowed in the
507 * protocol, it resets may_read to FALSE after the first read was issued.
508 */
509
510 log_transport_aux_data_reinit(aux);
511 bookmark = ack_tracker_request_bookmark(self->super.ack_tracker);
512 status = log_proto_server_fetch(self->proto, &msg, &msg_len, &may_read, aux, bookmark);
513 switch (status)
514 {
515 case LPS_EOF:
516 log_transport_aux_data_destroy(aux);
517 return NC_CLOSE;
518 case LPS_ERROR:
519 log_transport_aux_data_destroy(aux);
520 return NC_READ_ERROR;
521 case LPS_SUCCESS:
522 break;
523 case LPS_AGAIN:
524 break;
525 default:
526 g_assert_not_reached();
527 break;
528 }
529
530 if (!msg)
531 {
532 /* no more messages for now */
533 break;
534 }
535 if (msg_len > 0 || (self->options->flags & LR_EMPTY_LINES))
536 {
537 msg_count++;
538
539 if (!log_reader_handle_line(self, msg, msg_len, aux))
540 {
541 /* window is full, don't generate further messages */
542 break;
543 }
544 }
545 }
546 log_transport_aux_data_destroy(aux);
547
548 if (msg_count == self->options->fetch_limit)
549 self->immediate_check = TRUE;
550 return 0;
551 }
552
553 static void
log_reader_io_handle_in(gpointer s)554 log_reader_io_handle_in(gpointer s)
555 {
556 LogReader *self = (LogReader *) s;
557
558 log_reader_disable_watches(self);
559 if ((self->options->flags & LR_THREADED))
560 {
561 main_loop_io_worker_job_submit(&self->io_job, G_IO_IN);
562 }
563 else
564 {
565 /* Checking main_loop_io_worker_job_quit() helps to speed up the
566 * reload process. If reload/shutdown is requested we shouldn't do
567 * anything here, outstanding messages will be processed by the new
568 * configuration.
569 *
570 * Our current understanding is that it doesn't prevent race
571 * conditions of any kind.
572 */
573 if (!main_loop_worker_job_quit())
574 {
575 log_reader_work_perform(s, G_IO_IN);
576 log_reader_work_finished(s);
577 }
578 }
579 }
580
581 static void
_register_aggregated_stats(LogReader * self)582 _register_aggregated_stats(LogReader *self)
583 {
584 StatsClusterKey sc_key_eps_input;
585 stats_cluster_logpipe_key_set(&sc_key_eps_input, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
586 self->super.stats_instance);
587
588 stats_aggregator_lock();
589 StatsClusterKey sc_key;
590
591 stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
592 self->super.stats_instance, "msg_size_max");
593 stats_register_aggregator_maximum(self->super.options->stats_level, &sc_key, &self->max_message_size);
594
595 stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
596 self->super.stats_instance, "msg_size_avg");
597 stats_register_aggregator_average(self->super.options->stats_level, &sc_key, &self->average_messages_size);
598
599 stats_cluster_single_key_set_with_name(&sc_key, self->super.options->stats_source | SCS_SOURCE, self->super.stats_id,
600 self->super.stats_instance, "eps");
601 stats_register_aggregator_cps(self->super.options->stats_level, &sc_key, &sc_key_eps_input, SC_TYPE_PROCESSED,
602 &self->CPS);
603
604 stats_aggregator_unlock();
605 }
606
607 static void
_unregister_aggregated_stats(LogReader * self)608 _unregister_aggregated_stats(LogReader *self)
609 {
610 stats_aggregator_lock();
611
612 stats_unregister_aggregator_maximum(&self->max_message_size);
613 stats_unregister_aggregator_average(&self->average_messages_size);
614 stats_unregister_aggregator_cps(&self->CPS);
615
616 stats_aggregator_unlock();
617 }
618
619 /*****************************************************************************
620 * LogReader->LogPipe interface implementation
621 *****************************************************************************/
622
623 static gboolean
log_reader_init(LogPipe * s)624 log_reader_init(LogPipe *s)
625 {
626 LogReader *self = (LogReader *) s;
627
628 if (!log_source_init(s))
629 return FALSE;
630
631 log_proto_server_set_ack_tracker(self->proto, self->super.ack_tracker);
632
633
634 if (!log_proto_server_validate_options(self->proto))
635 return FALSE;
636
637 if (!self->options->parse_options.format_handler)
638 {
639 msg_error("Unknown format plugin specified",
640 evt_tag_str("format", self->options->parse_options.format));
641 return FALSE;
642 }
643
644 iv_event_register(&self->schedule_wakeup);
645
646 log_reader_start_watches(self);
647
648 _register_aggregated_stats(self);
649
650 return TRUE;
651 }
652
653 static gboolean
log_reader_deinit(LogPipe * s)654 log_reader_deinit(LogPipe *s)
655 {
656 LogReader *self = (LogReader *) s;
657
658 main_loop_assert_main_thread();
659
660 iv_event_unregister(&self->schedule_wakeup);
661 if (iv_task_registered(&self->restart_task))
662 iv_task_unregister(&self->restart_task);
663
664 log_reader_stop_watches(self);
665
666 _unregister_aggregated_stats(self);
667 if (!log_source_deinit(s))
668 return FALSE;
669
670 return TRUE;
671 }
672
673
674 static void
log_reader_init_watches(LogReader * self)675 log_reader_init_watches(LogReader *self)
676 {
677 IV_TASK_INIT(&self->restart_task);
678 self->restart_task.cookie = self;
679 self->restart_task.handler = log_reader_io_handle_in;
680
681 IV_EVENT_INIT(&self->schedule_wakeup);
682 self->schedule_wakeup.cookie = self;
683 self->schedule_wakeup.handler = log_reader_wakeup_triggered;
684
685 IV_TIMER_INIT(&self->idle_timer);
686 self->idle_timer.cookie = self;
687 self->idle_timer.handler = log_reader_idle_timeout;
688
689 main_loop_io_worker_job_init(&self->io_job);
690 self->io_job.user_data = self;
691 self->io_job.work = (void (*)(void *, GIOCondition)) log_reader_work_perform;
692 self->io_job.completion = (void (*)(void *)) log_reader_work_finished;
693 self->io_job.engage = (void (*)(void *)) log_pipe_ref;
694 self->io_job.release = (void (*)(void *)) log_pipe_unref;
695 }
696
697 static void
log_reader_free(LogPipe * s)698 log_reader_free(LogPipe *s)
699 {
700 LogReader *self = (LogReader *) s;
701
702 if (self->proto)
703 {
704 log_proto_server_free(self->proto);
705 self->proto = NULL;
706 }
707 if (self->poll_events)
708 poll_events_free(self->poll_events);
709
710 log_pipe_unref(self->control);
711 g_sockaddr_unref(self->peer_addr);
712 g_sockaddr_unref(self->local_addr);
713 g_mutex_clear(&self->pending_close_lock);
714 g_cond_clear(&self->pending_close_cond);
715 log_source_free(s);
716 }
717
718 static void
_schedule_dynamic_window_realloc(LogSource * s)719 _schedule_dynamic_window_realloc(LogSource *s)
720 {
721 LogReader *self = (LogReader *)s;
722
723 msg_trace("LogReader::dynamic_window_realloc called");
724
725 if (self->io_job.working)
726 {
727 self->realloc_window_after_fetch = TRUE;
728 return;
729 }
730
731 log_source_dynamic_window_realloc(s);
732 }
733
734
735 LogReader *
log_reader_new(GlobalConfig * cfg)736 log_reader_new(GlobalConfig *cfg)
737 {
738 LogReader *self = g_new0(LogReader, 1);
739
740 log_source_init_instance(&self->super, cfg);
741 self->super.super.init = log_reader_init;
742 self->super.super.deinit = log_reader_deinit;
743 self->super.super.free_fn = log_reader_free;
744 self->super.wakeup = log_reader_wakeup;
745 self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
746 self->immediate_check = FALSE;
747 log_reader_init_watches(self);
748 g_mutex_init(&self->pending_close_lock);
749 g_cond_init(&self->pending_close_cond);
750 return self;
751 }
752
753 /****************************************************************************
754 * LogReaderOptions defaults/init/destroy
755 ***************************************************************************/
756
757 void
log_reader_options_defaults(LogReaderOptions * options)758 log_reader_options_defaults(LogReaderOptions *options)
759 {
760 log_source_options_defaults(&options->super);
761 log_proto_server_options_defaults(&options->proto_options.super);
762 msg_format_options_defaults(&options->parse_options);
763 options->fetch_limit = 10;
764 }
765
766 /*
767 * NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy
768 *
769 * Rationale:
770 * - init is called from driver init (e.g. affile_sd_init),
771 * - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_sd_deinit)
772 *
773 * The reason:
774 * - when initializing the reloaded configuration fails for some reason,
775 * we have to fall back to the old configuration, thus we cannot dump
776 * the information stored in the Options structure at deinit time, but
777 * have to recover it when the old configuration is initialized.
778 *
779 * For the reasons above, init and destroy behave the following way:
780 *
781 * - init is idempotent, it can be called multiple times without leaking
782 * memory, and without loss of information
783 * - destroy is only called once, when the options are indeed to be destroyed
784 *
785 * Also important to note is that when init is called multiple times, the
786 * GlobalConfig reference is the same, this means that it is enough to
787 * remember whether init was called already and return w/o doing anything in
788 * that case, which is actually how idempotency is implemented here.
789 */
790 void
log_reader_options_init(LogReaderOptions * options,GlobalConfig * cfg,const gchar * group_name)791 log_reader_options_init(LogReaderOptions *options, GlobalConfig *cfg, const gchar *group_name)
792 {
793 if (options->initialized)
794 return;
795
796 log_source_options_init(&options->super, cfg, group_name);
797 log_proto_server_options_init(&options->proto_options.super, cfg);
798 msg_format_options_init(&options->parse_options, cfg);
799
800 if (options->check_hostname == -1)
801 options->check_hostname = cfg->check_hostname;
802 if (options->check_hostname)
803 {
804 options->parse_options.flags |= LP_CHECK_HOSTNAME;
805 }
806 if (!options->super.keep_timestamp)
807 {
808 options->parse_options.flags |= LP_NO_PARSE_DATE;
809 }
810 if (options->parse_options.default_pri == 0xFFFF)
811 {
812 if (options->flags & LR_KERNEL)
813 options->parse_options.default_pri = LOG_KERN | LOG_NOTICE;
814 else
815 options->parse_options.default_pri = LOG_USER | LOG_NOTICE;
816 }
817 if (options->proto_options.super.encoding)
818 options->parse_options.flags |= LP_ASSUME_UTF8;
819 if (cfg->threaded)
820 options->flags |= LR_THREADED;
821 options->initialized = TRUE;
822 }
823
824 void
log_reader_options_destroy(LogReaderOptions * options)825 log_reader_options_destroy(LogReaderOptions *options)
826 {
827 log_source_options_destroy(&options->super);
828 log_proto_server_options_destroy(&options->proto_options.super);
829 msg_format_options_destroy(&options->parse_options);
830 options->initialized = FALSE;
831 }
832
833 CfgFlagHandler log_reader_flag_handlers[] =
834 {
835 /* NOTE: underscores are automatically converted to dashes */
836
837 /* LogReaderOptions */
838 { "kernel", CFH_SET, offsetof(LogReaderOptions, flags), LR_KERNEL },
839 { "empty-lines", CFH_SET, offsetof(LogReaderOptions, flags), LR_EMPTY_LINES },
840 { "threaded", CFH_SET, offsetof(LogReaderOptions, flags), LR_THREADED },
841 { "ignore-aux-data", CFH_SET, offsetof(LogReaderOptions, flags), LR_IGNORE_AUX_DATA },
842 { NULL },
843 };
844
845 gboolean
log_reader_options_process_flag(LogReaderOptions * options,const gchar * flag)846 log_reader_options_process_flag(LogReaderOptions *options, const gchar *flag)
847 {
848 if (!msg_format_options_process_flag(&options->parse_options, flag))
849 return cfg_process_flag(log_reader_flag_handlers, options, flag);
850 return TRUE;
851 }
852