1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logwriter.h"
26 #include "messages.h"
27 #include "stats/stats-registry.h"
28 #include "stats/aggregator/stats-aggregator-registry.h"
29 #include "stats/stats-cluster-single.h"
30 #include "stats/aggregator/stats-aggregator.h"
31 #include "hostname.h"
32 #include "host-resolve.h"
33 #include "seqnum.h"
34 #include "str-utils.h"
35 #include "find-crlf.h"
36 #include "mainloop.h"
37 #include "mainloop-io-worker.h"
38 #include "mainloop-call.h"
39 #include "ml-batched-timer.h"
40 #include "str-format.h"
41 #include "scratch-buffers.h"
42 #include "timeutils/format.h"
43 #include "timeutils/misc.h"
44
45 #include <assert.h>
46 #include <string.h>
47 #include <stdlib.h>
48 #include <sys/stat.h>
49 #include <unistd.h>
50 #include <iv.h>
51 #include <iv_event.h>
52 #include <iv_work.h>
53
/*
 * LogWriterFlushMode: selects how eagerly buffered output should be
 * flushed by the writer.
 */
typedef enum
{
  /* flush modes */

  /* business as usual, flush when the buffer is full */
  LW_FLUSH_NORMAL,
  /* flush the buffer immediately please */
  LW_FLUSH_FORCE,
} LogWriterFlushMode;
63
/*
 * Per-destination writer state.
 *
 * The comments below only describe what the code in this file visibly does
 * with each member; members without a comment are not exercised in the part
 * of the file that was reviewed.
 */
struct _LogWriter
{
  /* base pipe; super.flags carries PIF_INITIALIZED */
  LogPipe super;
  /* queue that messages are pushed to and whose backlog is acked/rewound */
  LogQueue *queue;
  /* LW_* flags, assigned through log_writer_set_flags() before init */
  guint32 flags:31;
  /* sequence number used when formatting LF_LOCAL messages */
  gint32 seq_num;
  gboolean partial_write;
  StatsCounterItem *dropped_messages;
  /* incremented for every message dropped by duplicate suppression */
  StatsCounterItem *suppressed_messages;
  /* incremented for every message pushed to the queue */
  StatsCounterItem *processed_messages;
  StatsCounterItem *written_messages;
  StatsAggregator *max_message_size;
  StatsAggregator *average_messages_size;
  StatsAggregator *CPS;
  struct
  {
    StatsCounterItem *count;
    StatsCounterItem *bytes;
  } truncated;
  LogPipe *control;
  /* writer configuration (suppress, mark_freq, mark_mode, padding, time_reopen, templates, ...) */
  LogWriterOptions *options;
  /* last message seen by duplicate detection; guarded by suppress_lock */
  LogMessage *last_msg;
  /* number of repetitions of last_msg suppressed so far; guarded by suppress_lock */
  guint32 last_msg_count;
  GString *line_buffer;

  gchar *stats_id;
  gchar *stats_instance;

  /* ivykis fd watch used when the fd is pollable */
  struct iv_fd fd_watch;
  /* doubles as error-suspend timer and as flush/throttle timer; see log_writer_arm_suspend_timer() */
  struct iv_timer suspend_timer;
  /* task registered instead of fd_watch when the fd is not pollable */
  struct iv_task immed_io_task;
  /* event posted by log_writer_schedule_update_watches() from the source thread */
  struct iv_event queue_filled;
  /* job submitted to the I/O workers when LWO_THREADED is set */
  MainLoopIOWorkerJob io_job;
  /* protects last_msg and last_msg_count */
  GMutex suppress_lock;
  MlBatchedTimer suppress_timer;
  MlBatchedTimer mark_timer;
  struct iv_timer reopen_timer;
  struct iv_timer idle_timer;
  /* result of the last process_out/process_in run, evaluated in log_writer_work_finished() */
  gboolean work_result;
  /* < 0: not determined yet, 0: fd is not pollable, > 0: fd is pollable through fd_watch */
  gint pollable_state;
  /* proto: protocol in use; pending_proto: replacement installed by log_writer_work_finished() */
  LogProtoClient *proto, *pending_proto;
  gboolean watches_running:1, suspended:1, waiting_for_throttle:1;
  /* TRUE while pending_proto waits to be installed */
  gboolean pending_proto_present;
  /* signalled once the pending proto has been installed */
  GCond pending_proto_cond;
  /* held while the pending proto is swapped in */
  GMutex pending_proto_lock;
};
110
111 /**
112 * LogWriter behaviour
113 * ~~~~~~~~~~~~~~~~~~~
114 *
115 * LogWriter is a core element of syslog-ng sending messages out to some
116 * kind of destination represented by a UNIX fd. Outgoing messages are sent
117 * to the target asynchronously, first by placing them to a queue and then
118 * sending messages when poll() indicates that the fd is writable.
119 *
120 *
121 * Flow control
122 * ------------
123 * For a simple log writer without a disk buffer messages are placed on a
124 * GQueue and they are acknowledged when the send() system call returned
125 * success. This is more complex when disk buffering is used, in which case
126 * messages are put to the "disk buffer" first and acknowledged immediately.
127 * (this way the reader never stops when the disk buffer area is not yet
128 * full). When disk buffer reaches its limit, messages are added to the
129 * usual GQueue and messages get acknowledged when they are moved to the
130 * disk buffer.
131 *
132 **/
133
/* Forward declarations of file-local helpers that are referenced before
 * their definitions. */
static gboolean log_writer_process_out(LogWriter *self);
static gboolean log_writer_process_in(LogWriter *self);
static void log_writer_broken(LogWriter *self, gint notify_code);
static void log_writer_start_watches(LogWriter *self);
static void log_writer_stop_watches(LogWriter *self);
static void log_writer_stop_idle_timer(LogWriter *self);
static void log_writer_update_watches(LogWriter *self);
static void log_writer_suspend(LogWriter *self);
static void log_writer_free_proto(LogWriter *self);
static void log_writer_set_proto(LogWriter *self, LogProtoClient *proto);
static void log_writer_set_pending_proto(LogWriter *self, LogProtoClient *proto, gboolean present);
145
146
147 static void
log_writer_msg_ack(gint num_msg_acked,gpointer user_data)148 log_writer_msg_ack(gint num_msg_acked, gpointer user_data)
149 {
150 LogWriter *self = (LogWriter *)user_data;
151 log_queue_ack_backlog(self->queue, num_msg_acked);
152 }
153
154 void
log_writer_msg_rewind(LogWriter * self)155 log_writer_msg_rewind(LogWriter *self)
156 {
157 log_queue_rewind_backlog_all(self->queue);
158 }
159
160 static void
log_writer_msg_rewind_cb(gpointer user_data)161 log_writer_msg_rewind_cb(gpointer user_data)
162 {
163 LogWriter *self = (LogWriter *)user_data;
164 log_writer_msg_rewind(self);
165 }
166
167 void
log_writer_set_flags(LogWriter * self,guint32 flags)168 log_writer_set_flags(LogWriter *self, guint32 flags)
169 {
170 g_assert((self->super.flags & PIF_INITIALIZED) == 0);
171 self->flags = flags;
172 }
173
174 guint32
log_writer_get_flags(LogWriter * self)175 log_writer_get_flags(LogWriter *self)
176 {
177 return self->flags;
178 }
179
180 /* returns a reference */
181 LogQueue *
log_writer_get_queue(LogWriter * s)182 log_writer_get_queue(LogWriter *s)
183 {
184 LogWriter *self = (LogWriter *) s;
185
186 return log_queue_ref(self->queue);
187 }
188
189 /* consumes the reference */
190 void
log_writer_set_queue(LogWriter * self,LogQueue * queue)191 log_writer_set_queue(LogWriter *self, LogQueue *queue)
192 {
193 log_queue_unref(self->queue);
194 self->queue = log_queue_ref(queue);
195 log_queue_set_use_backlog(self->queue, TRUE);
196 }
197
198 static void
log_writer_work_perform(gpointer s,GIOCondition cond)199 log_writer_work_perform(gpointer s, GIOCondition cond)
200 {
201 LogWriter *self = (LogWriter *) s;
202
203 g_assert((self->super.flags & PIF_INITIALIZED) != 0);
204 g_assert((cond == G_IO_OUT) || (cond == G_IO_IN));
205
206 if (cond == G_IO_OUT)
207 self->work_result = log_writer_process_out(self);
208 else if (cond == G_IO_IN)
209 self->work_result = log_writer_process_in(self);
210 }
211
212 static void
log_writer_work_finished(gpointer s)213 log_writer_work_finished(gpointer s)
214 {
215 LogWriter *self = (LogWriter *) s;
216
217 main_loop_assert_main_thread();
218 self->waiting_for_throttle = FALSE;
219
220 if (self->pending_proto_present)
221 {
222 /* pending proto is only set in the main thread, so no need to
223 * lock it before coming here. After we're syncing with the
224 * log_writer_reopen() call, quite possibly coming from a
225 * non-main thread. */
226
227 g_mutex_lock(&self->pending_proto_lock);
228 log_writer_free_proto(self);
229
230 log_writer_set_proto(self, self->pending_proto);
231 log_writer_set_pending_proto(self, NULL, FALSE);
232
233 g_cond_signal(&self->pending_proto_cond);
234 g_mutex_unlock(&self->pending_proto_lock);
235 }
236
237 if (!self->work_result)
238 {
239 log_writer_broken(self, NC_WRITE_ERROR);
240 if (self->proto)
241 {
242 log_writer_suspend(self);
243 msg_notice("Suspending write operation because of an I/O error",
244 evt_tag_int("fd", log_proto_client_get_fd(self->proto)),
245 evt_tag_int("time_reopen", self->options->time_reopen));
246 }
247 return;
248 }
249
250 if ((self->super.flags & PIF_INITIALIZED) && self->proto)
251 {
252 /* reenable polling the source, but only if we're still initialized */
253 log_writer_start_watches(self);
254 }
255 }
256
257 static void
log_writer_io_handler(gpointer s,GIOCondition cond)258 log_writer_io_handler(gpointer s, GIOCondition cond)
259 {
260 LogWriter *self = (LogWriter *) s;
261
262 main_loop_assert_main_thread();
263
264 log_writer_stop_watches(self);
265 if ((self->options->options & LWO_THREADED))
266 {
267 main_loop_io_worker_job_submit(&self->io_job, cond);
268 }
269 else
270 {
271 /* Checking main_loop_io_worker_job_quit() helps to speed up the
272 * reload process. If reload/shutdown is requested we shouldn't do
273 * anything here, a final flush will be attempted in
274 * log_writer_deinit().
275 *
276 * Our current understanding is that it doesn't prevent race
277 * conditions of any kind.
278 */
279
280 if (!main_loop_worker_job_quit())
281 {
282 log_writer_work_perform(s, cond);
283 log_writer_work_finished(s);
284 }
285 }
286 }
287
288 static void
log_writer_io_handle_out(gpointer s)289 log_writer_io_handle_out(gpointer s)
290 {
291 log_writer_io_handler(s, G_IO_OUT);
292 }
293
294 static void
log_writer_io_handle_in(gpointer s)295 log_writer_io_handle_in(gpointer s)
296 {
297 log_writer_io_handler(s, G_IO_IN);
298 }
299
300 static void
log_writer_io_error(gpointer s)301 log_writer_io_error(gpointer s)
302 {
303 LogWriter *self = (LogWriter *) s;
304
305 if (self->fd_watch.handler_out == NULL && self->fd_watch.handler_in == NULL)
306 {
307 msg_debug("POLLERR occurred while idle",
308 evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
309 log_writer_broken(self, NC_WRITE_ERROR);
310 return;
311 }
312 else
313 {
314 /* in case we have an error state but we also asked for read/write
315 * polling, the error should be handled by the I/O callback. But we
316 * need not call that explicitly as ivykis does that for us. */
317 }
318 log_writer_update_watches(self);
319 }
320
321 static void
log_writer_io_check_eof(gpointer s)322 log_writer_io_check_eof(gpointer s)
323 {
324 LogWriter *self = (LogWriter *) s;
325
326 msg_error("EOF occurred while idle",
327 evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
328 log_writer_broken(self, NC_CLOSE);
329 }
330
331 static void
log_writer_error_suspend_elapsed(gpointer s)332 log_writer_error_suspend_elapsed(gpointer s)
333 {
334 LogWriter *self = (LogWriter *) s;
335
336 self->suspended = FALSE;
337 msg_notice("Error suspend timeout has elapsed, attempting to write again",
338 evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
339 log_writer_start_watches(self);
340 }
341
342 static void
log_writer_update_fd_callbacks(LogWriter * self,GIOCondition cond)343 log_writer_update_fd_callbacks(LogWriter *self, GIOCondition cond)
344 {
345 main_loop_assert_main_thread();
346 if (self->pollable_state > 0)
347 {
348 if (cond & G_IO_IN)
349 iv_fd_set_handler_in(&self->fd_watch, log_writer_io_handle_in);
350 else if (self->flags & LW_DETECT_EOF)
351 iv_fd_set_handler_in(&self->fd_watch, log_writer_io_check_eof);
352 else
353 iv_fd_set_handler_in(&self->fd_watch, NULL);
354
355 if (cond & G_IO_OUT)
356 iv_fd_set_handler_out(&self->fd_watch, log_writer_io_handle_out);
357 else
358 iv_fd_set_handler_out(&self->fd_watch, NULL);
359
360 iv_fd_set_handler_err(&self->fd_watch, log_writer_io_error);
361 }
362 else
363 {
364 /* fd is not pollable, assume it is always writable */
365 if (cond & G_IO_OUT)
366 {
367 if (!iv_task_registered(&self->immed_io_task))
368 iv_task_register(&self->immed_io_task);
369 }
370 else if (iv_task_registered(&self->immed_io_task))
371 {
372 iv_task_unregister(&self->immed_io_task);
373 }
374 }
375 }
376
377 static void
log_writer_stop_suspend_timer(LogWriter * self)378 log_writer_stop_suspend_timer(LogWriter *self)
379 {
380 if (iv_timer_registered(&self->suspend_timer))
381 iv_timer_unregister(&self->suspend_timer);
382 }
383
384 static void
log_writer_arm_suspend_timer(LogWriter * self,void (* handler)(void *),glong timeout_msec)385 log_writer_arm_suspend_timer(LogWriter *self, void (*handler)(void *), glong timeout_msec)
386 {
387 main_loop_assert_main_thread();
388
389 log_writer_stop_suspend_timer(self);
390
391 iv_validate_now();
392 self->suspend_timer.handler = handler;
393 self->suspend_timer.expires = iv_now;
394 timespec_add_msec(&self->suspend_timer.expires, timeout_msec);
395 iv_timer_register(&self->suspend_timer);
396 }
397
398 static void
log_writer_queue_filled(gpointer s)399 log_writer_queue_filled(gpointer s)
400 {
401 LogWriter *self = (LogWriter *) s;
402
403 main_loop_assert_main_thread();
404
405 /*
406 * NOTE: This theory is somewhat questionable, e.g. I'm not 100% sure it
407 * is the right scenario, but the race was closed. So take this with a
408 * grain of salt.
409 *
410 * The queue_filled callback is running in the main thread. Because of the
411 * possible delay caused by iv_event_post() the callback might be
412 * delivered event after stop_watches() has been called.
413 *
414 * - log_writer_schedule_update_watches() is called by the reader
415 * thread, which calls iv_event_post()
416 * - the main thread calls stop_watches() in work_perform
417 * - the event is delivered in the main thread
418 *
419 * But since stop/start watches always run in the main thread and we do
420 * too, we can check if this is the case. A LogWriter without watches
421 * running is busy writing out data to the destination, e.g. a
422 * start_watches is to be expected once log_writer_work_finished() is run
423 * at the end of the deferred work, executed by the I/O threads.
424 */
425 if (self->watches_running)
426 log_writer_update_watches((LogWriter *) s);
427 }
428
429 /* NOTE: runs in the source thread */
430 static void
log_writer_schedule_update_watches(LogWriter * self)431 log_writer_schedule_update_watches(LogWriter *self)
432 {
433 iv_event_post(&self->queue_filled);
434 }
435
436 static void
log_writer_suspend(LogWriter * self)437 log_writer_suspend(LogWriter *self)
438 {
439 /* flush code indicates that we need to suspend our writing activities
440 * until time_reopen elapses */
441
442 log_writer_arm_suspend_timer(self, log_writer_error_suspend_elapsed, self->options->time_reopen * 1000L);
443 self->suspended = TRUE;
444 }
445
446 static void
log_writer_update_watches(LogWriter * self)447 log_writer_update_watches(LogWriter *self)
448 {
449 gint fd;
450 GIOCondition cond = 0;
451 gint timeout_msec = 0;
452 gint idle_timeout = -1;
453
454 main_loop_assert_main_thread();
455
456 log_writer_stop_idle_timer(self);
457
458 /* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */
459
460 if (log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout) ||
461 self->waiting_for_throttle ||
462 log_queue_check_items(self->queue, &timeout_msec,
463 (LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL))
464 {
465 /* flush_lines number of element is already available and throttle would permit us to send. */
466 log_writer_update_fd_callbacks(self, cond);
467 }
468 else if (timeout_msec)
469 {
470 /* few elements are available, but less than flush_lines, we need to start a timer to initiate a flush */
471
472 log_writer_update_fd_callbacks(self, 0);
473 self->waiting_for_throttle = TRUE;
474 log_writer_arm_suspend_timer(self, (void (*)(void *)) log_writer_update_watches, (glong)timeout_msec);
475 }
476 else
477 {
478 /* no elements or no throttle space, wait for a wakeup by the queue
479 * when the required number of items are added. see the
480 * log_queue_check_items and its parallel_push argument above
481 */
482 log_writer_update_fd_callbacks(self, 0);
483 }
484
485 if (idle_timeout > 0)
486 {
487 iv_validate_now();
488
489 self->idle_timer.expires = iv_now;
490 self->idle_timer.expires.tv_sec += idle_timeout;
491
492 iv_timer_register(&self->idle_timer);
493 }
494 }
495
496 static gboolean
is_file_regular(gint fd)497 is_file_regular(gint fd)
498 {
499 struct stat st;
500
501 if (fstat(fd, &st) >= 0)
502 {
503 return S_ISREG(st.st_mode);
504 }
505
506 /* if stat fails, that's interesting, but we should probably poll
507 * it, hopefully that's less likely to cause spinning */
508
509 return FALSE;
510 }
511
512 static void
log_writer_start_watches(LogWriter * self)513 log_writer_start_watches(LogWriter *self)
514 {
515 gint fd;
516 GIOCondition cond;
517 gint idle_timeout = -1;
518
519 if (self->watches_running)
520 return;
521
522 log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout);
523
524 self->fd_watch.fd = fd;
525
526 if (self->pollable_state < 0)
527 {
528 if (is_file_regular(fd))
529 self->pollable_state = 0;
530 else
531 self->pollable_state = !iv_fd_register_try(&self->fd_watch);
532 }
533 else if (self->pollable_state > 0)
534 iv_fd_register(&self->fd_watch);
535
536 log_writer_update_watches(self);
537 self->watches_running = TRUE;
538 }
539
540 static void
log_writer_stop_watches(LogWriter * self)541 log_writer_stop_watches(LogWriter *self)
542 {
543 if (self->watches_running)
544 {
545 if (iv_timer_registered(&self->reopen_timer))
546 iv_timer_unregister(&self->reopen_timer);
547
548 if (iv_fd_registered(&self->fd_watch))
549 iv_fd_unregister(&self->fd_watch);
550 if (iv_task_registered(&self->immed_io_task))
551 iv_task_unregister(&self->immed_io_task);
552
553 log_queue_reset_parallel_push(self->queue);
554
555 self->watches_running = FALSE;
556 }
557
558 log_writer_stop_suspend_timer(self);
559 log_writer_stop_idle_timer(self);
560 }
561
562 static void
log_writer_stop_idle_timer(LogWriter * self)563 log_writer_stop_idle_timer(LogWriter *self)
564 {
565 if (iv_timer_registered(&self->idle_timer))
566 iv_timer_unregister(&self->idle_timer);
567 }
568
569 static void
log_writer_arm_suppress_timer(LogWriter * self)570 log_writer_arm_suppress_timer(LogWriter *self)
571 {
572 ml_batched_timer_postpone(&self->suppress_timer, self->options->suppress);
573 }
574
575 /**
576 * Remember the last message for dup detection.
577 *
578 * NOTE: suppress_lock must be held.
579 **/
580 static void
log_writer_record_last_message(LogWriter * self,LogMessage * lm)581 log_writer_record_last_message(LogWriter *self, LogMessage *lm)
582 {
583 if (self->last_msg)
584 log_msg_unref(self->last_msg);
585
586 log_msg_ref(lm);
587 self->last_msg = lm;
588 self->last_msg_count = 0;
589 }
590
591 /*
592 * NOTE: suppress_lock must be held.
593 */
594 static void
log_writer_release_last_message(LogWriter * self)595 log_writer_release_last_message(LogWriter *self)
596 {
597 if (self->last_msg)
598 log_msg_unref(self->last_msg);
599
600 self->last_msg = NULL;
601 self->last_msg_count = 0;
602 }
603
604 /*
605 * NOTE: suppress_lock must be held.
606 */
607 static void
log_writer_emit_suppress_summary(LogWriter * self)608 log_writer_emit_suppress_summary(LogWriter *self)
609 {
610 LogMessage *m;
611 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
612 gchar buf[1024];
613 gssize len;
614 const gchar *p;
615
616 msg_debug("Suppress timer elapsed, emitting suppression summary");
617
618 len = g_snprintf(buf, sizeof(buf), "Last message '%.20s' repeated %d times, suppressed by syslog-ng on %s",
619 log_msg_get_value(self->last_msg, LM_V_MESSAGE, NULL),
620 self->last_msg_count,
621 get_local_hostname_fqdn());
622
623 m = log_msg_new_internal(self->last_msg->pri, buf);
624
625 p = log_msg_get_value(self->last_msg, LM_V_HOST, &len);
626 log_msg_set_value(m, LM_V_HOST, p, len);
627 p = log_msg_get_value(self->last_msg, LM_V_PROGRAM, &len);
628 log_msg_set_value(m, LM_V_PROGRAM, p, len);
629
630 path_options.ack_needed = FALSE;
631
632 log_queue_push_tail(self->queue, m, &path_options);
633 log_writer_release_last_message(self);
634 }
635
636 static gboolean
log_writer_suppress_timeout(gpointer pt)637 log_writer_suppress_timeout(gpointer pt)
638 {
639 LogWriter *self = (LogWriter *) pt;
640
641 main_loop_assert_main_thread();
642
643 /* NOTE: this will probably do nothing as we are the timer callback, but
644 * we may not do it with the suppress_lock held */
645 ml_batched_timer_cancel(&self->suppress_timer);
646 g_mutex_lock(&self->suppress_lock);
647
648 /* NOTE: we may be waken up an extra time if the suppress_timer setup race
649 * is lost, see the comment at log_writer_is_msg_suppressed() for an
650 * explanation */
651 if (self->last_msg_count > 0)
652 log_writer_emit_suppress_summary(self);
653 g_mutex_unlock(&self->suppress_lock);
654
655 return FALSE;
656 }
657
658 static gboolean
_is_message_a_mark(LogMessage * msg)659 _is_message_a_mark(LogMessage *msg)
660 {
661 gssize msg_len;
662 const gchar *value = log_msg_get_value(msg, LM_V_MESSAGE, &msg_len);
663 return strncmp(value, "-- MARK --", msg_len) == 0;
664 }
665
666 static gboolean
_is_message_a_repetition(LogMessage * msg,LogMessage * last)667 _is_message_a_repetition(LogMessage *msg, LogMessage *last)
668 {
669 return strcmp(log_msg_get_value(last, LM_V_MESSAGE, NULL), log_msg_get_value(msg, LM_V_MESSAGE, NULL)) == 0 &&
670 strcmp(log_msg_get_value(last, LM_V_HOST, NULL), log_msg_get_value(msg, LM_V_HOST, NULL)) == 0 &&
671 strcmp(log_msg_get_value(last, LM_V_PROGRAM, NULL), log_msg_get_value(msg, LM_V_PROGRAM, NULL)) == 0 &&
672 strcmp(log_msg_get_value(last, LM_V_PID, NULL), log_msg_get_value(msg, LM_V_PID, NULL)) == 0;
673 }
674
675 static gboolean
_is_time_within_the_suppress_timeout(LogWriter * self,LogMessage * msg)676 _is_time_within_the_suppress_timeout(LogWriter *self, LogMessage *msg)
677 {
678 return self->last_msg->timestamps[LM_TS_RECVD].ut_sec >= msg->timestamps[LM_TS_RECVD].ut_sec - self->options->suppress;
679 }
680
/**
 * log_writer_is_msg_suppressed:
 *
 * This function is called to suppress duplicate messages from a given host.
 * Suppression is disabled entirely when options->suppress is not positive.
 *
 * Locking notes:
 *
 * There's a strict ordering requirement between suppress_lock and
 * interacting with the main loop (which the ml_batched_timer behind
 * suppress_timer is doing).
 *
 * The reason is that the main thread (running the main loop) sometimes
 * acquires suppress_lock (at suppress timer expiration) and while blocking
 * on suppress_lock it cannot service main_loop_calls().
 *
 * This function makes sure that ml_batched_timer_postpone/cancel calls are
 * only done with the suppress lock released.
 *
 * If we do this, we might have a few unfortunate side effects due to races
 * that we also try to handle:
 *
 * Two messages race, one of these matches the recorded last message,
 * the other doesn't. In this case, moving the update on the suppress_timer
 * outside of the lock region might cause two different races:
 *
 * 1) matching message comes first, then non-matching
 *
 * In this case the suppress_lock protected region decides that the suppress
 * timer needs to fire (#1) and then the other decides that it needs to
 * be cancelled. (#2)
 *
 * If these are processed in order, then we are the same as if the two were
 * also protected by the mutex (which is ok).
 *
 * If they are reversed, e.g. we first cancel the timer and then arm it,
 * then we might have a timer wakeup which will find no suppressed messages
 * to report (as the non-matching message will set last_msg_count to zero).
 * This spurious wakeup should be handled by the expiration callback.
 *
 * 2) non-matching message comes first, then matching
 *
 * This is simply a message reordering case, e.g. we don't
 * want any suppressions to be emitted.
 *
 * In this case the locked regions find that neither message matched
 * the recorded one, thus both times they decide to cancel the timer, which
 * is ok. Timer cancellations can be reordered as they will have the same
 * effect anyway.
 *
 * Returns TRUE to indicate that the message is to be suppressed.
 **/
static gboolean
log_writer_is_msg_suppressed(LogWriter *self, LogMessage *lm)
{
  gboolean need_to_arm_suppress_timer;
  gboolean need_to_cancel_suppress_timer = FALSE;

  if (self->options->suppress <= 0)
    return FALSE;

  g_mutex_lock(&self->suppress_lock);
  if (self->last_msg)
    {
      /* a duplicate is a non-MARK repetition of the recorded message that
       * arrived within the suppress interval */
      if (_is_time_within_the_suppress_timeout(self, lm) &&
          _is_message_a_repetition(lm, self->last_msg) &&
          !_is_message_a_mark(lm))
        {

          stats_counter_inc(self->suppressed_messages);
          self->last_msg_count++;

          /* we only create the timer if this is the first suppressed message, otherwise it is already running. */
          need_to_arm_suppress_timer = self->last_msg_count == 1;
          g_mutex_unlock(&self->suppress_lock);

          /* this has to be outside of suppress_lock */
          if (need_to_arm_suppress_timer)
            log_writer_arm_suppress_timer(self);

          msg_debug("Suppressing duplicate message",
                    evt_tag_str("host", log_msg_get_value(lm, LM_V_HOST, NULL)),
                    evt_tag_str("msg", log_msg_get_value(lm, LM_V_MESSAGE, NULL)));
          return TRUE;
        }
      else
        {
          /* a different message arrived: report pending repetitions (which
           * also releases the recorded message) or just forget the
           * recorded message */
          if (self->last_msg_count)
            log_writer_emit_suppress_summary(self);
          else
            log_writer_release_last_message(self);
          need_to_cancel_suppress_timer = TRUE;
        }
    }

  log_writer_record_last_message(self, lm);
  g_mutex_unlock(&self->suppress_lock);
  /* the timer is cancelled only after suppress_lock has been released */
  if (need_to_cancel_suppress_timer)
    ml_batched_timer_cancel(&self->suppress_timer);
  return FALSE;
}
782
783 static void
log_writer_postpone_mark_timer(LogWriter * self)784 log_writer_postpone_mark_timer(LogWriter *self)
785 {
786 if (self->options->mark_freq > 0)
787 ml_batched_timer_postpone(&self->mark_timer, self->options->mark_freq);
788 }
789
790 /* this is the callback function that gets called when the MARK timeout
791 * elapsed. It runs in the main thread.
792 */
793 static void
log_writer_mark_timeout(void * cookie)794 log_writer_mark_timeout(void *cookie)
795 {
796 LogWriter *self = (LogWriter *)cookie;
797 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
798 const gchar *hostname;
799 gsize hostname_len;
800 LogMessage *msg;
801
802 main_loop_assert_main_thread();
803
804 msg = log_msg_new_mark();
805 path_options.ack_needed = FALSE;
806 /* timeout: there was no new message on the writer or it is in periodical mode */
807 hostname = resolve_sockaddr_to_hostname(&hostname_len, msg->saddr, &self->options->host_resolve_options);
808
809 log_msg_set_value(msg, LM_V_HOST, hostname, hostname_len);
810
811 if (!log_writer_is_msg_suppressed(self, msg))
812 {
813 log_queue_push_tail(self->queue, msg, &path_options);
814 stats_counter_inc(self->processed_messages);
815 }
816 else
817 {
818 log_msg_drop(msg, &path_options, AT_PROCESSED);
819 }
820
821 /* we need to issue another MARK in all mark-mode cases that already
822 * triggered this callback (dst-idle, host-idle, periodical). The
823 * original setup of the timer is at a different location:
824 * - log_writer_queue() for "*-idle" modes
825 * - log_writer_init() for periodical mode
826 */
827 log_writer_postpone_mark_timer(self);
828 }
829
830 /* NOTE: runs in the reader thread */
831 static void
log_writer_queue(LogPipe * s,LogMessage * lm,const LogPathOptions * path_options)832 log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options)
833 {
834 LogWriter *self = (LogWriter *) s;
835 LogPathOptions local_options;
836 gint mark_mode = self->options->mark_mode;
837
838 if (!path_options->flow_control_requested &&
839 ((self->proto == NULL || self->suspended) || !(self->flags & LW_SOFT_FLOW_CONTROL)))
840 {
841 /* NOTE: this code ACKs the message back if there's a write error in
842 * order not to hang the client in case of a disk full */
843
844 path_options = log_msg_break_ack(lm, path_options, &local_options);
845 }
846
847 if (log_writer_is_msg_suppressed(self, lm))
848 {
849 log_msg_drop(lm, path_options, AT_PROCESSED);
850 return;
851 }
852
853 if (mark_mode != MM_INTERNAL && (lm->flags & LF_INTERNAL) && (lm->flags & LF_MARK))
854 {
855 /* drop MARK messages generated by internal() in case our mark-mode != internal */
856 log_msg_drop(lm, path_options, AT_PROCESSED);
857 return;
858 }
859
860 if (mark_mode == MM_DST_IDLE || (mark_mode == MM_HOST_IDLE && !(lm->flags & LF_LOCAL)))
861 {
862 /* in dst-idle and host-idle most, messages postpone the MARK itself */
863 log_writer_postpone_mark_timer(self);
864 }
865
866 stats_counter_inc(self->processed_messages);
867 log_queue_push_tail(self->queue, lm, path_options);
868 }
869
870 static void
log_writer_append_value(GString * result,LogMessage * lm,NVHandle handle,gboolean use_nil,gboolean append_space)871 log_writer_append_value(GString *result, LogMessage *lm, NVHandle handle, gboolean use_nil, gboolean append_space)
872 {
873 const gchar *value;
874 gssize value_len;
875
876 value = log_msg_get_value(lm, handle, &value_len);
877 if (use_nil && value_len == 0)
878 g_string_append_c(result, '-');
879 else
880 {
881 gchar *space;
882
883 space = strchr(value, ' ');
884
885 if (!space)
886 g_string_append_len(result, value, value_len);
887 else
888 g_string_append_len(result, value, space - value);
889 }
890 if (append_space)
891 g_string_append_c(result, ' ');
892 }
893
894 static void
log_writer_do_padding(LogWriter * self,GString * result)895 log_writer_do_padding(LogWriter *self, GString *result)
896 {
897 if (!self->options->padding)
898 return;
899
900 if(G_UNLIKELY(self->options->padding < result->len))
901 {
902 msg_warning("Padding is too small to hold the full message",
903 evt_tag_int("padding", self->options->padding),
904 evt_tag_int("msg_size", result->len));
905 g_string_set_size(result, self->options->padding);
906 return;
907 }
908 /* store the original length of the result */
909 gint len = result->len;
910 gint padd_bytes = self->options->padding - result->len;
911 /* set the size to the padded size, this will allocate the string */
912 g_string_set_size(result, self->options->padding);
913 memset(result->str + len - 1, '\0', padd_bytes);
914 }
915
916 void
log_writer_format_log(LogWriter * self,LogMessage * lm,GString * result)917 log_writer_format_log(LogWriter *self, LogMessage *lm, GString *result)
918 {
919 LogTemplate *template = NULL;
920 UnixTime *stamp;
921 guint32 seq_num;
922 static NVHandle meta_seqid = 0;
923
924 if (!meta_seqid)
925 meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
926
927 if (lm->flags & LF_LOCAL)
928 {
929 seq_num = self->seq_num;
930 }
931 else
932 {
933 const gchar *seqid;
934 gssize seqid_length;
935
936 seqid = log_msg_get_value(lm, meta_seqid, &seqid_length);
937 APPEND_ZERO(seqid, seqid, seqid_length);
938 if (seqid[0])
939 seq_num = strtol(seqid, NULL, 10);
940 else
941 seq_num = 0;
942 }
943
944 /* no template was specified, use default */
945 stamp = &lm->timestamps[LM_TS_STAMP];
946
947 g_string_truncate(result, 0);
948
949 if ((self->flags & LW_SYSLOG_PROTOCOL) || (self->options->options & LWO_SYSLOG_PROTOCOL))
950 {
951 gssize len;
952
953 /* we currently hard-wire version 1 */
954 g_string_append_c(result, '<');
955 format_uint32_padded(result, 0, 0, 10, lm->pri);
956 g_string_append_c(result, '>');
957 g_string_append_c(result, '1');
958 g_string_append_c(result, ' ');
959
960 append_format_unix_time(stamp, result, TS_FMT_ISO,
961 time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
962 self->options->template_options.frac_digits);
963 g_string_append_c(result, ' ');
964
965 log_writer_append_value(result, lm, LM_V_HOST, TRUE, TRUE);
966 log_writer_append_value(result, lm, LM_V_PROGRAM, TRUE, TRUE);
967 log_writer_append_value(result, lm, LM_V_PID, TRUE, TRUE);
968 log_writer_append_value(result, lm, LM_V_MSGID, TRUE, TRUE);
969
970 len = result->len;
971 log_msg_append_format_sdata(lm, result, seq_num);
972 if (len == result->len)
973 {
974 /* NOTE: sd_param format did not generate any output, take it as an empty SD string */
975 g_string_append_c(result, '-');
976 }
977
978 if (self->options->template)
979 {
980 g_string_append_c(result, ' ');
981 if (lm->flags & LF_UTF8)
982 g_string_append_len(result, "\xEF\xBB\xBF", 3);
983 LogTemplateEvalOptions options =
984 {
985 &self->options->template_options,
986 LTZ_SEND,
987 seq_num, NULL,
988 };
989
990 log_template_append_format(self->options->template, lm,
991 &options,
992 result);
993 }
994 else
995 {
996 const gchar *p;
997
998 p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
999 g_string_append_c(result, ' ');
1000 if (len != 0)
1001 {
1002 if (lm->flags & LF_UTF8)
1003 g_string_append_len(result, "\xEF\xBB\xBF", 3);
1004
1005 g_string_append_len(result, p, len);
1006 }
1007 }
1008 g_string_append_c(result, '\n');
1009 log_writer_do_padding(self, result);
1010 }
1011 else
1012 {
1013
1014 if (self->options->template)
1015 {
1016 template = self->options->template;
1017 }
1018 else if (self->flags & LW_FORMAT_FILE)
1019 {
1020 template = self->options->file_template;
1021 }
1022 else if ((self->flags & LW_FORMAT_PROTO))
1023 {
1024 template = self->options->proto_template;
1025 }
1026
1027 if (template)
1028 {
1029 LogTemplateEvalOptions options =
1030 {
1031 &self->options->template_options,
1032 LTZ_SEND,
1033 seq_num, NULL
1034 };
1035
1036 log_template_format(template, lm, &options, result);
1037
1038 }
1039 else
1040 {
1041 const gchar *p;
1042 gssize len;
1043
1044 if (self->flags & LW_FORMAT_FILE)
1045 {
1046 format_unix_time(stamp, result, self->options->template_options.ts_format,
1047 time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
1048 self->options->template_options.frac_digits);
1049 }
1050 else if (self->flags & LW_FORMAT_PROTO)
1051 {
1052 g_string_append_c(result, '<');
1053 format_uint32_padded(result, 0, 0, 10, lm->pri);
1054 g_string_append_c(result, '>');
1055
1056 /* always use BSD timestamp by default, the use can override this using a custom template */
1057 append_format_unix_time(stamp, result, TS_FMT_BSD,
1058 time_zone_info_get_offset(self->options->template_options.time_zone_info[LTZ_SEND], stamp->ut_sec),
1059 self->options->template_options.frac_digits);
1060 }
1061 g_string_append_c(result, ' ');
1062
1063 p = log_msg_get_value(lm, LM_V_HOST, &len);
1064 g_string_append_len(result, p, len);
1065 g_string_append_c(result, ' ');
1066
1067 p = log_msg_get_value(lm, LM_V_LEGACY_MSGHDR, &len);
1068 if (len > 0)
1069 {
1070 g_string_append_len(result, p, len);
1071 }
1072 else
1073 {
1074 p = log_msg_get_value(lm, LM_V_PROGRAM, &len);
1075 if (len > 0)
1076 {
1077 g_string_append_len(result, p, len);
1078 p = log_msg_get_value(lm, LM_V_PID, &len);
1079 if (len > 0)
1080 {
1081 g_string_append_c(result, '[');
1082 g_string_append_len(result, p, len);
1083 g_string_append_c(result, ']');
1084 }
1085 g_string_append_len(result, ": ", 2);
1086 }
1087 }
1088 p = log_msg_get_value(lm, LM_V_MESSAGE, &len);
1089 g_string_append_len(result, p, len);
1090 g_string_append_c(result, '\n');
1091 log_writer_do_padding(self, result);
1092 }
1093 }
1094 if (self->options->options & LWO_NO_MULTI_LINE)
1095 {
1096 gchar *p;
1097
1098 p = result->str;
1099 /* NOTE: the size is calculated to leave trailing new line */
1100 while ((p = find_cr_or_lf(p, result->str + result->len - p - 1)))
1101 {
1102 *p = ' ';
1103 p++;
1104 }
1105
1106 }
1107
1108 if (self->options->truncate_size != -1 && result->len > self->options->truncate_size)
1109 {
1110 const gint truncated_bytes = result->len - self->options->truncate_size;
1111
1112 g_string_truncate(result, self->options->truncate_size);
1113
1114 stats_counter_inc(self->truncated.count);
1115 stats_counter_add(self->truncated.bytes, truncated_bytes);
1116 }
1117 }
1118
1119 static void
log_writer_broken(LogWriter * self,gint notify_code)1120 log_writer_broken(LogWriter *self, gint notify_code)
1121 {
1122 log_writer_stop_watches(self);
1123 log_pipe_notify(self->control, notify_code, self);
1124 }
1125
1126 static void
log_writer_realloc_line_buffer(LogWriter * self)1127 log_writer_realloc_line_buffer(LogWriter *self)
1128 {
1129 self->line_buffer->str = g_malloc(self->line_buffer->allocated_len);
1130 self->line_buffer->str[0] = 0;
1131 self->line_buffer->len = 0;
1132 }
1133
1134 /*
1135 * Write messages to the underlying file descriptor using the installed
1136 * LogProtoClient instance. This is called whenever the output is ready to accept
1137 * further messages, and once during config deinitialization, in order to
1138 * flush messages still in the queue, in the hope that most of them can be
1139 * written out.
1140 *
1141 * In threaded mode, this function is invoked as part of the "output" task
1142 * (in essence, this is the function that performs the output task).
1143 *
1144 */
1145
1146 static gboolean
log_writer_flush_finalize(LogWriter * self)1147 log_writer_flush_finalize(LogWriter *self)
1148 {
1149 LogProtoStatus status = log_proto_client_flush(self->proto);
1150
1151 if (status == LPS_SUCCESS || status == LPS_PARTIAL)
1152 return TRUE;
1153
1154
1155 return FALSE;
1156 }
1157
1158 static void
_log_writer_insert_msg_length_stats(LogWriter * self,gsize msg_len)1159 _log_writer_insert_msg_length_stats(LogWriter *self, gsize msg_len)
1160 {
1161 stats_aggregator_insert_data(self->max_message_size, msg_len);
1162 stats_aggregator_insert_data(self->average_messages_size, msg_len);
1163 }
1164
1165 static gboolean
log_writer_write_message(LogWriter * self,LogMessage * msg,LogPathOptions * path_options,gboolean * write_error)1166 log_writer_write_message(LogWriter *self, LogMessage *msg, LogPathOptions *path_options, gboolean *write_error)
1167 {
1168 gboolean consumed = FALSE;
1169
1170 *write_error = FALSE;
1171
1172 log_msg_refcache_start_consumer(msg, path_options);
1173 msg_set_context(msg);
1174
1175 log_writer_format_log(self, msg, self->line_buffer);
1176
1177 if (!(msg->flags & LF_INTERNAL))
1178 {
1179 msg_debug("Outgoing message",
1180 evt_tag_printf("message", "%s", self->line_buffer->str));
1181 }
1182
1183 gsize msg_len = 0;
1184 if (self->line_buffer->len)
1185 {
1186 msg_len = self->line_buffer->len;
1187 LogProtoStatus status = log_proto_client_post(self->proto, msg, (guchar *)self->line_buffer->str,
1188 self->line_buffer->len,
1189 &consumed);
1190
1191 self->partial_write = (status == LPS_PARTIAL);
1192
1193 if (consumed)
1194 log_writer_realloc_line_buffer(self);
1195
1196 if (status == LPS_ERROR)
1197 {
1198 if ((self->options->options & LWO_IGNORE_ERRORS) != 0)
1199 {
1200 if (!consumed)
1201 {
1202 g_free(self->line_buffer->str);
1203 log_writer_realloc_line_buffer(self);
1204 consumed = TRUE;
1205 }
1206 }
1207 else
1208 {
1209 *write_error = TRUE;
1210 consumed = FALSE;
1211 }
1212 }
1213 }
1214 else
1215 {
1216 msg_debug("Error posting log message as template() output resulted in an empty string, skipping message");
1217 consumed = TRUE;
1218 }
1219
1220 if (consumed)
1221 {
1222 if (msg->flags & LF_LOCAL)
1223 step_sequence_number(&self->seq_num);
1224
1225 log_msg_unref(msg);
1226 msg_set_context(NULL);
1227 log_msg_refcache_stop();
1228 _log_writer_insert_msg_length_stats(self, msg_len);
1229
1230 return TRUE;
1231 }
1232 else
1233 {
1234 msg_debug("Can't send the message rewind backlog",
1235 evt_tag_printf("message", "%s", self->line_buffer->str));
1236
1237 log_queue_rewind_backlog(self->queue, 1);
1238
1239 log_msg_unref(msg);
1240 msg_set_context(NULL);
1241 log_msg_refcache_stop();
1242
1243 return FALSE;
1244 }
1245 }
1246
1247 static inline LogMessage *
log_writer_queue_pop_message(LogWriter * self,LogPathOptions * path_options,gboolean force_flush)1248 log_writer_queue_pop_message(LogWriter *self, LogPathOptions *path_options, gboolean force_flush)
1249 {
1250 if (force_flush)
1251 return log_queue_pop_head_ignore_throttle(self->queue, path_options);
1252 else
1253 return log_queue_pop_head(self->queue, path_options);
1254 }
1255
1256 static inline gboolean
log_writer_process_handshake(LogWriter * self)1257 log_writer_process_handshake(LogWriter *self)
1258 {
1259 LogProtoStatus status = log_proto_client_handshake(self->proto);
1260
1261 if (status != LPS_SUCCESS)
1262 return FALSE;
1263
1264 return TRUE;
1265 }
1266
1267 /*
1268 * @flush_mode specifies how hard LogWriter is trying to send messages to
1269 * the actual destination:
1270 *
1271 *
1272 * LW_FLUSH_NORMAL - business as usual, flush when the buffer is full
1273 * LW_FLUSH_FORCE - flush the buffer immediately please
1274 *
1275 */
1276 static gboolean
log_writer_flush(LogWriter * self,LogWriterFlushMode flush_mode)1277 log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
1278 {
1279 gboolean write_error = FALSE;
1280
1281 if (!self->proto)
1282 return FALSE;
1283
1284 if (log_proto_client_handshake_in_progress(self->proto))
1285 {
1286 return log_writer_process_handshake(self);
1287 }
1288
1289 /* NOTE: in case we're reloading or exiting we flush all queued items as
1290 * long as the destination can consume it. This is not going to be an
1291 * infinite loop, since the reader will cease to produce new messages when
1292 * main_loop_io_worker_job_quit() is set. */
1293
1294 while ((!main_loop_worker_job_quit() || flush_mode == LW_FLUSH_FORCE) && !write_error)
1295 {
1296 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
1297 LogMessage *msg = log_writer_queue_pop_message(self, &path_options, flush_mode == LW_FLUSH_FORCE);
1298
1299 if (!msg)
1300 break;
1301
1302 ScratchBuffersMarker mark;
1303 scratch_buffers_mark(&mark);
1304 if (!log_writer_write_message(self, msg, &path_options, &write_error))
1305 {
1306 scratch_buffers_reclaim_marked(mark);
1307 break;
1308 }
1309 scratch_buffers_reclaim_marked(mark);
1310
1311 if (!write_error)
1312 stats_counter_inc(self->written_messages);
1313 }
1314
1315 if (write_error)
1316 return FALSE;
1317
1318 return log_writer_flush_finalize(self);
1319 }
1320
1321 static gboolean
log_writer_forced_flush(LogWriter * self)1322 log_writer_forced_flush(LogWriter *self)
1323 {
1324 return log_writer_flush(self, LW_FLUSH_FORCE);
1325 }
1326
1327 static gboolean
log_writer_process_in(LogWriter * self)1328 log_writer_process_in(LogWriter *self)
1329 {
1330 if (!self->proto)
1331 return FALSE;
1332
1333 return (log_proto_client_process_in(self->proto) == LPS_SUCCESS);
1334 }
1335
1336 static gboolean
log_writer_process_out(LogWriter * self)1337 log_writer_process_out(LogWriter *self)
1338 {
1339 return log_writer_flush(self, LW_FLUSH_NORMAL);
1340 }
1341
1342 static void
log_writer_reopen_timeout(void * cookie)1343 log_writer_reopen_timeout(void *cookie)
1344 {
1345 LogWriter *self = (LogWriter *)cookie;
1346
1347 log_pipe_notify(self->control, NC_REOPEN_REQUIRED, self);
1348 }
1349
1350 static void
log_writer_idle_timeout(void * cookie)1351 log_writer_idle_timeout(void *cookie)
1352 {
1353 LogWriter *self = (LogWriter *) cookie;
1354
1355 g_assert(!self->io_job.working);
1356 msg_verbose("Destination timeout has elapsed, closing connection",
1357 evt_tag_int("fd", log_proto_client_get_fd(self->proto)));
1358
1359 log_pipe_notify(self->control, NC_CLOSE, self);
1360 }
1361
1362 static void
log_writer_init_watches(LogWriter * self)1363 log_writer_init_watches(LogWriter *self)
1364 {
1365 IV_FD_INIT(&self->fd_watch);
1366 self->fd_watch.cookie = self;
1367
1368 IV_TASK_INIT(&self->immed_io_task);
1369 self->immed_io_task.cookie = self;
1370 self->immed_io_task.handler = log_writer_io_handle_out;
1371
1372 IV_TIMER_INIT(&self->suspend_timer);
1373 self->suspend_timer.cookie = self;
1374
1375 ml_batched_timer_init(&self->suppress_timer);
1376 self->suppress_timer.cookie = self;
1377 self->suppress_timer.handler = (void (*)(void *)) log_writer_suppress_timeout;
1378 self->suppress_timer.ref_cookie = (gpointer (*)(gpointer)) log_pipe_ref;
1379 self->suppress_timer.unref_cookie = (void (*)(gpointer)) log_pipe_unref;
1380
1381 ml_batched_timer_init(&self->mark_timer);
1382 self->mark_timer.cookie = self;
1383 self->mark_timer.handler = log_writer_mark_timeout;
1384 self->mark_timer.ref_cookie = (gpointer (*)(gpointer)) log_pipe_ref;
1385 self->mark_timer.unref_cookie = (void (*)(gpointer)) log_pipe_unref;
1386
1387 IV_TIMER_INIT(&self->reopen_timer);
1388 self->reopen_timer.cookie = self;
1389 self->reopen_timer.handler = log_writer_reopen_timeout;
1390
1391 IV_TIMER_INIT(&self->idle_timer);
1392 self->idle_timer.cookie = self;
1393 self->idle_timer.handler = log_writer_idle_timeout;
1394
1395 IV_EVENT_INIT(&self->queue_filled);
1396 self->queue_filled.cookie = self;
1397 self->queue_filled.handler = log_writer_queue_filled;
1398
1399 main_loop_io_worker_job_init(&self->io_job);
1400 self->io_job.user_data = self;
1401 self->io_job.work = (void (*)(void *, GIOCondition)) log_writer_work_perform;
1402 self->io_job.completion = (void (*)(void *)) log_writer_work_finished;
1403 self->io_job.engage = (void (*)(void *)) log_pipe_ref;
1404 self->io_job.release = (void (*)(void *)) log_pipe_unref;
1405 }
1406
1407 static void
_register_aggregated_stats(LogWriter * self,StatsClusterKey * sc_key_input,gint stats_type)1408 _register_aggregated_stats(LogWriter *self, StatsClusterKey *sc_key_input, gint stats_type)
1409 {
1410 stats_aggregator_lock();
1411 StatsClusterKey sc_key;
1412
1413 stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1414 self->stats_instance, "msg_size_max");
1415 stats_register_aggregator_maximum(self->options->stats_level, &sc_key, &self->max_message_size);
1416
1417 stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1418 self->stats_instance, "msg_size_avg");
1419 stats_register_aggregator_average(self->options->stats_level, &sc_key, &self->average_messages_size);
1420
1421 stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1422 self->stats_instance, "eps");
1423 stats_register_aggregator_cps(self->options->stats_level, &sc_key, sc_key_input, stats_type, &self->CPS);
1424
1425 stats_aggregator_unlock();
1426 }
1427
1428 static void
_unregister_aggregated_stats(LogWriter * self)1429 _unregister_aggregated_stats(LogWriter *self)
1430 {
1431 stats_aggregator_lock();
1432
1433 stats_unregister_aggregator_maximum(&self->max_message_size);
1434 stats_unregister_aggregator_average(&self->average_messages_size);
1435 stats_unregister_aggregator_cps(&self->CPS);
1436
1437 stats_aggregator_unlock();
1438 }
1439
1440 static void
_register_counters(LogWriter * self)1441 _register_counters(LogWriter *self)
1442 {
1443 stats_lock();
1444 StatsClusterKey sc_key;
1445 stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1446 self->stats_instance);
1447
1448 if (self->options->suppress > 0)
1449 stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1450 stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1451 stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1452 stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1453 log_queue_register_stats_counters(self->queue, self->options->stats_level, &sc_key);
1454
1455 StatsClusterKey sc_key_truncated_count;
1456 stats_cluster_single_key_set_with_name(&sc_key_truncated_count, self->options->stats_source | SCS_DESTINATION,
1457 self->stats_id, self->stats_instance, "truncated_count");
1458 stats_register_counter(self->options->stats_level, &sc_key_truncated_count, SC_TYPE_SINGLE_VALUE,
1459 &self->truncated.count);
1460
1461 StatsClusterKey sc_key_truncated_bytes;
1462 stats_cluster_single_key_set_with_name(&sc_key_truncated_bytes, self->options->stats_source | SCS_DESTINATION,
1463 self->stats_id, self->stats_instance, "truncated_bytes");
1464 stats_register_counter(self->options->stats_level, &sc_key_truncated_bytes, SC_TYPE_SINGLE_VALUE,
1465 &self->truncated.bytes);
1466
1467 stats_unlock();
1468 _register_aggregated_stats(self, &sc_key, SC_TYPE_WRITTEN);
1469 }
1470
1471 static gboolean
log_writer_init(LogPipe * s)1472 log_writer_init(LogPipe *s)
1473 {
1474 LogWriter *self = (LogWriter *) s;
1475
1476 if (self->queue == NULL)
1477 {
1478 return FALSE;
1479 }
1480 iv_event_register(&self->queue_filled);
1481
1482 if ((self->options->options & LWO_NO_STATS) == 0 && !self->dropped_messages)
1483 _register_counters(self);
1484
1485 if (self->proto)
1486 {
1487 LogProtoClient *proto;
1488
1489 proto = self->proto;
1490 log_writer_set_proto(self, NULL);
1491 log_writer_reopen(self, proto);
1492 }
1493
1494 if (self->options->mark_mode == MM_PERIODICAL)
1495 {
1496 /* periodical marks should be emitted even if no message is received,
1497 * so we need a timer right from the start */
1498
1499 log_writer_postpone_mark_timer(self);
1500 }
1501
1502 return TRUE;
1503 }
1504
1505 static void
_unregister_counters(LogWriter * self)1506 _unregister_counters(LogWriter *self)
1507 {
1508 stats_lock();
1509 {
1510 StatsClusterKey sc_key;
1511 stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_DESTINATION, self->stats_id,
1512 self->stats_instance);
1513
1514 stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1515 stats_unregister_counter(&sc_key, SC_TYPE_SUPPRESSED, &self->suppressed_messages);
1516 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1517 stats_unregister_counter(&sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1518
1519 StatsClusterKey sc_key_truncated_count;
1520 stats_cluster_single_key_set_with_name(&sc_key_truncated_count, self->options->stats_source | SCS_DESTINATION,
1521 self->stats_id, self->stats_instance, "truncated_count");
1522 stats_unregister_counter(&sc_key_truncated_count, SC_TYPE_SINGLE_VALUE, &self->truncated.count);
1523
1524 StatsClusterKey sc_key_truncated_bytes;
1525 stats_cluster_single_key_set_with_name(&sc_key_truncated_bytes, self->options->stats_source | SCS_DESTINATION,
1526 self->stats_id, self->stats_instance, "truncated_bytes");
1527 stats_unregister_counter(&sc_key_truncated_bytes, SC_TYPE_SINGLE_VALUE, &self->truncated.bytes);
1528
1529 log_queue_unregister_stats_counters(self->queue, &sc_key);
1530
1531 }
1532 stats_unlock();
1533 _unregister_aggregated_stats(self);
1534
1535 }
1536
1537 static gboolean
log_writer_deinit(LogPipe * s)1538 log_writer_deinit(LogPipe *s)
1539 {
1540 LogWriter *self = (LogWriter *) s;
1541
1542 main_loop_assert_main_thread();
1543
1544 log_queue_reset_parallel_push(self->queue);
1545 log_writer_forced_flush(self);
1546 /* FIXME: by the time we arrive here, it must be guaranteed that no
1547 * _queue() call is running in a different thread, otherwise we'd need
1548 * some kind of locking. */
1549
1550 log_writer_stop_watches(self);
1551
1552 iv_event_unregister(&self->queue_filled);
1553
1554 if (iv_timer_registered(&self->reopen_timer))
1555 iv_timer_unregister(&self->reopen_timer);
1556
1557 ml_batched_timer_unregister(&self->suppress_timer);
1558 ml_batched_timer_unregister(&self->mark_timer);
1559
1560 _unregister_counters(self);
1561
1562 return TRUE;
1563 }
1564
1565 static void
log_writer_free(LogPipe * s)1566 log_writer_free(LogPipe *s)
1567 {
1568 LogWriter *self = (LogWriter *) s;
1569
1570 log_writer_free_proto(self);
1571
1572 if (self->line_buffer)
1573 g_string_free(self->line_buffer, TRUE);
1574
1575 log_queue_unref(self->queue);
1576 if (self->last_msg)
1577 log_msg_unref(self->last_msg);
1578 g_free(self->stats_id);
1579 g_free(self->stats_instance);
1580 ml_batched_timer_free(&self->mark_timer);
1581 ml_batched_timer_free(&self->suppress_timer);
1582 g_mutex_clear(&self->suppress_lock);
1583 g_mutex_clear(&self->pending_proto_lock);
1584 g_cond_clear(&self->pending_proto_cond);
1585
1586 log_pipe_free_method(s);
1587 }
1588
1589 /* FIXME: this is inherently racy */
1590 gboolean
log_writer_has_pending_writes(LogWriter * self)1591 log_writer_has_pending_writes(LogWriter *self)
1592 {
1593 return !log_queue_is_empty_racy(self->queue) || !self->watches_running;
1594 }
1595
1596 gboolean
log_writer_opened(LogWriter * self)1597 log_writer_opened(LogWriter *self)
1598 {
1599 return self->proto != NULL;
1600 }
1601
1602
1603 static void
log_writer_free_proto(LogWriter * self)1604 log_writer_free_proto(LogWriter *self)
1605 {
1606 if (self->proto)
1607 log_proto_client_free(self->proto);
1608
1609 self->proto = NULL;
1610 }
1611
1612 static void
log_writer_set_proto(LogWriter * self,LogProtoClient * proto)1613 log_writer_set_proto(LogWriter *self, LogProtoClient *proto)
1614 {
1615 self->proto = proto;
1616
1617 if (proto)
1618 {
1619 LogProtoClientFlowControlFuncs flow_control_funcs;
1620 flow_control_funcs.ack_callback = log_writer_msg_ack;
1621 flow_control_funcs.rewind_callback = log_writer_msg_rewind_cb;
1622 flow_control_funcs.user_data = self;
1623
1624 log_proto_client_set_client_flow_control(self->proto, &flow_control_funcs);
1625 log_proto_client_set_options(self->proto, &self->options->proto_options.super);
1626 }
1627 }
1628
1629 static void
log_writer_set_pending_proto(LogWriter * self,LogProtoClient * proto,gboolean present)1630 log_writer_set_pending_proto(LogWriter *self, LogProtoClient *proto, gboolean present)
1631 {
1632 self->pending_proto = proto;
1633 self->pending_proto_present = present;
1634 }
1635
1636 /* run in the main thread in reaction to a log_writer_reopen to change
1637 * the destination LogProtoClient instance. It needs to be ran in the main
1638 * thread as it reregisters the watches associated with the main
1639 * thread. */
1640 static void
log_writer_reopen_deferred(gpointer s)1641 log_writer_reopen_deferred(gpointer s)
1642 {
1643 gpointer *args = (gpointer *) s;
1644 LogWriter *self = args[0];
1645 LogProtoClient *proto = args[1];
1646
1647 if (!proto)
1648 {
1649 iv_validate_now();
1650
1651 self->reopen_timer.expires = iv_now;
1652 self->reopen_timer.expires.tv_sec += self->options->time_reopen;
1653
1654 if (iv_timer_registered(&self->reopen_timer))
1655 iv_timer_unregister(&self->reopen_timer);
1656
1657 iv_timer_register(&self->reopen_timer);
1658 }
1659
1660 init_sequence_number(&self->seq_num);
1661
1662 if (self->io_job.working)
1663 {
1664 /* NOTE: proto can be NULL but it is present... */
1665 log_writer_set_pending_proto(self, proto, TRUE);
1666 return;
1667 }
1668
1669 log_writer_stop_watches(self);
1670
1671 if (self->partial_write)
1672 {
1673 log_queue_rewind_backlog_all(self->queue);
1674 }
1675 log_writer_free_proto(self);
1676 log_writer_set_proto(self, proto);
1677
1678 if (proto)
1679 log_writer_start_watches(self);
1680 }
1681
1682 /*
1683 * This function can be called from any threads, from the main thread
1684 * as well as I/O worker threads. It takes care about going to the
1685 * main thread to actually switch LogProtoClient under this writer.
1686 *
1687 * The writer may still be operating, (e.g. log_pipe_deinit/init is
1688 * not needed).
1689 *
1690 * In case we're running in a non-main thread, then by the time this
1691 * function returns, the reopen has finished. In case it is called
1692 * from the main thread, this function may defer updating self->proto
1693 * until the worker thread has finished. The reason for this
1694 * difference is:
1695 *
1696 * - if LogWriter is busy, then updating the LogProtoClient instance is
1697 * deferred to log_writer_work_finished(), but that runs in the
1698 * main thread.
1699 *
1700 * - normally, even this deferred update is waited for, but in case
1701 * we're in the main thread, we can't block.
1702 *
1703 * This situation could probably be improved, maybe the synchonous
1704 * return of log_writer_reopen() is not needed by call sites, but I
1705 * was not sure, and right before release I didn't want to take the
1706 * risky approach.
1707 */
1708 void
log_writer_reopen(LogWriter * s,LogProtoClient * proto)1709 log_writer_reopen(LogWriter *s, LogProtoClient *proto)
1710 {
1711 LogWriter *self = (LogWriter *) s;
1712 gpointer args[] = { s, proto };
1713
1714 main_loop_call((MainLoopTaskFunc) log_writer_reopen_deferred, args, TRUE);
1715
1716 if (!main_loop_is_main_thread())
1717 {
1718 g_mutex_lock(&self->pending_proto_lock);
1719 while (self->pending_proto_present)
1720 {
1721 g_cond_wait(&self->pending_proto_cond, &self->pending_proto_lock);
1722 }
1723 g_mutex_unlock(&self->pending_proto_lock);
1724 }
1725 }
1726
1727 void
log_writer_set_options(LogWriter * self,LogPipe * control,LogWriterOptions * options,const gchar * stats_id,const gchar * stats_instance)1728 log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options,
1729 const gchar *stats_id, const gchar *stats_instance)
1730 {
1731 self->control = control;
1732 self->options = options;
1733
1734 if (control)
1735 self->super.expr_node = control->expr_node;
1736
1737 if (self->stats_id)
1738 g_free(self->stats_id);
1739 self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
1740
1741 if (self->stats_instance)
1742 g_free(self->stats_instance);
1743 self->stats_instance = stats_instance ? g_strdup(stats_instance) : NULL;
1744 }
1745
1746 LogWriter *
log_writer_new(guint32 flags,GlobalConfig * cfg)1747 log_writer_new(guint32 flags, GlobalConfig *cfg)
1748 {
1749 LogWriter *self = g_new0(LogWriter, 1);
1750
1751 log_pipe_init_instance(&self->super, cfg);
1752 self->super.init = log_writer_init;
1753 self->super.deinit = log_writer_deinit;
1754 self->super.queue = log_writer_queue;
1755 self->super.free_fn = log_writer_free;
1756 self->flags = flags;
1757 self->line_buffer = g_string_sized_new(128);
1758 self->pollable_state = -1;
1759 init_sequence_number(&self->seq_num);
1760
1761 log_writer_init_watches(self);
1762 g_mutex_init(&self->suppress_lock);
1763 g_mutex_init(&self->pending_proto_lock);
1764 g_cond_init(&self->pending_proto_cond);
1765
1766 log_pipe_add_info(&self->super, "writer");
1767
1768 return self;
1769 }
1770
1771 void
log_writer_options_defaults(LogWriterOptions * options)1772 log_writer_options_defaults(LogWriterOptions *options)
1773 {
1774 options->template = NULL;
1775 options->flush_lines = -1;
1776 log_template_options_defaults(&options->template_options);
1777 options->time_reopen = -1;
1778 options->suppress = -1;
1779 options->padding = 0;
1780 options->mark_mode = MM_GLOBAL;
1781 options->mark_freq = -1;
1782 options->truncate_size = -1;
1783 host_resolve_options_defaults(&options->host_resolve_options);
1784 }
1785
1786 void
log_writer_options_set_template_escape(LogWriterOptions * options,gboolean enable)1787 log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable)
1788 {
1789 if (options->template && options->template->def_inline)
1790 {
1791 log_template_set_escape(options->template, enable);
1792 }
1793 else
1794 {
1795 msg_error("Macro escaping can only be specified for inline templates");
1796 }
1797 }
1798
1799 void
log_writer_options_set_mark_mode(LogWriterOptions * options,const gchar * mark_mode)1800 log_writer_options_set_mark_mode(LogWriterOptions *options, const gchar *mark_mode)
1801 {
1802 options->mark_mode = cfg_lookup_mark_mode(mark_mode);
1803 }
1804
1805 /*
1806 * NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy
1807 *
1808 * Rationale:
1809 * - init is called from driver init (e.g. affile_sd_init),
1810 * - destroy is called from driver free method (e.g. affile_sd_free, NOT affile_sd_deinit)
1811 *
1812 * The reason:
1813 * - when initializing the reloaded configuration fails for some reason,
1814 * we have to fall back to the old configuration, thus we cannot dump
1815 * the information stored in the Options structure at deinit time, but
1816 * have to recover it when the old configuration is initialized.
1817 *
1818 * For the reasons above, init and destroy behave the following way:
1819 *
1820 * - init is idempotent, it can be called multiple times without leaking
1821 * memory, and without loss of information
1822 * - destroy is only called once, when the options are indeed to be destroyed
1823 *
1824 * Also important to note is that when init is called multiple times, the
1825 * GlobalConfig reference is the same, this means that it is enough to
1826 * remember whether init was called already and return w/o doing anything in
1827 * that case, which is actually how idempotency is implemented here.
1828 */
1829 void
log_writer_options_init(LogWriterOptions * options,GlobalConfig * cfg,guint32 option_flags)1830 log_writer_options_init(LogWriterOptions *options, GlobalConfig *cfg, guint32 option_flags)
1831 {
1832 if (options->initialized)
1833 return;
1834
1835 log_template_options_init(&options->template_options, cfg);
1836 host_resolve_options_init(&options->host_resolve_options, &cfg->host_resolve_options);
1837 log_proto_client_options_init(&options->proto_options.super, cfg);
1838 options->options |= option_flags;
1839
1840 if (options->flush_lines == -1)
1841 options->flush_lines = cfg->flush_lines;
1842 if (options->suppress == -1)
1843 options->suppress = cfg->suppress;
1844 if (options->time_reopen == -1)
1845 options->time_reopen = cfg->time_reopen;
1846 options->file_template = log_template_ref(cfg->file_template);
1847 options->proto_template = log_template_ref(cfg->proto_template);
1848 if (cfg->threaded)
1849 options->options |= LWO_THREADED;
1850 /* per-destination MARK messages */
1851 if (options->mark_mode == MM_GLOBAL)
1852 {
1853 /* get the global option */
1854 options->mark_mode = cfg->mark_mode;
1855 }
1856 if (options->mark_freq == -1)
1857 {
1858 /* not initialized, use the global mark freq */
1859 options->mark_freq = cfg->mark_freq;
1860 }
1861
1862 options->initialized = TRUE;
1863 }
1864
1865 void
log_writer_options_destroy(LogWriterOptions * options)1866 log_writer_options_destroy(LogWriterOptions *options)
1867 {
1868 log_template_options_destroy(&options->template_options);
1869 host_resolve_options_destroy(&options->host_resolve_options);
1870 log_proto_client_options_destroy(&options->proto_options.super);
1871 log_template_unref(options->template);
1872 log_template_unref(options->file_template);
1873 log_template_unref(options->proto_template);
1874 options->initialized = FALSE;
1875 }
1876
1877 gint
log_writer_options_lookup_flag(const gchar * flag)1878 log_writer_options_lookup_flag(const gchar *flag)
1879 {
1880 if (strcmp(flag, "syslog-protocol") == 0)
1881 return LWO_SYSLOG_PROTOCOL;
1882 if (strcmp(flag, "no-multi-line") == 0)
1883 return LWO_NO_MULTI_LINE;
1884 if (strcmp(flag, "threaded") == 0)
1885 return LWO_THREADED;
1886 if (strcmp(flag, "ignore-errors") == 0)
1887 return LWO_IGNORE_ERRORS;
1888 msg_error("Unknown dest writer flag", evt_tag_str("flag", flag));
1889 return 0;
1890 }
1891