1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logsource.h"
26 #include "messages.h"
27 #include "host-resolve.h"
28 #include "stats/stats-registry.h"
29 #include "stats/stats-cluster-single.h"
30 #include "msg-stats.h"
31 #include "logmsg/tags.h"
32 #include "ack-tracker/ack_tracker.h"
33 #include "ack-tracker/ack_tracker_factory.h"
34 #include "timeutils/misc.h"
35 #include "scratch-buffers.h"
36
37 #include <string.h>
38 #include <unistd.h>
39
40 gboolean accurate_nanosleep = FALSE;
41
42 void
log_source_wakeup(LogSource * self)43 log_source_wakeup(LogSource *self)
44 {
45 if (self->wakeup)
46 self->wakeup(self);
47
48 msg_diagnostics("Source has been resumed", log_pipe_location_tag(&self->super));
49 }
50
51 static inline guint32
_take_reclaimed_window(LogSource * self,guint32 window_size_increment)52 _take_reclaimed_window(LogSource *self, guint32 window_size_increment)
53 {
54 gssize old = atomic_gssize_sub(&self->window_size_to_be_reclaimed, window_size_increment);
55 gboolean reclaim_in_progress = (old > 0);
56
57 if (!reclaim_in_progress)
58 {
59 return window_size_increment;
60 }
61
62 guint32 remaining_window_size_increment = MAX(window_size_increment - old, 0);
63 guint32 reclaimed = window_size_increment - remaining_window_size_increment;
64 atomic_gssize_add(&self->pending_reclaimed, reclaimed);
65
66 return remaining_window_size_increment;
67 }
68
69 static inline void
_flow_control_window_size_adjust(LogSource * self,guint32 window_size_increment,gboolean last_ack_type_is_suspended)70 _flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment, gboolean last_ack_type_is_suspended)
71 {
72 gboolean suspended;
73
74 if (G_UNLIKELY(dynamic_window_is_enabled(&self->dynamic_window)))
75 window_size_increment = _take_reclaimed_window(self, window_size_increment);
76
77 gsize old_window_size = window_size_counter_add(&self->window_size, window_size_increment, &suspended);
78 stats_counter_add(self->stat_window_size, window_size_increment);
79
80 msg_diagnostics("Window size adjustment",
81 evt_tag_int("old_window_size", old_window_size),
82 evt_tag_int("window_size_increment", window_size_increment),
83 evt_tag_str("suspended_before_increment", suspended ? "TRUE" : "FALSE"),
84 evt_tag_str("last_ack_type_is_suspended", last_ack_type_is_suspended ? "TRUE" : "FALSE"));
85
86 gboolean need_to_resume_counter = !last_ack_type_is_suspended && suspended;
87 if (need_to_resume_counter)
88 window_size_counter_resume(&self->window_size);
89 if (old_window_size == 0 || need_to_resume_counter)
90 log_source_wakeup(self);
91 }
92
93 static void
_flow_control_rate_adjust(LogSource * self)94 _flow_control_rate_adjust(LogSource *self)
95 {
96 #ifdef SYSLOG_NG_HAVE_CLOCK_GETTIME
97 guint32 cur_ack_count, last_ack_count;
98 /* NOTE: this is racy. msg_ack may be executing in different writer
99 * threads. I don't want to lock, all we need is an approximate value of
100 * the ACK rate of the last couple of seconds. */
101
102 if (accurate_nanosleep && self->threaded)
103 {
104 cur_ack_count = ++self->ack_count;
105 if ((cur_ack_count & 0x3FFF) == 0)
106 {
107 struct timespec now;
108 glong diff;
109
110 /* do this every once in a while, once in 16k messages should be fine */
111
112 last_ack_count = self->last_ack_count;
113
114 /* make sure that we have at least 16k messages to measure the rate
115 * for. Because of the race we may have last_ack_count ==
116 * cur_ack_count if another thread already measured the same span */
117
118 if (last_ack_count < cur_ack_count - 16383)
119 {
120 clock_gettime(CLOCK_MONOTONIC, &now);
121 if (now.tv_sec > self->last_ack_rate_time.tv_sec + 6)
122 {
123 /* last check was too far apart, this means the rate is quite slow. turn off sleeping. */
124 self->window_full_sleep_nsec = 0;
125 self->last_ack_rate_time = now;
126 }
127 else
128 {
129 /* ok, we seem to have a close enough measurement, this means
130 * we do have a high rate. Calculate how much we should sleep
131 * in case the window gets full */
132
133 diff = timespec_diff_nsec(&now, &self->last_ack_rate_time);
134 self->window_full_sleep_nsec = (diff / (cur_ack_count - last_ack_count));
135 if (self->window_full_sleep_nsec > 1e6)
136 {
137 /* in case we'd be waiting for 1msec for another free slot in the window, let's go to background instead */
138 self->window_full_sleep_nsec = 0;
139 }
140 else
141 {
142 /* otherwise let's wait for about 8 message to be emptied before going back to the loop, but clamp the maximum time to 0.1msec */
143 self->window_full_sleep_nsec <<= 3;
144 if (self->window_full_sleep_nsec > 1e5)
145 self->window_full_sleep_nsec = 1e5;
146 }
147 self->last_ack_count = cur_ack_count;
148 self->last_ack_rate_time = now;
149 }
150 }
151 }
152 }
153 #endif
154 }
155
156 void
log_source_flow_control_adjust(LogSource * self,guint32 window_size_increment)157 log_source_flow_control_adjust(LogSource *self, guint32 window_size_increment)
158 {
159 _flow_control_window_size_adjust(self, window_size_increment, FALSE);
160 _flow_control_rate_adjust(self);
161 }
162
163 void
log_source_flow_control_adjust_when_suspended(LogSource * self,guint32 window_size_increment)164 log_source_flow_control_adjust_when_suspended(LogSource *self, guint32 window_size_increment)
165 {
166 _flow_control_window_size_adjust(self, window_size_increment, TRUE);
167 _flow_control_rate_adjust(self);
168 }
169
170 void
log_source_disable_bookmark_saving(LogSource * self)171 log_source_disable_bookmark_saving(LogSource *self)
172 {
173 ack_tracker_disable_bookmark_saving(self->ack_tracker);
174 }
175
176 /**
177 * log_source_msg_ack:
178 *
179 * This is running in the same thread as the _destination_, thus care must
180 * be taken when manipulating the LogSource data structure.
181 **/
182 static void
log_source_msg_ack(LogMessage * msg,AckType ack_type)183 log_source_msg_ack(LogMessage *msg, AckType ack_type)
184 {
185 AckTracker *ack_tracker = msg->ack_record->tracker;
186 ack_tracker_manage_msg_ack(ack_tracker, msg, ack_type);
187 }
188
189 void
log_source_flow_control_suspend(LogSource * self)190 log_source_flow_control_suspend(LogSource *self)
191 {
192 msg_debug("Source has been suspended",
193 log_pipe_location_tag(&self->super),
194 evt_tag_str("function", __FUNCTION__));
195
196 window_size_counter_suspend(&self->window_size);
197 }
198
199 void
log_source_enable_dynamic_window(LogSource * self,DynamicWindowPool * window_pool)200 log_source_enable_dynamic_window(LogSource *self, DynamicWindowPool *window_pool)
201 {
202 dynamic_window_set_pool(&self->dynamic_window, dynamic_window_pool_ref(window_pool));
203 }
204
205 gboolean
log_source_is_dynamic_window_enabled(LogSource * self)206 log_source_is_dynamic_window_enabled(LogSource *self)
207 {
208 return dynamic_window_is_enabled(&self->dynamic_window);
209 }
210
211 void
log_source_dynamic_window_update_statistics(LogSource * self)212 log_source_dynamic_window_update_statistics(LogSource *self)
213 {
214 dynamic_window_stat_update(&self->dynamic_window.stat, window_size_counter_get(&self->window_size, NULL));
215 msg_trace("Updating dynamic window statistic", evt_tag_int("avg window size",
216 dynamic_window_stat_get_avg(&self->dynamic_window.stat)));
217 }
218
219 static void
_reclaim_dynamic_window(LogSource * self,gsize window_size)220 _reclaim_dynamic_window(LogSource *self, gsize window_size)
221 {
222 g_assert(self->full_window_size - window_size >= self->initial_window_size);
223 atomic_gssize_set(&self->window_size_to_be_reclaimed, window_size);
224 }
225
226 static void
_release_dynamic_window(LogSource * self)227 _release_dynamic_window(LogSource *self)
228 {
229 g_assert(self->ack_tracker == NULL);
230
231 gsize dynamic_part = self->full_window_size - self->initial_window_size;
232 msg_trace("Releasing dynamic part of the window", evt_tag_int("dynamic_window_to_be_released", dynamic_part),
233 log_pipe_location_tag(&self->super));
234
235 self->full_window_size -= dynamic_part;
236 stats_counter_sub(self->stat_full_window, dynamic_part);
237
238 window_size_counter_sub(&self->window_size, dynamic_part, NULL);
239 stats_counter_sub(self->stat_window_size, dynamic_part);
240 dynamic_window_release(&self->dynamic_window, dynamic_part);
241
242 dynamic_window_pool_unref(self->dynamic_window.pool);
243 }
244
245 static void
_inc_balanced(LogSource * self,gsize inc)246 _inc_balanced(LogSource *self, gsize inc)
247 {
248 gsize offered_dynamic = dynamic_window_request(&self->dynamic_window, inc);
249
250 msg_trace("Balance::increase",
251 log_pipe_location_tag(&self->super),
252 evt_tag_printf("connection", "%p", self),
253 evt_tag_int("old_full_window_size", self->full_window_size),
254 evt_tag_int("new_full_window_size", self->full_window_size + offered_dynamic));
255
256 self->full_window_size += offered_dynamic;
257 stats_counter_add(self->stat_full_window, offered_dynamic);
258
259 gsize old_window_size = window_size_counter_add(&self->window_size, offered_dynamic, NULL);
260 stats_counter_add(self->stat_window_size, offered_dynamic);
261 if (old_window_size == 0 && offered_dynamic != 0)
262 log_source_wakeup(self);
263 }
264
265 static void
_dec_balanced(LogSource * self,gsize dec)266 _dec_balanced(LogSource *self, gsize dec)
267 {
268 gsize new_full_window_size = self->full_window_size - dec;
269
270 gsize empty_window = window_size_counter_get(&self->window_size, NULL);
271 gsize remaining_sub = 0;
272
273 if (empty_window <= dec)
274 {
275 remaining_sub = dec - empty_window;
276 if (empty_window == 0)
277 {
278 dec = 0;
279 }
280 else
281 {
282 dec = empty_window - 1;
283 }
284
285 new_full_window_size = self->full_window_size - dec;
286 _reclaim_dynamic_window(self, remaining_sub);
287 }
288
289 window_size_counter_sub(&self->window_size, dec, NULL);
290 stats_counter_sub(self->stat_window_size, dec);
291
292 msg_trace("Balance::decrease",
293 log_pipe_location_tag(&self->super),
294 evt_tag_printf("connection", "%p", self),
295 evt_tag_int("old_full_window_size", self->full_window_size),
296 evt_tag_int("new_full_window_size", new_full_window_size),
297 evt_tag_int("to_be_reclaimed", remaining_sub));
298
299 self->full_window_size = new_full_window_size;
300 stats_counter_set(self->stat_full_window, new_full_window_size);
301 dynamic_window_release(&self->dynamic_window, dec);
302 }
303
304 static gboolean
_reclaim_window_instead_of_rebalance(LogSource * self)305 _reclaim_window_instead_of_rebalance(LogSource *self)
306 {
307 //check pending_reclaimed
308 gssize total_reclaim = atomic_gssize_set_and_get(&self->pending_reclaimed, 0);
309 gssize to_be_reclaimed = atomic_gssize_get(&self->window_size_to_be_reclaimed);
310 gboolean reclaim_in_progress = (to_be_reclaimed > 0);
311
312 if (total_reclaim > 0)
313 {
314 self->full_window_size -= total_reclaim;
315 stats_counter_sub(self->stat_full_window, total_reclaim);
316 dynamic_window_release(&self->dynamic_window, total_reclaim);
317 }
318 else
319 {
320 //to avoid underflow, we need to set a value <= 0
321 if (to_be_reclaimed < 0)
322 atomic_gssize_set(&self->window_size_to_be_reclaimed, 0);
323 }
324
325 msg_trace("Checking if reclaim is in progress...",
326 log_pipe_location_tag(&self->super),
327 evt_tag_printf("connection", "%p", self),
328 evt_tag_printf("in progress", "%s", reclaim_in_progress ? "yes" : "no"),
329 evt_tag_long("total_reclaim", total_reclaim));
330
331 return reclaim_in_progress;
332 }
333
334 static void
_dynamic_window_rebalance(LogSource * self)335 _dynamic_window_rebalance(LogSource *self)
336 {
337 gsize current_dynamic_win = self->full_window_size - self->initial_window_size;
338 gboolean have_to_increase = current_dynamic_win < self->dynamic_window.pool->balanced_window;
339 gboolean have_to_decrease = current_dynamic_win > self->dynamic_window.pool->balanced_window;
340
341 msg_trace("Rebalance dynamic window",
342 log_pipe_location_tag(&self->super),
343 evt_tag_printf("connection", "%p", self),
344 evt_tag_int("full_window", self->full_window_size),
345 evt_tag_int("dynamic_win", current_dynamic_win),
346 evt_tag_int("static_window", self->initial_window_size),
347 evt_tag_int("balanced_window", self->dynamic_window.pool->balanced_window),
348 evt_tag_int("avg_free", dynamic_window_stat_get_avg(&self->dynamic_window.stat)));
349
350 if (have_to_increase)
351 _inc_balanced(self, self->dynamic_window.pool->balanced_window - current_dynamic_win);
352 else if (have_to_decrease)
353 _dec_balanced(self, current_dynamic_win - self->dynamic_window.pool->balanced_window);
354 }
355
356 void
log_source_dynamic_window_realloc(LogSource * self)357 log_source_dynamic_window_realloc(LogSource *self)
358 {
359 /* it is safe to assume that the window size is not decremented while this function runs,
360 * only incrementation is possible by destination threads */
361
362 if (!_reclaim_window_instead_of_rebalance(self))
363 _dynamic_window_rebalance(self);
364
365 dynamic_window_stat_reset(&self->dynamic_window.stat);
366 }
367
368 void
log_source_mangle_hostname(LogSource * self,LogMessage * msg)369 log_source_mangle_hostname(LogSource *self, LogMessage *msg)
370 {
371 const gchar *resolved_name;
372 gsize resolved_name_len;
373 const gchar *orig_host;
374
375 resolved_name = resolve_sockaddr_to_hostname(&resolved_name_len, msg->saddr, &self->options->host_resolve_options);
376 log_msg_set_value(msg, LM_V_HOST_FROM, resolved_name, resolved_name_len);
377
378 orig_host = log_msg_get_value(msg, LM_V_HOST, NULL);
379 if (!self->options->keep_hostname || !orig_host || !orig_host[0])
380 {
381 gchar host[256];
382 gint host_len = -1;
383 if (G_UNLIKELY(self->options->chain_hostnames))
384 {
385 msg->flags |= LF_CHAINED_HOSTNAME;
386 if (msg->flags & LF_SIMPLE_HOSTNAME)
387 {
388 /* local without group name */
389 host_len = g_snprintf(host, sizeof(host), "%s", resolved_name);
390 }
391 else if (msg->flags & LF_LOCAL)
392 {
393 /* local */
394 host_len = g_snprintf(host, sizeof(host), "%s@%s", self->options->group_name, resolved_name);
395 }
396 else if (!orig_host || !orig_host[0])
397 {
398 /* remote && no hostname */
399 host_len = g_snprintf(host, sizeof(host), "%s/%s", resolved_name, resolved_name);
400 }
401 else
402 {
403 /* everything else, append source hostname */
404 host_len = g_snprintf(host, sizeof(host), "%s/%s", orig_host, resolved_name);
405 }
406 if (host_len >= sizeof(host))
407 host_len = sizeof(host) - 1;
408 log_msg_set_value(msg, LM_V_HOST, host, host_len);
409 }
410 else
411 {
412 log_msg_set_value(msg, LM_V_HOST, resolved_name, resolved_name_len);
413 }
414 }
415 }
416
417 static void
_register_window_stats(LogSource * self)418 _register_window_stats(LogSource *self)
419 {
420 if (!stats_check_level(4))
421 return;
422
423 const gchar *instance_name = self->name ? : self->stats_instance;
424
425 StatsClusterKey sc_key;
426 stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id,
427 instance_name, "free_window");
428 self->stat_window_size_cluster = stats_register_dynamic_counter(4, &sc_key, SC_TYPE_SINGLE_VALUE,
429 &self->stat_window_size);
430 stats_counter_set(self->stat_window_size, window_size_counter_get(&self->window_size, NULL));
431
432
433 stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id,
434 instance_name, "full_window");
435 self->stat_full_window_cluster = stats_register_dynamic_counter(4, &sc_key, SC_TYPE_SINGLE_VALUE,
436 &self->stat_full_window);
437 stats_counter_set(self->stat_full_window, self->full_window_size);
438
439 }
440
441 static void
_unregister_window_stats(LogSource * self)442 _unregister_window_stats(LogSource *self)
443 {
444 if (!stats_check_level(4))
445 return;
446
447 stats_unregister_dynamic_counter(self->stat_window_size_cluster, SC_TYPE_SINGLE_VALUE, &self->stat_window_size);
448 stats_unregister_dynamic_counter(self->stat_full_window_cluster, SC_TYPE_SINGLE_VALUE, &self->stat_full_window);
449 }
450
451 static inline void
_create_ack_tracker_if_not_exists(LogSource * self)452 _create_ack_tracker_if_not_exists(LogSource *self)
453 {
454 if (!self->ack_tracker)
455 {
456 if (!self->ack_tracker_factory)
457 self->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
458 self->ack_tracker = ack_tracker_factory_create(self->ack_tracker_factory, self);
459 }
460 }
461
462 gboolean
log_source_init(LogPipe * s)463 log_source_init(LogPipe *s)
464 {
465 LogSource *self = (LogSource *) s;
466
467 _create_ack_tracker_if_not_exists(self);
468 if (!ack_tracker_init(self->ack_tracker))
469 {
470 msg_error("Failed to initialize AckTracker");
471 return FALSE;
472 }
473
474 stats_lock();
475 StatsClusterKey sc_key;
476 stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance);
477 stats_register_counter(self->options->stats_level, &sc_key,
478 SC_TYPE_PROCESSED, &self->recvd_messages);
479 stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_STAMP, &self->last_message_seen);
480
481 _register_window_stats(self);
482
483 stats_unlock();
484
485 return TRUE;
486 }
487
488 gboolean
log_source_deinit(LogPipe * s)489 log_source_deinit(LogPipe *s)
490 {
491 LogSource *self = (LogSource *) s;
492 ack_tracker_deinit(self->ack_tracker);
493
494 stats_lock();
495 StatsClusterKey sc_key;
496 stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance);
497 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->recvd_messages);
498 stats_unregister_counter(&sc_key, SC_TYPE_STAMP, &self->last_message_seen);
499
500 _unregister_window_stats(self);
501
502 stats_unlock();
503
504 return TRUE;
505 }
506
507 void
log_source_post(LogSource * self,LogMessage * msg)508 log_source_post(LogSource *self, LogMessage *msg)
509 {
510 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
511 gint old_window_size;
512
513 ack_tracker_track_msg(self->ack_tracker, msg);
514
515 /* NOTE: we start by enabling flow-control, thus we need an acknowledgement */
516 path_options.ack_needed = TRUE;
517 log_msg_ref(msg);
518 log_msg_add_ack(msg, &path_options);
519 msg->ack_func = log_source_msg_ack;
520
521 old_window_size = window_size_counter_sub(&self->window_size, 1, NULL);
522 stats_counter_sub(self->stat_window_size, 1);
523
524 if (G_UNLIKELY(old_window_size == 1))
525 {
526 msg_debug("Source has been suspended",
527 log_pipe_location_tag(&self->super),
528 evt_tag_str("function", __FUNCTION__));
529 }
530
531 /*
532 * NOTE: this assertion validates that the source is not overflowing its
533 * own flow-control window size, decreased above, by the atomic statement.
534 *
535 * If the _old_ value is zero, that means that the decrement operation
536 * above has decreased the value to -1.
537 */
538
539 g_assert(old_window_size > 0);
540
541 ScratchBuffersMarker mark;
542 scratch_buffers_mark(&mark);
543 log_pipe_queue(&self->super, msg, &path_options);
544 scratch_buffers_reclaim_marked(mark);
545 }
546
547 static gboolean
_invoke_mangle_callbacks(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)548 _invoke_mangle_callbacks(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
549 {
550 LogSource *self = (LogSource *) s;
551 GList *next_item = g_list_first(self->options->source_queue_callbacks);
552
553 while(next_item)
554 {
555 if(next_item->data)
556 {
557 if(!((mangle_callback) (next_item->data))(log_pipe_get_config(s), msg, self))
558 {
559 log_msg_drop(msg, path_options, AT_PROCESSED);
560 return FALSE;
561 }
562 }
563 next_item = next_item->next;
564 }
565 return TRUE;
566 }
567
568 static void
log_source_override_host(LogSource * self,LogMessage * msg)569 log_source_override_host(LogSource *self, LogMessage *msg)
570 {
571 if (self->options->host_override_len < 0)
572 self->options->host_override_len = strlen(self->options->host_override);
573 log_msg_set_value(msg, LM_V_HOST, self->options->host_override, self->options->host_override_len);
574 }
575
576 static void
log_source_override_program(LogSource * self,LogMessage * msg)577 log_source_override_program(LogSource *self, LogMessage *msg)
578 {
579 if (self->options->program_override_len < 0)
580 self->options->program_override_len = strlen(self->options->program_override);
581 log_msg_set_value(msg, LM_V_PROGRAM, self->options->program_override, self->options->program_override_len);
582 }
583
584 static gchar *
_get_pid_string(void)585 _get_pid_string(void)
586 {
587 #define MAX_PID_CHAR_COUNT 20 /* max PID on 64 bit systems is 2^64 - 1, which is 19 characters, +1 for terminating 0 */
588
589 static gchar pid_string[MAX_PID_CHAR_COUNT];
590
591 if (pid_string[0] == '\0')
592 {
593 #ifdef _WIN32
594 g_snprintf(pid_string, MAX_PID_CHAR_COUNT, "%lu", GetCurrentProcessId());
595 #else
596 g_snprintf(pid_string, MAX_PID_CHAR_COUNT, "%d", getpid());
597 #endif
598 }
599
600 return pid_string;
601 }
602
603 static void
log_source_queue(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)604 log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
605 {
606 LogSource *self = (LogSource *) s;
607 gint i;
608
609 msg_set_context(msg);
610
611 msg_diagnostics(">>>>>> Source side message processing begin",
612 evt_tag_str("instance", self->stats_instance ? self->stats_instance : "internal"),
613 log_pipe_location_tag(s),
614 evt_tag_printf("msg", "%p", msg));
615
616 /* $HOST setup */
617 log_source_mangle_hostname(self, msg);
618
619 if (self->options->use_syslogng_pid)
620 log_msg_set_value(msg, LM_V_PID, _get_pid_string(), -1);
621
622 /* source specific tags */
623 if (self->options->tags)
624 {
625 for (i = 0; i < self->options->tags->len; i++)
626 {
627 log_msg_set_tag_by_id(msg, g_array_index(self->options->tags, LogTagId, i));
628 }
629 }
630
631 log_msg_set_tag_by_id(msg, self->options->source_group_tag);
632
633
634 if (!_invoke_mangle_callbacks(s, msg, path_options))
635 return;
636
637 if (self->options->host_override)
638 log_source_override_host(self, msg);
639
640 if (self->options->program_override)
641 log_source_override_program(self, msg);
642
643 msg_stats_update_counters(self->stats_id, msg);
644
645 /* message setup finished, send it out */
646
647 stats_counter_inc(self->recvd_messages);
648 stats_counter_set(self->last_message_seen, msg->timestamps[LM_TS_RECVD].ut_sec);
649 log_pipe_forward_msg(s, msg, path_options);
650
651 if (accurate_nanosleep && self->threaded && self->window_full_sleep_nsec > 0 && !log_source_free_to_send(self))
652 {
653 struct timespec ts;
654
655 /* wait one 0.1msec in the hope that the buffer clears up */
656 ts.tv_sec = 0;
657 ts.tv_nsec = self->window_full_sleep_nsec;
658 nanosleep(&ts, NULL);
659 }
660 msg_diagnostics("<<<<<< Source side message processing finish",
661 evt_tag_str("instance", self->stats_instance ? self->stats_instance : "internal"),
662 log_pipe_location_tag(s),
663 evt_tag_printf("msg", "%p", msg));
664
665 msg_set_context(NULL);
666 }
667
668 static void
_initialize_window(LogSource * self,gint init_window_size)669 _initialize_window(LogSource *self, gint init_window_size)
670 {
671 self->window_initialized = TRUE;
672 window_size_counter_set(&self->window_size, init_window_size);
673
674 self->initial_window_size = init_window_size;
675 self->full_window_size = init_window_size;
676 }
677
678 static gboolean
_is_window_initialized(LogSource * self)679 _is_window_initialized(LogSource *self)
680 {
681 return self->window_initialized;
682 }
683
684 void
log_source_set_options(LogSource * self,LogSourceOptions * options,const gchar * stats_id,const gchar * stats_instance,gboolean threaded,LogExprNode * expr_node)685 log_source_set_options(LogSource *self, LogSourceOptions *options,
686 const gchar *stats_id, const gchar *stats_instance,
687 gboolean threaded, LogExprNode *expr_node)
688 {
689 /* NOTE: we don't adjust window_size even in case it was changed in the
690 * configuration and we received a SIGHUP. This means that opened
691 * connections will not have their window_size changed. */
692
693 if (!_is_window_initialized(self))
694 _initialize_window(self, options->init_window_size);
695
696 self->options = options;
697 if (self->stats_id)
698 g_free(self->stats_id);
699 self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
700 if (self->stats_instance)
701 g_free(self->stats_instance);
702 self->stats_instance = stats_instance ? g_strdup(stats_instance): NULL;
703 self->threaded = threaded;
704
705 log_pipe_detach_expr_node(&self->super);
706 log_pipe_attach_expr_node(&self->super, expr_node);
707 }
708
709 void
log_source_set_ack_tracker_factory(LogSource * self,AckTrackerFactory * factory)710 log_source_set_ack_tracker_factory(LogSource *self, AckTrackerFactory *factory)
711 {
712 ack_tracker_factory_unref(self->ack_tracker_factory);
713 self->ack_tracker_factory = factory;
714 }
715
716 void
log_source_set_name(LogSource * self,const gchar * name)717 log_source_set_name(LogSource *self, const gchar *name)
718 {
719 g_free(self->name);
720 self->name = name ? g_strdup(name) : NULL;
721 }
722
723 void
log_source_init_instance(LogSource * self,GlobalConfig * cfg)724 log_source_init_instance(LogSource *self, GlobalConfig *cfg)
725 {
726 log_pipe_init_instance(&self->super, cfg);
727 self->super.queue = log_source_queue;
728 self->super.free_fn = log_source_free;
729 self->super.init = log_source_init;
730 self->super.deinit = log_source_deinit;
731 self->window_initialized = FALSE;
732 self->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
733 self->ack_tracker = NULL;
734 }
735
736 void
log_source_free(LogPipe * s)737 log_source_free(LogPipe *s)
738 {
739 LogSource *self = (LogSource *) s;
740
741 ack_tracker_free(self->ack_tracker);
742 self->ack_tracker = NULL;
743
744 g_free(self->name);
745 g_free(self->stats_id);
746 g_free(self->stats_instance);
747 log_pipe_detach_expr_node(&self->super);
748 log_pipe_free_method(s);
749
750 ack_tracker_factory_unref(self->ack_tracker_factory);
751 if (G_UNLIKELY(dynamic_window_is_enabled(&self->dynamic_window)))
752 _release_dynamic_window(self);
753 }
754
755 void
log_source_options_defaults(LogSourceOptions * options)756 log_source_options_defaults(LogSourceOptions *options)
757 {
758 options->init_window_size = -1;
759 options->keep_hostname = -1;
760 options->chain_hostnames = -1;
761 options->keep_timestamp = -1;
762 options->program_override_len = -1;
763 options->host_override_len = -1;
764 options->tags = NULL;
765 options->read_old_records = TRUE;
766 host_resolve_options_defaults(&options->host_resolve_options);
767 }
768
769 /* NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy */
770 void
log_source_options_init(LogSourceOptions * options,GlobalConfig * cfg,const gchar * group_name)771 log_source_options_init(LogSourceOptions *options, GlobalConfig *cfg, const gchar *group_name)
772 {
773 gchar *source_group_name;
774
775 options->source_queue_callbacks = cfg->source_mangle_callback_list;
776
777 if (options->init_window_size == -1)
778 options->init_window_size = 100;
779 if (options->keep_hostname == -1)
780 options->keep_hostname = cfg->keep_hostname;
781 if (options->chain_hostnames == -1)
782 options->chain_hostnames = cfg->chain_hostnames;
783 if (options->keep_timestamp == -1)
784 options->keep_timestamp = cfg->keep_timestamp;
785 options->group_name = group_name;
786
787 source_group_name = g_strdup_printf(".source.%s", group_name);
788 options->source_group_tag = log_tags_get_by_name(source_group_name);
789 g_free(source_group_name);
790 host_resolve_options_init(&options->host_resolve_options, &cfg->host_resolve_options);
791 }
792
793 void
log_source_options_destroy(LogSourceOptions * options)794 log_source_options_destroy(LogSourceOptions *options)
795 {
796 host_resolve_options_destroy(&options->host_resolve_options);
797 if (options->program_override)
798 g_free(options->program_override);
799 if (options->host_override)
800 g_free(options->host_override);
801 if (options->tags)
802 {
803 g_array_free(options->tags, TRUE);
804 options->tags = NULL;
805 }
806 }
807
808 void
log_source_options_set_tags(LogSourceOptions * options,GList * tags)809 log_source_options_set_tags(LogSourceOptions *options, GList *tags)
810 {
811 LogTagId id;
812
813 if (!options->tags)
814 options->tags = g_array_new(FALSE, FALSE, sizeof(LogTagId));
815
816 while (tags)
817 {
818 id = log_tags_get_by_name((gchar *) tags->data);
819 g_array_append_val(options->tags, id);
820
821 g_free(tags->data);
822 tags = g_list_delete_link(tags, tags);
823 }
824 }
825
826 void
log_source_global_init(void)827 log_source_global_init(void)
828 {
829 accurate_nanosleep = check_nanosleep();
830 if (!accurate_nanosleep)
831 {
832 msg_debug("nanosleep() is not accurate enough to introduce minor stalls on the reader side, multi-threaded performance may be affected");
833 }
834 }
835