1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logwriter.h"
26 #include "messages.h"
27 #include "stats/stats-registry.h"
28 #include "stats/aggregator/stats-aggregator-registry.h"
29 #include "stats/stats-cluster-single.h"
30 #include "stats/aggregator/stats-aggregator.h"
31 #include "hostname.h"
32 #include "host-resolve.h"
33 #include "seqnum.h"
34 #include "str-utils.h"
35 #include "find-crlf.h"
36 #include "mainloop.h"
37 #include "mainloop-io-worker.h"
38 #include "mainloop-call.h"
39 #include "ml-batched-timer.h"
40 #include "str-format.h"
41 #include "scratch-buffers.h"
42 #include "timeutils/format.h"
43 #include "timeutils/misc.h"
44 
45 #include <assert.h>
46 #include <string.h>
47 #include <stdlib.h>
48 #include <sys/stat.h>
49 #include <unistd.h>
50 #include <iv.h>
51 #include <iv_event.h>
52 #include <iv_work.h>
53 
/* Flush policy: how eagerly buffered output is pushed to the destination. */
typedef enum
{
  LW_FLUSH_NORMAL = 0,  /* regular operation: flush once the buffer is full */
  LW_FLUSH_FORCE = 1,   /* flush immediately */
} LogWriterFlushMode;
63 
/*
 * State of a log writer: takes messages from its queue and sends them to a
 * destination reached through a LogProtoClient (proto).
 */
struct _LogWriter
{
  LogPipe super;
  /* messages waiting to be written; backlog use is enabled in log_writer_set_queue() */
  LogQueue *queue;
  /* LW_* flags, settable only before init (see log_writer_set_flags()).
   * NOTE(review): the field is 31 bits wide, so bit 31 of the stored value is dropped. */
  guint32 flags:31;
  /* sequence number used for LF_LOCAL messages in log_writer_format_log() */
  gint32 seq_num;
  gboolean partial_write;
  StatsCounterItem *dropped_messages;
  /* incremented for every message dropped by the duplicate suppression logic */
  StatsCounterItem *suppressed_messages;
  /* incremented for every message accepted into the queue */
  StatsCounterItem *processed_messages;
  StatsCounterItem *written_messages;
  StatsAggregator *max_message_size;
  StatsAggregator *average_messages_size;
  StatsAggregator *CPS;
  struct
  {
    StatsCounterItem *count;
    StatsCounterItem *bytes;
  } truncated;
  LogPipe *control;
  LogWriterOptions *options;
  /* duplicate suppression state; both fields are guarded by suppress_lock */
  LogMessage *last_msg;
  guint32 last_msg_count;
  GString *line_buffer;

  gchar *stats_id;
  gchar *stats_instance;

  /* fd based polling of the destination (used when pollable_state > 0) */
  struct iv_fd fd_watch;
  /* used both for the error suspend period and for the flush/throttle wait */
  struct iv_timer suspend_timer;
  /* emulates "always writable" for fds that cannot be polled */
  struct iv_task immed_io_task;
  /* posted from non-main threads to request log_writer_update_watches() */
  struct iv_event queue_filled;
  MainLoopIOWorkerJob io_job;
  GMutex suppress_lock;
  MlBatchedTimer suppress_timer;
  MlBatchedTimer mark_timer;
  struct iv_timer reopen_timer;
  /* armed when log_proto_client_prepare() reports a positive idle timeout */
  struct iv_timer idle_timer;
  /* result of the last log_writer_process_out()/process_in() call */
  gboolean work_result;
  /* <0: not yet determined, 0: not pollable (immed_io_task), >0: pollable (fd_watch) */
  gint pollable_state;
  LogProtoClient *proto, *pending_proto;
  /* NOTE(review): 1-bit gboolean (signed int) bitfields may read back TRUE as -1;
   * only test them for truthiness, never compare with == TRUE. */
  gboolean watches_running:1, suspended:1, waiting_for_throttle:1;
  /* pending_proto handoff, performed in log_writer_work_finished() under pending_proto_lock */
  gboolean pending_proto_present;
  GCond pending_proto_cond;
  GMutex pending_proto_lock;
};
110 
/**
 * LogWriter behaviour
 * ~~~~~~~~~~~~~~~~~~~
 *
 * LogWriter is a core element of syslog-ng that sends messages out to some
 * kind of destination represented by a UNIX fd. Outgoing messages are sent
 * to the target asynchronously: they are first placed into a queue and then
 * sent when poll() indicates that the fd is writable.
 *
 *
 * Flow control
 * ------------
 * For a simple log writer without a disk buffer, messages are placed on a
 * GQueue and they are acknowledged when the send() system call returns
 * success. This is more complex when disk buffering is used, in which case
 * messages are put into the "disk buffer" first and acknowledged immediately
 * (this way the reader never stops while the disk buffer area is not yet
 * full). When the disk buffer reaches its limit, messages are added to the
 * usual GQueue and get acknowledged when they are moved to the disk buffer.
 *
 **/
133 
134 static gboolean log_writer_process_out(LogWriter *self);
135 static gboolean log_writer_process_in(LogWriter *self);
136 static void log_writer_broken(LogWriter *self, gint notify_code);
137 static void log_writer_start_watches(LogWriter *self);
138 static void log_writer_stop_watches(LogWriter *self);
139 static void log_writer_stop_idle_timer(LogWriter *self);
140 static void log_writer_update_watches(LogWriter *self);
141 static void log_writer_suspend(LogWriter *self);
142 static void log_writer_free_proto(LogWriter *self);
143 static void log_writer_set_proto(LogWriter *self, LogProtoClient *proto);
144 static void log_writer_set_pending_proto(LogWriter *self, LogProtoClient *proto, gboolean present);
145 
146 
147 static void
log_writer_msg_ack(gint num_msg_acked,gpointer user_data)148 log_writer_msg_ack(gint num_msg_acked, gpointer user_data)
149 {
150   LogWriter *self = (LogWriter *)user_data;
151   log_queue_ack_backlog(self->queue, num_msg_acked);
152 }
153 
154 void
log_writer_msg_rewind(LogWriter * self)155 log_writer_msg_rewind(LogWriter *self)
156 {
157   log_queue_rewind_backlog_all(self->queue);
158 }
159 
160 static void
log_writer_msg_rewind_cb(gpointer user_data)161 log_writer_msg_rewind_cb(gpointer user_data)
162 {
163   LogWriter *self = (LogWriter *)user_data;
164   log_writer_msg_rewind(self);
165 }
166 
167 void
log_writer_set_flags(LogWriter * self,guint32 flags)168 log_writer_set_flags(LogWriter *self, guint32 flags)
169 {
170   g_assert((self->super.flags & PIF_INITIALIZED) == 0);
171   self->flags = flags;
172 }
173 
174 guint32
log_writer_get_flags(LogWriter * self)175 log_writer_get_flags(LogWriter *self)
176 {
177   return self->flags;
178 }
179 
180 /* returns a reference */
181 LogQueue *
log_writer_get_queue(LogWriter * s)182 log_writer_get_queue(LogWriter *s)
183 {
184   LogWriter *self = (LogWriter *) s;
185 
186   return log_queue_ref(self->queue);
187 }
188 
189 /* consumes the reference */
190 void
log_writer_set_queue(LogWriter * self,LogQueue * queue)191 log_writer_set_queue(LogWriter *self, LogQueue *queue)
192 {
193   log_queue_unref(self->queue);
194   self->queue = log_queue_ref(queue);
195   log_queue_set_use_backlog(self->queue, TRUE);
196 }
197 
198 static void
log_writer_work_perform(gpointer s,GIOCondition cond)199 log_writer_work_perform(gpointer s, GIOCondition cond)
200 {
201   LogWriter *self = (LogWriter *) s;
202 
203   g_assert((self->super.flags & PIF_INITIALIZED) != 0);
204   g_assert((cond == G_IO_OUT) || (cond == G_IO_IN));
205 
206   if (cond == G_IO_OUT)
207     self->work_result = log_writer_process_out(self);
208   else if (cond == G_IO_IN)
209     self->work_result = log_writer_process_in(self);
210 }
211 
/*
 * Completion half of an I/O round. It runs in the main thread (asserted).
 * In non-threaded mode it is called directly from log_writer_io_handler();
 * NOTE(review): the threaded-mode hookup of this callback is outside this
 * view, confirm.
 *
 *  - clears waiting_for_throttle;
 *  - if a pending LogProtoClient was queued, frees the current proto,
 *    installs the pending one and signals pending_proto_cond;
 *  - if the work failed (work_result == FALSE), reports NC_WRITE_ERROR and,
 *    if a proto is still present, suspends writing for time_reopen seconds
 *    without restarting the watches;
 *  - otherwise restarts the watches if the pipe is still initialized and
 *    has a proto.
 */
static void
log_writer_work_finished(gpointer s)
{
  LogWriter *self = (LogWriter *) s;

  main_loop_assert_main_thread();
  self->waiting_for_throttle = FALSE;

  if (self->pending_proto_present)
    {
      /* pending_proto_present is only set in the main thread, so it can be
       * checked without holding the lock. The lock is taken afterwards to
       * synchronize with the log_writer_reopen() call, which may come from
       * a non-main thread. */

      g_mutex_lock(&self->pending_proto_lock);
      log_writer_free_proto(self);

      log_writer_set_proto(self, self->pending_proto);
      log_writer_set_pending_proto(self, NULL, FALSE);

      /* wake up the waiter on pending_proto_cond (presumably log_writer_reopen()) */
      g_cond_signal(&self->pending_proto_cond);
      g_mutex_unlock(&self->pending_proto_lock);
    }

  if (!self->work_result)
    {
      log_writer_broken(self, NC_WRITE_ERROR);
      /* NOTE(review): presumably log_writer_broken() may clear self->proto, hence the re-check */
      if (self->proto)
        {
          log_writer_suspend(self);
          msg_notice("Suspending write operation because of an I/O error",
                     evt_tag_int("fd", log_proto_client_get_fd(self->proto)),
                     evt_tag_int("time_reopen", self->options->time_reopen));
        }
      return;
    }

  if ((self->super.flags & PIF_INITIALIZED) && self->proto)
    {
      /* reenable polling the source, but only if we're still initialized */
      log_writer_start_watches(self);
    }
}
256 
257 static void
log_writer_io_handler(gpointer s,GIOCondition cond)258 log_writer_io_handler(gpointer s, GIOCondition cond)
259 {
260   LogWriter *self = (LogWriter *) s;
261 
262   main_loop_assert_main_thread();
263 
264   log_writer_stop_watches(self);
265   if ((self->options->options & LWO_THREADED))
266     {
267       main_loop_io_worker_job_submit(&self->io_job, cond);
268     }
269   else
270     {
271       /* Checking main_loop_io_worker_job_quit() helps to speed up the
272        * reload process.  If reload/shutdown is requested we shouldn't do
273        * anything here, a final flush will be attempted in
274        * log_writer_deinit().
275        *
276        * Our current understanding is that it doesn't prevent race
277        * conditions of any kind.
278        */
279 
280       if (!main_loop_worker_job_quit())
281         {
282           log_writer_work_perform(s, cond);
283           log_writer_work_finished(s);
284         }
285     }
286 }
287 
288 static void
log_writer_io_handle_out(gpointer s)289 log_writer_io_handle_out(gpointer s)
290 {
291   log_writer_io_handler(s, G_IO_OUT);
292 }
293 
294 static void
log_writer_io_handle_in(gpointer s)295 log_writer_io_handle_in(gpointer s)
296 {
297   log_writer_io_handler(s, G_IO_IN);
298 }
299 
300 static void
log_writer_io_error(gpointer s)301 log_writer_io_error(gpointer s)
302 {
303   LogWriter *self = (LogWriter *) s;
304 
305   if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
306     {
307       msg_debug("POLLERR occurred while idle",
308                 evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
309       log_writer_broken(self, NC_WRITE_ERROR);
310       return;
311     }
312   else
313     {
314       /* in case we have an error state but we also asked for read/write
315        * polling, the error should be handled by the I/O callback.  But we
316        * need not call that explicitly as ivykis does that for us.  */
317     }
318   log_writer_update_watches(self);
319 }
320 
321 static void
log_writer_io_check_eof(gpointer s)322 log_writer_io_check_eof(gpointer s)
323 {
324   LogWriter *self = (LogWriter *) s;
325 
326   msg_error("EOF occurred while idle",
327             evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
328   log_writer_broken(self, NC_CLOSE);
329 }
330 
331 static void
log_writer_error_suspend_elapsed(gpointer s)332 log_writer_error_suspend_elapsed(gpointer s)
333 {
334   LogWriter *self = (LogWriter *) s;
335 
336   self->suspended = FALSE;
337   msg_notice("Error suspend timeout has elapsed, attempting to write again",
338              evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
339   log_writer_start_watches(self);
340 }
341 
342 static void
log_writer_update_fd_callbacks(LogWriter * self,GIOCondition cond)343 log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
344 {
345   main_loop_assert_main_thread();
346   if (self->pollable_state > 0)
347     {
348       if (cond & G_IO_IN)
349         iv_fd_set_handler_in(&self->fd_watch, log_writer_io_handle_in);
350       else if (self->flags & LW_DETECT_EOF)
351         iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
352       else
353         iv_fd_set_handler_in(&self->fd_watch, NULL);
354 
355       if (cond & G_IO_OUT)
356         iv_fd_set_handler_out(&self->fd_watch, log_writer_io_handle_out);
357       else
358         iv_fd_set_handler_out(&self->fd_watch, NULL);
359 
360       iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
361     }
362   else
363     {
364       /* fd is not pollable, assume it is always writable */
365       if (cond & G_IO_OUT)
366         {
367           if (!iv_task_registered(&self->immed_io_task))
368             iv_task_register(&self->immed_io_task);
369         }
370       else if (iv_task_registered(&self->immed_io_task))
371         {
372           iv_task_unregister(&self->immed_io_task);
373         }
374     }
375 }
376 
377 static void
log_writer_stop_suspend_timer(LogWriter * self)378 log_writer_stop_suspend_timer(LogWriter *self)
379 {
380   if (iv_timer_registered(&self->suspend_timer))
381     iv_timer_unregister(&self->suspend_timer);
382 }
383 
384 static void
log_writer_arm_suspend_timer(LogWriter * self,void (* handler)(void *),glong timeout_msec)385 log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), glong timeout_msec)
386 {
387   main_loop_assert_main_thread();
388 
389   log_writer_stop_suspend_timer(self);
390 
391   iv_validate_now();
392   self->suspend_timer.handler = handler;
393   self->suspend_timer.expires = iv_now;
394   timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
395   iv_timer_register(&self->suspend_timer);
396 }
397 
398 static void
log_writer_queue_filled(gpointer s)399 log_writer_queue_filled(gpointer s)
400 {
401   LogWriter *self = (LogWriter *) s;
402 
403   main_loop_assert_main_thread();
404 
405   /*
406    * NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
407    * is the right scenario, but the race was closed.  So take this with a
408    * grain of salt.
409    *
410    * The queue_filled callback is running in the main thread. Because of the
411    * possible delay caused by iv_event_post() the callback might be
412    * delivered event after stop_watches() has been called.
413    *
414    *   - log_writer_schedule_update_watches() is called by the reader
415    *     thread, which calls iv_event_post()
416    *   - the main thread calls stop_watches() in work_perform
417    *   - the event is delivered in the main thread
418    *
419    * But since stop/start watches always run in the main thread and we do
420    * too, we can check if this is the case.  A LogWriter without watches
421    * running is busy writing out data to the destination, e.g.  a
422    * start_watches is to be expected once log_writer_work_finished() is run
423    * at the end of the deferred work, executed by the I/O threads.
424    */
425   if (self->watches_running)
426     log_writer_update_watches((LogWriter *) s);
427 }
428 
429 /* NOTE: runs in the source thread */
430 static void
log_writer_schedule_update_watches(LogWriter * self)431 log_writer_schedule_update_watches(LogWriter *self)
432 {
433   iv_event_post(&self->queue_filled);
434 }
435 
436 static void
log_writer_suspend(LogWriter * self)437 log_writer_suspend(LogWriter *self)
438 {
439   /* flush code indicates that we need to suspend our writing activities
440    * until time_reopen elapses */
441 
442   log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1000L);
443   self->suspended = TRUE;
444 }
445 
/*
 * Re-evaluate what the writer waits for (main thread only).
 *
 * log_proto_client_prepare() is always evaluated first; it fills fd, cond
 * and idle_timeout. Thanks to the short-circuit ||, log_queue_check_items()
 * is only consulted when prepare() returned FALSE and no throttle wait just
 * ended. Exactly one of the following is then set up:
 *   - I/O callbacks for cond;
 *   - a suspend timer of timeout_msec that calls this function again (with
 *     waiting_for_throttle set, so the next call installs the callbacks);
 *   - no callbacks at all. In this case log_queue_check_items() was given
 *     log_writer_schedule_update_watches() as its notify callback for new
 *     items.
 * Independently of the above, a positive idle_timeout (in seconds) arms
 * idle_timer.
 */
static void
log_writer_update_watches(LogWriter *self)
{
  gint fd;
  GIOCondition cond = 0;
  gint timeout_msec = 0;
  gint idle_timeout = -1;

  main_loop_assert_main_thread();

  log_writer_stop_idle_timer(self);

  /* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */

  if (log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout) ||
      self->waiting_for_throttle ||
      log_queue_check_items(self->queue, &timeout_msec,
                            (LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
    {
      /* flush_lines number of element is already available and throttle would permit us to send. */
      log_writer_update_fd_callbacks(self, cond);
    }
  else if (timeout_msec)
    {
      /* few elements are available, but less than flush_lines, we need to start a timer to initiate a flush */

      log_writer_update_fd_callbacks(self, 0);
      self->waiting_for_throttle = TRUE;
      log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, (glong)timeout_msec);
    }
  else
    {
      /* no elements or no throttle space, wait for a wakeup by the queue
       * when the required number of items are added.  see the
       * log_queue_check_items and its parallel_push argument above
       */
      log_writer_update_fd_callbacks(self, 0);
    }

  if (idle_timeout > 0)
    {
      /* idle_timeout is added to tv_sec, i.e. it is in seconds */
      iv_validate_now();

      self->idle_timer.expires = iv_now;
      self->idle_timer.expires.tv_sec += idle_timeout;

      iv_timer_register(&self->idle_timer);
    }
}
495 
496 static gboolean
is_file_regular(gint fd)497 is_file_regular(gint fd)
498 {
499   struct stat st;
500 
501   if (fstat(fd, &st) >= 0)
502     {
503       return S_ISREG(st.st_mode);
504     }
505 
506   /* if stat fails, that's interesting, but we should probably poll
507    * it, hopefully that's less likely to cause spinning */
508 
509   return FALSE;
510 }
511 
512 static void
log_writer_start_watches(LogWriter * self)513 log_writer_start_watches(LogWriter *self)
514 {
515   gint fd;
516   GIOCondition cond;
517   gint idle_timeout = -1;
518 
519   if (self->watches_running)
520     return;
521 
522   log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout);
523 
524   self->fd_watch.fd = fd;
525 
526   if (self->pollable_state < 0)
527     {
528       if (is_file_regular(fd))
529         self->pollable_state = 0;
530       else
531         self->pollable_state = !iv_fd_register_try(&self->fd_watch);
532     }
533   else if (self->pollable_state > 0)
534     iv_fd_register(&self->fd_watch);
535 
536   log_writer_update_watches(self);
537   self->watches_running = TRUE;
538 }
539 
540 static void
log_writer_stop_watches(LogWriter * self)541 log_writer_stop_watches(LogWriter *self)
542 {
543   if (self->watches_running)
544     {
545       if (iv_timer_registered(&self->reopen_timer))
546         iv_timer_unregister(&self->reopen_timer);
547 
548       if (iv_fd_registered(&self->fd_watch))
549         iv_fd_unregister(&self->fd_watch);
550       if (iv_task_registered(&self->immed_io_task))
551         iv_task_unregister(&self->immed_io_task);
552 
553       log_queue_reset_parallel_push(self->queue);
554 
555       self->watches_running = FALSE;
556     }
557 
558   log_writer_stop_suspend_timer(self);
559   log_writer_stop_idle_timer(self);
560 }
561 
562 static void
log_writer_stop_idle_timer(LogWriter * self)563 log_writer_stop_idle_timer(LogWriter *self)
564 {
565   if (iv_timer_registered(&self->idle_timer))
566     iv_timer_unregister(&self->idle_timer);
567 }
568 
569 static void
log_writer_arm_suppress_timer(LogWriter * self)570 log_writer_arm_suppress_timer(LogWriter *self)
571 {
572   ml_batched_timer_postpone(&self->suppress_timer, self->options->suppress);
573 }
574 
575 /**
576  * Remember the last message for dup detection.
577  *
578  * NOTE: suppress_lock must be held.
579  **/
580 static void
log_writer_record_last_message(LogWriter * self,LogMessage * lm)581 log_writer_record_last_message(LogWriter *self, LogMessage *lm)
582 {
583   if (self->last_msg)
584     log_msg_unref(self->last_msg);
585 
586   log_msg_ref(lm);
587   self->last_msg = lm;
588   self->last_msg_count = 0;
589 }
590 
591 /*
592  * NOTE: suppress_lock must be held.
593  */
594 static void
log_writer_release_last_message(LogWriter * self)595 log_writer_release_last_message(LogWriter *self)
596 {
597   if (self->last_msg)
598     log_msg_unref(self->last_msg);
599 
600   self->last_msg = NULL;
601   self->last_msg_count = 0;
602 }
603 
604 /*
605  * NOTE: suppress_lock must be held.
606  */
607 static void
log_writer_emit_suppress_summary(LogWriter * self)608 log_writer_emit_suppress_summary(LogWriter *self)
609 {
610   LogMessage *m;
611   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
612   gchar buf[1024];
613   gssize len;
614   const gchar *p;
615 
616   msg_debug("Suppress timer elapsed, emitting suppression summary");
617 
618   len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, suppressed by syslog-ng on %s",
619                    log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL),
620                    self->last_msg_count,
621                    get_local_hostname_fqdn());
622 
623   m = log_msg_new_internal(self->last_msg->pri, buf);
624 
625   p = log_msg_get_value(self->last_msg, LM_V_HOST, &len);
626   log_msg_set_value(m, LM_V_HOST, p, len);
627   p = log_msg_get_value(self->last_msg, LM_V_PROGRAM, &len);
628   log_msg_set_value(m, LM_V_PROGRAM, p, len);
629 
630   path_options.ack_needed = FALSE;
631 
632   log_queue_push_tail(self->queue, m, &path_options);
633   log_writer_release_last_message(self);
634 }
635 
636 static gboolean
log_writer_suppress_timeout(gpointer pt)637 log_writer_suppress_timeout(gpointer pt)
638 {
639   LogWriter *self = (LogWriter *) pt;
640 
641   main_loop_assert_main_thread();
642 
643   /* NOTE: this will probably do nothing as we are the timer callback, but
644    * we may not do it with the suppress_lock held */
645   ml_batched_timer_cancel(&self->suppress_timer);
646   g_mutex_lock(&self->suppress_lock);
647 
648   /* NOTE: we may be waken up an extra time if the suppress_timer setup race
649    * is lost, see the comment at log_writer_is_msg_suppressed() for an
650    * explanation */
651   if (self->last_msg_count > 0)
652     log_writer_emit_suppress_summary(self);
653   g_mutex_unlock(&self->suppress_lock);
654 
655   return FALSE;
656 }
657 
658 static gboolean
_is_message_a_mark(LogMessage * msg)659 _is_message_a_mark(LogMessage *msg)
660 {
661   gssize msg_len;
662   const gchar *value = log_msg_get_value(msg, LM_V_MESSAGE, &msg_len);
663   return strncmp(value, "-- MARK --", msg_len) == 0;
664 }
665 
666 static gboolean
_is_message_a_repetition(LogMessage * msg,LogMessage * last)667 _is_message_a_repetition(LogMessage *msg, LogMessage *last)
668 {
669   return strcmp(log_msg_get_value(last, LM_V_MESSAGE, NULL), log_msg_get_value(msg, LM_V_MESSAGE, NULL)) == 0 &&
670          strcmp(log_msg_get_value(last, LM_V_HOST, NULL), log_msg_get_value(msg, LM_V_HOST, NULL)) == 0 &&
671          strcmp(log_msg_get_value(last, LM_V_PROGRAM, NULL), log_msg_get_value(msg, LM_V_PROGRAM, NULL)) == 0 &&
672          strcmp(log_msg_get_value(last, LM_V_PID, NULL), log_msg_get_value(msg, LM_V_PID, NULL)) == 0;
673 }
674 
675 static gboolean
_is_time_within_the_suppress_timeout(LogWriter * self,LogMessage * msg)676 _is_time_within_the_suppress_timeout(LogWriter *self, LogMessage *msg)
677 {
678   return self->last_msg->timestamps[LM_TS_RECVD].ut_sec >= msg->timestamps[LM_TS_RECVD].ut_sec - self->options->suppress;
679 }
680 
/**
 * log_writer_is_msg_suppressed:
 *
 * This function is called to suppress duplicate messages from a given host.
 *
 * Locking notes:
 *
 * There's a strict ordering requirement between suppress_lock and
 * interacting with the main loop (which the ml_batched_timer behind
 * suppress_timer is doing).
 *
 * The reason is that the main thread (running
 * the main loop) sometimes acquires suppress_lock (at suppress timer
 * expiration) and while blocking on suppress_lock it cannot service
 * main_loop_calls()
 *
 * This function makes sure that ml_batched_timer_update/cancel calls are
 * only done with the suppress lock released.
 *
 * If we do this, we might have a few unfortunate side effects due to races
 * that we also try to handle:
 *
 * Two messages race, one of these matches the recorded last message,
 * the other doesn't. In this case, moving the update on the suppress_timer
 * outside of the lock region might cause two different races:
 *
 * 1) matching message comes first, then non-matching
 *
 * In this case the suppress_lock protected region decides that the suppress
 * timer needs to fire (#1) and then the other decides that it needs to
 * be cancelled. (#2)
 *
 * If these are processed in order, then we are the same as if the two were
 * also protected by the mutex (which is ok)
 *
 * If they are reversed, i.e. we first cancel the timer and then arm it,
 * then we might have a timer wakeup which will find no suppressed messages
 * to report (as the non-matching message will set last_msg_count to zero). This
 * spurious wakeup should be handled by the expiration callback.
 *
 * 2) non-matching message comes first, then matching
 *
 * This is simply a message reordering case, i.e. we don't
 * want any suppressions to be emitted.
 *
 * In this case the locked region finds that neither message matched
 * the recorded one, thus both times it decides to cancel the timer, which
 * is ok. Timer cancellations can be reordered as they have the same
 * effect anyway.
 *
 * Returns TRUE to indicate that the message is to be suppressed.
 **/
static gboolean
log_writer_is_msg_suppressed(LogWriter *self, LogMessage *lm)
{
  gboolean need_to_arm_suppress_timer;
  gboolean need_to_cancel_suppress_timer = FALSE;

  /* suppression disabled */
  if (self->options->suppress <= 0)
    return FALSE;

  g_mutex_lock(&self->suppress_lock);
  if (self->last_msg)
    {
      if (_is_time_within_the_suppress_timeout(self, lm) &&
          _is_message_a_repetition(lm, self->last_msg) &&
          !_is_message_a_mark(lm))
        {

          stats_counter_inc(self->suppressed_messages);
          self->last_msg_count++;

          /* we only create the timer if this is the first suppressed message, otherwise it is already running. */
          need_to_arm_suppress_timer = self->last_msg_count == 1;
          g_mutex_unlock(&self->suppress_lock);

          /* this has to be outside of suppress_lock */
          if (need_to_arm_suppress_timer)
            log_writer_arm_suppress_timer(self);

          msg_debug("Suppressing duplicate message",
                    evt_tag_str("host", log_msg_get_value(lm, LM_V_HOST, NULL)),
                    evt_tag_str("msg", log_msg_get_value(lm, LM_V_MESSAGE, NULL)));
          return TRUE;
        }
      else
        {
          /* a different message ends the run: flush the summary (which also
           * releases last_msg) or just release last_msg */
          if (self->last_msg_count)
            log_writer_emit_suppress_summary(self);
          else
            log_writer_release_last_message(self);
          need_to_cancel_suppress_timer = TRUE;
        }
    }

  log_writer_record_last_message(self, lm);
  g_mutex_unlock(&self->suppress_lock);
  /* cancellation also has to happen outside of suppress_lock */
  if (need_to_cancel_suppress_timer)
    ml_batched_timer_cancel(&self->suppress_timer);
  return FALSE;
}
782 
783 static void
log_writer_postpone_mark_timer(LogWriter * self)784 log_writer_postpone_mark_timer(LogWriter *self)
785 {
786   if (self->options->mark_freq > 0)
787     ml_batched_timer_postpone(&self->mark_timer, self->options->mark_freq);
788 }
789 
790 /* this is the callback function that gets called when the MARK timeout
791  * elapsed. It runs in the main thread.
792  */
793 static void
log_writer_mark_timeout(void * cookie)794 log_writer_mark_timeout(void *cookie)
795 {
796   LogWriter *self = (LogWriter *)cookie;
797   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
798   const gchar *hostname;
799   gsize hostname_len;
800   LogMessage *msg;
801 
802   main_loop_assert_main_thread();
803 
804   msg = log_msg_new_mark();
805   path_options.ack_needed = FALSE;
806   /* timeout: there was no new message on the writer or it is in periodical mode */
807   hostname = resolve_sockaddr_to_hostname(&hostname_len, msg->saddr, &self->options->host_resolve_options);
808 
809   log_msg_set_value(msg, LM_V_HOST, hostname, hostname_len);
810 
811   if (!log_writer_is_msg_suppressed(self, msg))
812     {
813       log_queue_push_tail(self->queue, msg, &path_options);
814       stats_counter_inc(self->processed_messages);
815     }
816   else
817     {
818       log_msg_drop(msg, &path_options, AT_PROCESSED);
819     }
820 
821   /* we need to issue another MARK in all mark-mode cases that already
822    * triggered this callback (dst-idle, host-idle, periodical).  The
823    * original setup of the timer is at a different location:
824    *   - log_writer_queue() for "*-idle" modes
825    *   - log_writer_init() for periodical mode
826    */
827   log_writer_postpone_mark_timer(self);
828 }
829 
830 /* NOTE: runs in the reader thread */
831 static void
log_writer_queue(LogPipe * s,LogMessage * lm,const LogPathOptions * path_options)832 log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options)
833 {
834   LogWriter *self = (LogWriter *) s;
835   LogPathOptions local_options;
836   gint mark_mode = self->options->mark_mode;
837 
838   if (!path_options->flow_control_requested &&
839       ((self->proto == NULL || self->suspended) || !(self->flags & LW_SOFT_FLOW_CONTROL)))
840     {
841       /* NOTE: this code ACKs the message back if there's a write error in
842        * order not to hang the client in case of a disk full */
843 
844       path_options = log_msg_break_ack(lm, path_options, &local_options);
845     }
846 
847   if (log_writer_is_msg_suppressed(self, lm))
848     {
849       log_msg_drop(lm, path_options, AT_PROCESSED);
850       return;
851     }
852 
853   if (mark_mode != MM_INTERNAL && (lm->flags & LF_INTERNAL) && (lm->flags & LF_MARK))
854     {
855       /* drop MARK messages generated by internal() in case our mark-mode != internal */
856       log_msg_drop(lm, path_options, AT_PROCESSED);
857       return;
858     }
859 
860   if (mark_mode == MM_DST_IDLE || (mark_mode == MM_HOST_IDLE && !(lm->flags & LF_LOCAL)))
861     {
862       /* in dst-idle and host-idle most, messages postpone the MARK itself */
863       log_writer_postpone_mark_timer(self);
864     }
865 
866   stats_counter_inc(self->processed_messages);
867   log_queue_push_tail(self->queue, lm, path_options);
868 }
869 
870 static void
log_writer_append_value(GString * result,LogMessage * lm,NVHandle handle,gboolean use_nil,gboolean append_space)871 log_writer_append_value(GString *result, LogMessage *lm, NVHandle handle, gboolean use_nil, gboolean append_space)
872 {
873   const gchar *value;
874   gssize value_len;
875 
876   value = log_msg_get_value(lm, handle, &value_len);
877   if (use_nil && value_len == 0)
878     g_string_append_c(result, '-');
879   else
880     {
881       gchar *space;
882 
883       space = strchr(value, ' ');
884 
885       if (!space)
886         g_string_append_len(result, value, value_len);
887       else
888         g_string_append_len(result, value, space - value);
889     }
890   if (append_space)
891     g_string_append_c(result, ' ');
892 }
893 
894 static void
log_writer_do_padding(LogWriter * self,GString * result)895 log_writer_do_padding(LogWriter *self, GString *result)
896 {
897   if (!self->options->padding)
898     return;
899 
900   if(G_UNLIKELY(self->options->padding < result->len))
901     {
902       msg_warning("Padding is too small to hold the full message",
903                   evt_tag_int("padding", self->options->padding),
904                   evt_tag_int("msg_size", result->len));
905       g_string_set_size(result, self->options->padding);
906       return;
907     }
908   /* store the original length of the result */
909   gint len = result->len;
910   gint padd_bytes = self->options->padding - result->len;
911   /* set the size to the padded size, this will allocate the string */
912   g_string_set_size(result, self->options->padding);
913   memset(result->str + len - 1, '\0', padd_bytes);
914 }
915 
916 void
log_writer_format_log(LogWriter * self,LogMessage * lm,GString * result)917 log_writer_format_log(LogWriter *self, LogMessage *lm, GString *result)
918 {
919   LogTemplate *template = NULL;
920   UnixTime *stamp;
921   guint32 seq_num;
922   static NVHandle meta_seqid = 0;
923 
924   if (!meta_seqid)
925     meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
926 
927   if (lm->flags & LF_LOCAL)
928     {
929       seq_num = self->seq_num;
930     }
931   else
932     {
933       const gchar *seqid;
934       gssize seqid_length;
935 
936       seqid = log_msg_get_value(lm, meta_seqid, &seqid_length);
937       APPEND_ZERO(seqid, seqid, seqid_length);
938       if (seqid[0])
939         seq_num = strtol(seqid, NULL, 10);
940       else
941         seq_num = 0;
942     }
943 
944   /* no template was specified, use default */
945   stamp = &lm->timestamps[LM_TS_STAMP];
946 
947   g_string_truncate(result, 0);
948 
949   if ((self->flags & LW_SYSLOG_PROTOCOL) || (self->options->options & LWO_SYSLOG_PROTOCOL))
950     {
951       gssize len;
952 
953       /* we currently hard-wire version 1 */
954       g_string_append_c(result, '<');
955       format_uint32_padded(result, 0, 0, 10, lm->pri);
956       g_string_append_c(result, '>');
957       g_string_append_c(result, '1');
958       g_string_append_c(result, ' ');
959 
960       append_format_unix_time(stamp, result, TS_FMT_ISO,
961                               time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
962                               self->options->template_options.frac_digits);
963       g_string_append_c(result, ' ');
964 
965       log_writer_append_value(result, lm, LM_V_HOST, TRUE, TRUE);
966       log_writer_append_value(result, lm, LM_V_PROGRAM, TRUE, TRUE);
967       log_writer_append_value(result, lm, LM_V_PID, TRUE, TRUE);
968       log_writer_append_value(result, lm, LM_V_MSGID, TRUE, TRUE);
969 
970       len = result->len;
971       log_msg_append_format_sdata(lm, result, seq_num);
972       if (len == result->len)
973         {
974           /* NOTE: sd_param format did not generate any output, take it as an empty SD string */
975           g_string_append_c(result, '-');
976         }
977 
978       if (self->options->template)
979         {
980           g_string_append_c(result, ' ');
981           if (lm->flags & LF_UTF8)
982             g_string_append_len(result, "\xEF\xBB\xBF", 3);
983           LogTemplateEvalOptions options =
984           {
985             &self->options->template_options,
986             LTZ_SEND,
987             seq_num, NULL,
988           };
989 
990           log_template_append_format(self->options->template, lm,
991                                      &options,
992                                      result);
993         }
994       else
995         {
996           const gchar *p;
997 
998           p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
999           g_string_append_c(result, ' ');
1000           if (len != 0)
1001             {
1002               if (lm->flags & LF_UTF8)
1003                 g_string_append_len(result, "\xEF\xBB\xBF", 3);
1004 
1005               g_string_append_len(result, p, len);
1006             }
1007         }
1008       g_string_append_c(result, '\n');
1009       log_writer_do_padding(self, result);
1010     }
1011   else
1012     {
1013 
1014       if (self->options->template)
1015         {
1016           template = self->options->template;
1017         }
1018       else if (self->flags & LW_FORMAT_FILE)
1019         {
1020           template = self->options->file_template;
1021         }
1022       else if ((self->flags & LW_FORMAT_PROTO))
1023         {
1024           template = self->options->proto_template;
1025         }
1026 
1027       if (template)
1028         {
1029           LogTemplateEvalOptions options =
1030           {
1031             &self->options->template_options,
1032             LTZ_SEND,
1033             seq_num, NULL
1034           };
1035 
1036           log_template_format(template, lm, &options, result);
1037 
1038         }
1039       else
1040         {
1041           const gchar *p;
1042           gssize len;
1043 
1044           if (self->flags & LW_FORMAT_FILE)
1045             {
1046               format_unix_time(stamp, result, self->options->template_options.ts_format,
1047                                time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
1048                                self->options->template_options.frac_digits);
1049             }
1050           else if (self->flags & LW_FORMAT_PROTO)
1051             {
1052               g_string_append_c(result, '<');
1053               format_uint32_padded(result, 0, 0, 10, lm->pri);
1054               g_string_append_c(result, '>');
1055 
1056               /* always use BSD timestamp by default, the use can override this using a custom template */
1057               append_format_unix_time(stamp, result, TS_FMT_BSD,
1058                                       time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
1059                                       self->options->template_options.frac_digits);
1060             }
1061           g_string_append_c(result, ' ');
1062 
1063           p = log_msg_get_value(lm, LM_V_HOST, &len);
1064           g_string_append_len(result, p, len);
1065           g_string_append_c(result, ' ');
1066 
1067           p = log_msg_get_value(lm, LM_V_LEGACY_MSGHDR, &len);
1068           if (len > 0)
1069             {
1070               g_string_append_len(result, p, len);
1071             }
1072           else
1073             {
1074               p = log_msg_get_value(lm, LM_V_PROGRAM, &len);
1075               if (len > 0)
1076                 {
1077                   g_string_append_len(result, p, len);
1078                   p = log_msg_get_value(lm, LM_V_PID, &len);
1079                   if (len > 0)
1080                     {
1081                       g_string_append_c(result, '[');
1082                       g_string_append_len(result, p, len);
1083                       g_string_append_c(result, ']');
1084                     }
1085                   g_string_append_len(result, ": ", 2);
1086                 }
1087             }
1088           p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
1089           g_string_append_len(result, p, len);
1090           g_string_append_c(result, '\n');
1091           log_writer_do_padding(self, result);
1092         }
1093     }
1094   if (self->options->options & LWO_NO_MULTI_LINE)
1095     {
1096       gchar *p;
1097 
1098       p = result->str;
1099       /* NOTE: the size is calculated to leave trailing new line */
1100       while ((p = find_cr_or_lf(p, result->str + result->len - p - 1)))
1101         {
1102           *p = ' ';
1103           p++;
1104         }
1105 
1106     }
1107 
1108   if (self->options->truncate_size != -1 && result->len > self->options->truncate_size)
1109     {
1110       const gint truncated_bytes = result->len - self->options->truncate_size;
1111 
1112       g_string_truncate(result, self->options->truncate_size);
1113 
1114       stats_counter_inc(self->truncated.count);
1115       stats_counter_add(self->truncated.bytes, truncated_bytes);
1116     }
1117 }
1118 
1119 static void
log_writer_broken(LogWriter * self,gint notify_code)1120 log_writer_broken(LogWriter *self, gint notify_code)
1121 {
1122   log_writer_stop_watches(self);
1123   log_pipe_notify(self->control, notify_code, self);
1124 }
1125 
1126 static void
log_writer_realloc_line_buffer(LogWriter * self)1127 log_writer_realloc_line_buffer(LogWriter *self)
1128 {
1129   self->line_buffer->str = g_malloc(self->line_buffer->allocated_len);
1130   self->line_buffer->str[0] = 0;
1131   self->line_buffer->len = 0;
1132 }
1133 
1134 /*
1135  * Write messages to the underlying file descriptor using the installed
1136  * LogProtoClient instance.  This is called whenever the output is ready to accept
1137  * further messages, and once during config deinitialization, in order to
1138  * flush messages still in the queue, in the hope that most of them can be
1139  * written out.
1140  *
1141  * In threaded mode, this function is invoked as part of the "output" task
1142  * (in essence, this is the function that performs the output task).
1143  *
1144  */
1145 
1146 static gboolean
log_writer_flush_finalize(LogWriter * self)1147 log_writer_flush_finalize(LogWriter *self)
1148 {
1149   LogProtoStatus status = log_proto_client_flush(self->proto);
1150 
1151   if (status == LPS_SUCCESS || status == LPS_PARTIAL)
1152     return TRUE;
1153 
1154 
1155   return FALSE;
1156 }
1157 
/* Feed the length of a written message into the msg_size_max and
 * msg_size_avg aggregators (registered by _register_aggregated_stats()). */
static void
_log_writer_insert_msg_length_stats(LogWriter *self, gsize msg_len)
{
  stats_aggregator_insert_data(self->max_message_size, msg_len);
  stats_aggregator_insert_data(self->average_messages_size, msg_len);
}
1164 
/*
 * Format @msg into self->line_buffer and post it to the LogProtoClient.
 *
 * Returns TRUE when the writer is done with the message: it was consumed by
 * the proto, skipped because the formatted output was empty, or dropped on
 * LPS_ERROR because LWO_IGNORE_ERRORS is set.  Returns FALSE when it could
 * not be sent; one message is then rewound into the queue backlog.
 *
 * *@write_error is set to TRUE on LPS_ERROR unless LWO_IGNORE_ERRORS is set.
 *
 * On both paths log_msg_unref(msg) is called and the refcache/message
 * context are closed.  Presumably this releases the reference handed over
 * by the queue pop in the caller (confirm).
 */
static gboolean
log_writer_write_message(LogWriter *self, LogMessage *msg, LogPathOptions *path_options, gboolean *write_error)
{
  gboolean consumed = FALSE;

  *write_error = FALSE;

  log_msg_refcache_start_consumer(msg, path_options);
  msg_set_context(msg);

  log_writer_format_log(self, msg, self->line_buffer);

  if (!(msg->flags & LF_INTERNAL))
    {
      msg_debug("Outgoing message",
                evt_tag_printf("message", "%s", self->line_buffer->str));
    }

  gsize msg_len = 0;
  if (self->line_buffer->len)
    {
      msg_len = self->line_buffer->len;
      LogProtoStatus status = log_proto_client_post(self->proto, msg, (guchar *)self->line_buffer->str,
                                                    self->line_buffer->len,
                                                    &consumed);

      /* remembered so that log_writer_reopen_deferred() can rewind the whole backlog */
      self->partial_write = (status == LPS_PARTIAL);

      /* the old buffer is not freed here: once consumed, the proto presumably
       * owns it, so line_buffer gets a fresh allocation */
      if (consumed)
        log_writer_realloc_line_buffer(self);

      if (status == LPS_ERROR)
        {
          if ((self->options->options & LWO_IGNORE_ERRORS) != 0)
            {
              /* ignore-errors: discard the formatted data and treat the
               * message as done */
              if (!consumed)
                {
                  g_free(self->line_buffer->str);
                  log_writer_realloc_line_buffer(self);
                  consumed = TRUE;
                }
            }
          else
            {
              *write_error = TRUE;
              consumed = FALSE;
            }
        }
    }
  else
    {
      /* nothing to send: skip the message */
      msg_debug("Error posting log message as template() output resulted in an empty string, skipping message");
      consumed = TRUE;
    }

  if (consumed)
    {
      if (msg->flags & LF_LOCAL)
        step_sequence_number(&self->seq_num);

      log_msg_unref(msg);
      msg_set_context(NULL);
      log_msg_refcache_stop();
      _log_writer_insert_msg_length_stats(self, msg_len);

      return TRUE;
    }
  else
    {
      msg_debug("Can't send the message rewind backlog",
                evt_tag_printf("message", "%s", self->line_buffer->str));

      /* put the message back so that it is retried later */
      log_queue_rewind_backlog(self->queue, 1);

      log_msg_unref(msg);
      msg_set_context(NULL);
      log_msg_refcache_stop();

      return FALSE;
    }
}
1246 
1247 static inline LogMessage *
log_writer_queue_pop_message(LogWriter * self,LogPathOptions * path_options,gboolean force_flush)1248 log_writer_queue_pop_message(LogWriter *self, LogPathOptions *path_options, gboolean force_flush)
1249 {
1250   if (force_flush)
1251     return log_queue_pop_head_ignore_throttle(self->queue, path_options);
1252   else
1253     return log_queue_pop_head(self->queue, path_options);
1254 }
1255 
1256 static inline gboolean
log_writer_process_handshake(LogWriter * self)1257 log_writer_process_handshake(LogWriter *self)
1258 {
1259   LogProtoStatus status = log_proto_client_handshake(self->proto);
1260 
1261   if (status != LPS_SUCCESS)
1262     return FALSE;
1263 
1264   return TRUE;
1265 }
1266 
1267 /*
1268  * @flush_mode specifies how hard LogWriter is trying to send messages to
1269  * the actual destination:
1270  *
1271  *
1272  * LW_FLUSH_NORMAL    - business as usual, flush when the buffer is full
1273  * LW_FLUSH_FORCE     - flush the buffer immediately please
1274  *
1275  */
1276 static gboolean
log_writer_flush(LogWriter * self,LogWriterFlushMode flush_mode)1277 log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
1278 {
1279   gboolean write_error = FALSE;
1280 
1281   if (!self->proto)
1282     return FALSE;
1283 
1284   if (log_proto_client_handshake_in_progress(self->proto))
1285     {
1286       return log_writer_process_handshake(self);
1287     }
1288 
1289   /* NOTE: in case we're reloading or exiting we flush all queued items as
1290    * long as the destination can consume it.  This is not going to be an
1291    * infinite loop, since the reader will cease to produce new messages when
1292    * main_loop_io_worker_job_quit() is set. */
1293 
1294   while ((!main_loop_worker_job_quit() || flush_mode == LW_FLUSH_FORCE) && !write_error)
1295     {
1296       LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
1297       LogMessage *msg = log_writer_queue_pop_message(self, &path_options, flush_mode == LW_FLUSH_FORCE);
1298 
1299       if (!msg)
1300         break;
1301 
1302       ScratchBuffersMarker mark;
1303       scratch_buffers_mark(&mark);
1304       if (!log_writer_write_message(self, msg, &path_options, &write_error))
1305         {
1306           scratch_buffers_reclaim_marked(mark);
1307           break;
1308         }
1309       scratch_buffers_reclaim_marked(mark);
1310 
1311       if (!write_error)
1312         stats_counter_inc(self->written_messages);
1313     }
1314 
1315   if (write_error)
1316     return FALSE;
1317 
1318   return log_writer_flush_finalize(self);
1319 }
1320 
1321 static gboolean
log_writer_forced_flush(LogWriter * self)1322 log_writer_forced_flush(LogWriter *self)
1323 {
1324   return log_writer_flush(self, LW_FLUSH_FORCE);
1325 }
1326 
1327 static gboolean
log_writer_process_in(LogWriter * self)1328 log_writer_process_in(LogWriter *self)
1329 {
1330   if (!self->proto)
1331     return FALSE;
1332 
1333   return (log_proto_client_process_in(self->proto) == LPS_SUCCESS);
1334 }
1335 
1336 static gboolean
log_writer_process_out(LogWriter * self)1337 log_writer_process_out(LogWriter *self)
1338 {
1339   return log_writer_flush(self, LW_FLUSH_NORMAL);
1340 }
1341 
1342 static void
log_writer_reopen_timeout(void * cookie)1343 log_writer_reopen_timeout(void *cookie)
1344 {
1345   LogWriter *self = (LogWriter *)cookie;
1346 
1347   log_pipe_notify(self->control, NC_REOPEN_REQUIRED, self);
1348 }
1349 
1350 static void
log_writer_idle_timeout(void * cookie)1351 log_writer_idle_timeout(void *cookie)
1352 {
1353   LogWriter *self = (LogWriter *) cookie;
1354 
1355   g_assert(!self->io_job.working);
1356   msg_verbose("Destination timeout has elapsed, closing connection",
1357               evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
1358 
1359   log_pipe_notify(self->control, NC_CLOSE, self);
1360 }
1361 
1362 static void
log_writer_init_watches(LogWriter * self)1363 log_writer_init_watches(LogWriter *self)
1364 {
1365   IV_FD_INIT(&self->fd_watch);
1366   self->fd_watch.cookie = self;
1367 
1368   IV_TASK_INIT(&self->immed_io_task);
1369   self->immed_io_task.cookie = self;
1370   self->immed_io_task.handler = log_writer_io_handle_out;
1371 
1372   IV_TIMER_INIT(&self->suspend_timer);
1373   self->suspend_timer.cookie = self;
1374 
1375   ml_batched_timer_init(&self->suppress_timer);
1376   self->suppress_timer.cookie = self;
1377   self->suppress_timer.handler = (void (*)(void *)) log_writer_suppress_timeout;
1378   self->suppress_timer.ref_cookie = (gpointer (*)(gpointer)) log_pipe_ref;
1379   self->suppress_timer.unref_cookie = (void (*)(gpointer)) log_pipe_unref;
1380 
1381   ml_batched_timer_init(&self->mark_timer);
1382   self->mark_timer.cookie = self;
1383   self->mark_timer.handler = log_writer_mark_timeout;
1384   self->mark_timer.ref_cookie = (gpointer (*)(gpointer)) log_pipe_ref;
1385   self->mark_timer.unref_cookie = (void (*)(gpointer)) log_pipe_unref;
1386 
1387   IV_TIMER_INIT(&self->reopen_timer);
1388   self->reopen_timer.cookie = self;
1389   self->reopen_timer.handler = log_writer_reopen_timeout;
1390 
1391   IV_TIMER_INIT(&self->idle_timer);
1392   self->idle_timer.cookie = self;
1393   self->idle_timer.handler = log_writer_idle_timeout;
1394 
1395   IV_EVENT_INIT(&self->queue_filled);
1396   self->queue_filled.cookie = self;
1397   self->queue_filled.handler = log_writer_queue_filled;
1398 
1399   main_loop_io_worker_job_init(&self->io_job);
1400   self->io_job.user_data = self;
1401   self->io_job.work = (void (*)(void *, GIOCondition)) log_writer_work_perform;
1402   self->io_job.completion = (void (*)(void *)) log_writer_work_finished;
1403   self->io_job.engage = (void (*)(void *)) log_pipe_ref;
1404   self->io_job.release = (void (*)(void *)) log_pipe_unref;
1405 }
1406 
1407 static void
_register_aggregated_stats(LogWriter * self,StatsClusterKey * sc_key_input,gint stats_type)1408 _register_aggregated_stats(LogWriter *self, StatsClusterKey *sc_key_input, gint stats_type)
1409 {
1410   stats_aggregator_lock();
1411   StatsClusterKey sc_key;
1412 
1413   stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1414                                          self->stats_instance, "msg_size_max");
1415   stats_register_aggregator_maximum(self->options->stats_level, &sc_key, &self->max_message_size);
1416 
1417   stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1418                                          self->stats_instance, "msg_size_avg");
1419   stats_register_aggregator_average(self->options->stats_level, &sc_key, &self->average_messages_size);
1420 
1421   stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1422                                          self->stats_instance, "eps");
1423   stats_register_aggregator_cps(self->options->stats_level, &sc_key, sc_key_input, stats_type, &self->CPS);
1424 
1425   stats_aggregator_unlock();
1426 }
1427 
/* Counterpart of _register_aggregated_stats(): unregister the msg_size_max,
 * msg_size_avg and eps aggregators under the aggregator lock. */
static void
_unregister_aggregated_stats(LogWriter *self)
{
  stats_aggregator_lock();

  stats_unregister_aggregator_maximum(&self->max_message_size);
  stats_unregister_aggregator_average(&self->average_messages_size);
  stats_unregister_aggregator_cps(&self->CPS);

  stats_aggregator_unlock();
}
1439 
1440 static void
_register_counters(LogWriter * self)1441 _register_counters(LogWriter *self)
1442 {
1443   stats_lock();
1444   StatsClusterKey sc_key;
1445   stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1446                                 self->stats_instance);
1447 
1448   if (self->options->suppress > 0)
1449     stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1450   stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1451   stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1452   stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1453   log_queue_register_stats_counters(self->queue, self->options->stats_level, &sc_key);
1454 
1455   StatsClusterKey sc_key_truncated_count;
1456   stats_cluster_single_key_set_with_name(&sc_key_truncated_count, self->options->stats_source | SCS_DESTINATION,
1457                                          self->stats_id, self->stats_instance, "truncated_count");
1458   stats_register_counter(self->options->stats_level, &sc_key_truncated_count, SC_TYPE_SINGLE_VALUE,
1459                          &self->truncated.count);
1460 
1461   StatsClusterKey sc_key_truncated_bytes;
1462   stats_cluster_single_key_set_with_name(&sc_key_truncated_bytes, self->options->stats_source | SCS_DESTINATION,
1463                                          self->stats_id, self->stats_instance, "truncated_bytes");
1464   stats_register_counter(self->options->stats_level, &sc_key_truncated_bytes, SC_TYPE_SINGLE_VALUE,
1465                          &self->truncated.bytes);
1466 
1467   stats_unlock();
1468   _register_aggregated_stats(self, &sc_key, SC_TYPE_WRITTEN);
1469 }
1470 
1471 static gboolean
log_writer_init(LogPipe * s)1472 log_writer_init(LogPipe *s)
1473 {
1474   LogWriter *self = (LogWriter *) s;
1475 
1476   if (self->queue == NULL)
1477     {
1478       return FALSE;
1479     }
1480   iv_event_register(&self->queue_filled);
1481 
1482   if ((self->options->options & LWO_NO_STATS) == 0 && !self->dropped_messages)
1483     _register_counters(self);
1484 
1485   if (self->proto)
1486     {
1487       LogProtoClient *proto;
1488 
1489       proto = self->proto;
1490       log_writer_set_proto(self, NULL);
1491       log_writer_reopen(self, proto);
1492     }
1493 
1494   if (self->options->mark_mode == MM_PERIODICAL)
1495     {
1496       /* periodical marks should be emitted even if no message is received,
1497        * so we need a timer right from the start */
1498 
1499       log_writer_postpone_mark_timer(self);
1500     }
1501 
1502   return TRUE;
1503 }
1504 
1505 static void
_unregister_counters(LogWriter * self)1506 _unregister_counters(LogWriter *self)
1507 {
1508   stats_lock();
1509   {
1510     StatsClusterKey sc_key;
1511     stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1512                                   self->stats_instance);
1513 
1514     stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1515     stats_unregister_counter(&sc_key, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1516     stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1517     stats_unregister_counter(&sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1518 
1519     StatsClusterKey sc_key_truncated_count;
1520     stats_cluster_single_key_set_with_name(&sc_key_truncated_count, self->options->stats_source | SCS_DESTINATION,
1521                                            self->stats_id, self->stats_instance, "truncated_count");
1522     stats_unregister_counter(&sc_key_truncated_count, SC_TYPE_SINGLE_VALUE, &self->truncated.count);
1523 
1524     StatsClusterKey sc_key_truncated_bytes;
1525     stats_cluster_single_key_set_with_name(&sc_key_truncated_bytes, self->options->stats_source | SCS_DESTINATION,
1526                                            self->stats_id, self->stats_instance, "truncated_bytes");
1527     stats_unregister_counter(&sc_key_truncated_bytes, SC_TYPE_SINGLE_VALUE, &self->truncated.bytes);
1528 
1529     log_queue_unregister_stats_counters(self->queue, &sc_key);
1530 
1531   }
1532   stats_unlock();
1533   _unregister_aggregated_stats(self);
1534 
1535 }
1536 
/*
 * LogPipe deinit method; asserts that it runs in the main thread.  It
 * force-flushes whatever the destination still accepts, then stops the
 * watches, the queue_filled event and the timers, and finally unregisters
 * the stats counters.
 */
static gboolean
log_writer_deinit(LogPipe *s)
{
  LogWriter *self = (LogWriter *) s;

  main_loop_assert_main_thread();

  log_queue_reset_parallel_push(self->queue);
  log_writer_forced_flush(self);
  /* FIXME: by the time we arrive here, it must be guaranteed that no
   * _queue() call is running in a different thread, otherwise we'd need
   * some kind of locking. */

  log_writer_stop_watches(self);

  iv_event_unregister(&self->queue_filled);

  if (iv_timer_registered(&self->reopen_timer))
    iv_timer_unregister(&self->reopen_timer);

  ml_batched_timer_unregister(&self->suppress_timer);
  ml_batched_timer_unregister(&self->mark_timer);

  _unregister_counters(self);

  return TRUE;
}
1564 
/*
 * LogPipe free method.  Releases the proto, the line buffer, the queue
 * reference, the last message, the stats names, the batched timers and the
 * mutexes/condition variable, then chains to log_pipe_free_method().
 */
static void
log_writer_free(LogPipe *s)
{
  LogWriter *self = (LogWriter *) s;

  log_writer_free_proto(self);

  if (self->line_buffer)
    g_string_free(self->line_buffer, TRUE);

  log_queue_unref(self->queue);
  if (self->last_msg)
    log_msg_unref(self->last_msg);
  g_free(self->stats_id);
  g_free(self->stats_instance);
  ml_batched_timer_free(&self->mark_timer);
  ml_batched_timer_free(&self->suppress_timer);
  g_mutex_clear(&self->suppress_lock);
  g_mutex_clear(&self->pending_proto_lock);
  g_cond_clear(&self->pending_proto_cond);

  log_pipe_free_method(s);
}
1588 
1589 /* FIXME: this is inherently racy */
1590 gboolean
log_writer_has_pending_writes(LogWriter * self)1591 log_writer_has_pending_writes(LogWriter *self)
1592 {
1593   return !log_queue_is_empty_racy(self->queue) || !self->watches_running;
1594 }
1595 
1596 gboolean
log_writer_opened(LogWriter * self)1597 log_writer_opened(LogWriter *self)
1598 {
1599   return self->proto != NULL;
1600 }
1601 
1602 
1603 static void
log_writer_free_proto(LogWriter * self)1604 log_writer_free_proto(LogWriter *self)
1605 {
1606   if (self->proto)
1607     log_proto_client_free(self->proto);
1608 
1609   self->proto = NULL;
1610 }
1611 
1612 static void
log_writer_set_proto(LogWriter * self,LogProtoClient * proto)1613 log_writer_set_proto(LogWriter *self, LogProtoClient *proto)
1614 {
1615   self->proto = proto;
1616 
1617   if (proto)
1618     {
1619       LogProtoClientFlowControlFuncs flow_control_funcs;
1620       flow_control_funcs.ack_callback = log_writer_msg_ack;
1621       flow_control_funcs.rewind_callback = log_writer_msg_rewind_cb;
1622       flow_control_funcs.user_data = self;
1623 
1624       log_proto_client_set_client_flow_control(self->proto, &flow_control_funcs);
1625       log_proto_client_set_options(self->proto, &self->options->proto_options.super);
1626     }
1627 }
1628 
/*
 * Record a proto waiting to be installed (@present == TRUE), or clear the
 * pending state.  A NULL @proto can still be "present": it requests that
 * the current proto be removed.
 */
static void
log_writer_set_pending_proto(LogWriter *self, LogProtoClient *proto, gboolean present)
{
  self->pending_proto = proto;
  self->pending_proto_present = present;
}
1635 
/* Run in the main thread in reaction to log_writer_reopen() to change the
 * destination LogProtoClient instance. It needs to run in the main
 * thread as it re-registers the watches associated with the main
 * thread.
 *
 * @s points to a two-element array: { LogWriter *self, LogProtoClient *proto }.
 * A NULL proto (re)arms the reopen timer to fire time_reopen seconds from
 * now.  If the I/O job is currently working, the new proto is only recorded
 * as pending (per the log_writer_reopen() notes, it is installed later by
 * log_writer_work_finished()). */
static void
log_writer_reopen_deferred(gpointer s)
{
  gpointer *args = (gpointer *) s;
  LogWriter *self = args[0];
  LogProtoClient *proto = args[1];

  if (!proto)
    {
      iv_validate_now();

      self->reopen_timer.expires = iv_now;
      self->reopen_timer.expires.tv_sec += self->options->time_reopen;

      if (iv_timer_registered(&self->reopen_timer))
        iv_timer_unregister(&self->reopen_timer);

      iv_timer_register(&self->reopen_timer);
    }

  init_sequence_number(&self->seq_num);

  if (self->io_job.working)
    {
      /* NOTE: proto can be NULL but it is present... */
      log_writer_set_pending_proto(self, proto, TRUE);
      return;
    }

  log_writer_stop_watches(self);

  /* after a partial write, rewind the whole backlog so those messages are
   * presumably sent again over the new proto */
  if (self->partial_write)
    {
      log_queue_rewind_backlog_all(self->queue);
    }
  log_writer_free_proto(self);
  log_writer_set_proto(self, proto);

  if (proto)
    log_writer_start_watches(self);
}
1681 
1682 /*
1683  * This function can be called from any threads, from the main thread
1684  * as well as I/O worker threads. It takes care about going to the
1685  * main thread to actually switch LogProtoClient under this writer.
1686  *
1687  * The writer may still be operating, (e.g. log_pipe_deinit/init is
1688  * not needed).
1689  *
1690  * In case we're running in a non-main thread, then by the time this
1691  * function returns, the reopen has finished. In case it is called
1692  * from the main thread, this function may defer updating self->proto
1693  * until the worker thread has finished. The reason for this
1694  * difference is:
1695  *
1696  *   - if LogWriter is busy, then updating the LogProtoClient instance is
1697  *     deferred to log_writer_work_finished(), but that runs in the
1698  *     main thread.
1699  *
1700  *   - normally, even this deferred update is waited for, but in case
1701  *     we're in the main thread, we can't block.
1702  *
1703  * This situation could probably be improved, maybe the synchonous
1704  * return of log_writer_reopen() is not needed by call sites, but I
1705  * was not sure, and right before release I didn't want to take the
1706  * risky approach.
1707  */
1708 void
log_writer_reopen(LogWriter * s,LogProtoClient * proto)1709 log_writer_reopen(LogWriter *s, LogProtoClient *proto)
1710 {
1711   LogWriter *self = (LogWriter *) s;
1712   gpointer args[] = { s, proto };
1713 
1714   main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
1715 
1716   if (!main_loop_is_main_thread())
1717     {
1718       g_mutex_lock(&self->pending_proto_lock);
1719       while (self->pending_proto_present)
1720         {
1721           g_cond_wait(&self->pending_proto_cond, &self->pending_proto_lock);
1722         }
1723       g_mutex_unlock(&self->pending_proto_lock);
1724     }
1725 }
1726 
1727 void
log_writer_set_options(LogWriter * self,LogPipe * control,LogWriterOptions * options,const gchar * stats_id,const gchar * stats_instance)1728 log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options,
1729                        const gchar *stats_id, const gchar *stats_instance)
1730 {
1731   self->control = control;
1732   self->options = options;
1733 
1734   if (control)
1735     self->super.expr_node = control->expr_node;
1736 
1737   if (self->stats_id)
1738     g_free(self->stats_id);
1739   self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
1740 
1741   if (self->stats_instance)
1742     g_free(self->stats_instance);
1743   self->stats_instance = stats_instance ? g_strdup(stats_instance) : NULL;
1744 }
1745 
1746 LogWriter *
log_writer_new(guint32 flags,GlobalConfig * cfg)1747 log_writer_new(guint32 flags, GlobalConfig *cfg)
1748 {
1749   LogWriter *self = g_new0(LogWriter, 1);
1750 
1751   log_pipe_init_instance(&self->super, cfg);
1752   self->super.init = log_writer_init;
1753   self->super.deinit = log_writer_deinit;
1754   self->super.queue = log_writer_queue;
1755   self->super.free_fn = log_writer_free;
1756   self->flags = flags;
1757   self->line_buffer = g_string_sized_new(128);
1758   self->pollable_state = -1;
1759   init_sequence_number(&self->seq_num);
1760 
1761   log_writer_init_watches(self);
1762   g_mutex_init(&self->suppress_lock);
1763   g_mutex_init(&self->pending_proto_lock);
1764   g_cond_init(&self->pending_proto_cond);
1765 
1766   log_pipe_add_info(&self->super, "writer");
1767 
1768   return self;
1769 }
1770 
1771 void
log_writer_options_defaults(LogWriterOptions * options)1772 log_writer_options_defaults(LogWriterOptions *options)
1773 {
1774   options->template = NULL;
1775   options->flush_lines = -1;
1776   log_template_options_defaults(&options->template_options);
1777   options->time_reopen = -1;
1778   options->suppress = -1;
1779   options->padding = 0;
1780   options->mark_mode = MM_GLOBAL;
1781   options->mark_freq = -1;
1782   options->truncate_size = -1;
1783   host_resolve_options_defaults(&options->host_resolve_options);
1784 }
1785 
1786 void
log_writer_options_set_template_escape(LogWriterOptions * options,gboolean enable)1787 log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable)
1788 {
1789   if (options->template && options->template->def_inline)
1790     {
1791       log_template_set_escape(options->template, enable);
1792     }
1793   else
1794     {
1795       msg_error("Macro escaping can only be specified for inline templates");
1796     }
1797 }
1798 
1799 void
log_writer_options_set_mark_mode(LogWriterOptions * options,const gchar * mark_mode)1800 log_writer_options_set_mark_mode(LogWriterOptions *options, const gchar *mark_mode)
1801 {
1802   options->mark_mode = cfg_lookup_mark_mode(mark_mode);
1803 }
1804 
1805 /*
1806  * NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy
1807  *
1808  * Rationale:
1809  *   - init is called from driver init (e.g. affile_sd_init),
1810  *   - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_sd_deinit)
1811  *
1812  * The reason:
1813  *   - when initializing the reloaded configuration fails for some reason,
1814  *     we have to fall back to the old configuration, thus we cannot dump
1815  *     the information stored in the Options structure at deinit time, but
1816  *     have to recover it when the old configuration is initialized.
1817  *
1818  * For the reasons above, init and destroy behave the following way:
1819  *
1820  *   - init is idempotent, it can be called multiple times without leaking
1821  *     memory, and without loss of information
1822  *   - destroy is only called once, when the options are indeed to be destroyed
1823  *
1824  * Also important to note is that when init is called multiple times, the
1825  * GlobalConfig reference is the same, this means that it is enough to
1826  * remember whether init was called already and return w/o doing anything in
1827  * that case, which is actually how idempotency is implemented here.
1828  */
1829 void
log_writer_options_init(LogWriterOptions * options,GlobalConfig * cfg,guint32 option_flags)1830 log_writer_options_init(LogWriterOptions *options, GlobalConfig *cfg, guint32 option_flags)
1831 {
1832   if (options->initialized)
1833     return;
1834 
1835   log_template_options_init(&options->template_options, cfg);
1836   host_resolve_options_init(&options->host_resolve_options, &cfg->host_resolve_options);
1837   log_proto_client_options_init(&options->proto_options.super, cfg);
1838   options->options |= option_flags;
1839 
1840   if (options->flush_lines == -1)
1841     options->flush_lines = cfg->flush_lines;
1842   if (options->suppress == -1)
1843     options->suppress = cfg->suppress;
1844   if (options->time_reopen == -1)
1845     options->time_reopen = cfg->time_reopen;
1846   options->file_template = log_template_ref(cfg->file_template);
1847   options->proto_template = log_template_ref(cfg->proto_template);
1848   if (cfg->threaded)
1849     options->options |= LWO_THREADED;
1850   /* per-destination MARK messages */
1851   if (options->mark_mode == MM_GLOBAL)
1852     {
1853       /* get the global option */
1854       options->mark_mode = cfg->mark_mode;
1855     }
1856   if (options->mark_freq == -1)
1857     {
1858       /* not initialized, use the global mark freq */
1859       options->mark_freq = cfg->mark_freq;
1860     }
1861 
1862   options->initialized = TRUE;
1863 }
1864 
1865 void
log_writer_options_destroy(LogWriterOptions * options)1866 log_writer_options_destroy(LogWriterOptions *options)
1867 {
1868   log_template_options_destroy(&options->template_options);
1869   host_resolve_options_destroy(&options->host_resolve_options);
1870   log_proto_client_options_destroy(&options->proto_options.super);
1871   log_template_unref(options->template);
1872   log_template_unref(options->file_template);
1873   log_template_unref(options->proto_template);
1874   options->initialized = FALSE;
1875 }
1876 
1877 gint
log_writer_options_lookup_flag(const gchar * flag)1878 log_writer_options_lookup_flag(const gchar *flag)
1879 {
1880   if (strcmp(flag, "syslog-protocol") == 0)
1881     return LWO_SYSLOG_PROTOCOL;
1882   if (strcmp(flag, "no-multi-line") == 0)
1883     return LWO_NO_MULTI_LINE;
1884   if (strcmp(flag, "threaded") == 0)
1885     return LWO_THREADED;
1886   if (strcmp(flag, "ignore-errors") == 0)
1887     return LWO_IGNORE_ERRORS;
1888   msg_error("Unknown dest writer flag", evt_tag_str("flag", flag));
1889   return 0;
1890 }
1891