1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logsource.h"
26 #include "messages.h"
27 #include "host-resolve.h"
28 #include "stats/stats-registry.h"
29 #include "stats/stats-cluster-single.h"
30 #include "msg-stats.h"
31 #include "logmsg/tags.h"
32 #include "ack-tracker/ack_tracker.h"
33 #include "ack-tracker/ack_tracker_factory.h"
34 #include "timeutils/misc.h"
35 #include "scratch-buffers.h"
36 
37 #include <string.h>
38 #include <unistd.h>
39 
/* Set from check_nanosleep() in log_source_global_init(); gates the ACK-rate
 * measurement and the short reader-side sleep implemented in this file. */
gboolean accurate_nanosleep = FALSE;
41 
42 void
log_source_wakeup(LogSource * self)43 log_source_wakeup(LogSource *self)
44 {
45   if (self->wakeup)
46     self->wakeup(self);
47 
48   msg_diagnostics("Source has been resumed", log_pipe_location_tag(&self->super));
49 }
50 
51 static inline guint32
_take_reclaimed_window(LogSource * self,guint32 window_size_increment)52 _take_reclaimed_window(LogSource *self, guint32 window_size_increment)
53 {
54   gssize old = atomic_gssize_sub(&self->window_size_to_be_reclaimed, window_size_increment);
55   gboolean reclaim_in_progress = (old > 0);
56 
57   if (!reclaim_in_progress)
58     {
59       return window_size_increment;
60     }
61 
62   guint32 remaining_window_size_increment = MAX(window_size_increment - old, 0);
63   guint32 reclaimed = window_size_increment - remaining_window_size_increment;
64   atomic_gssize_add(&self->pending_reclaimed, reclaimed);
65 
66   return remaining_window_size_increment;
67 }
68 
69 static inline void
_flow_control_window_size_adjust(LogSource * self,guint32 window_size_increment,gboolean last_ack_type_is_suspended)70 _flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment, gboolean last_ack_type_is_suspended)
71 {
72   gboolean suspended;
73 
74   if (G_UNLIKELY(dynamic_window_is_enabled(&self->dynamic_window)))
75     window_size_increment = _take_reclaimed_window(self, window_size_increment);
76 
77   gsize old_window_size = window_size_counter_add(&self->window_size, window_size_increment, &suspended);
78   stats_counter_add(self->stat_window_size, window_size_increment);
79 
80   msg_diagnostics("Window size adjustment",
81                   evt_tag_int("old_window_size", old_window_size),
82                   evt_tag_int("window_size_increment", window_size_increment),
83                   evt_tag_str("suspended_before_increment", suspended ? "TRUE" : "FALSE"),
84                   evt_tag_str("last_ack_type_is_suspended", last_ack_type_is_suspended ? "TRUE" : "FALSE"));
85 
86   gboolean need_to_resume_counter = !last_ack_type_is_suspended && suspended;
87   if (need_to_resume_counter)
88     window_size_counter_resume(&self->window_size);
89   if (old_window_size == 0 || need_to_resume_counter)
90     log_source_wakeup(self);
91 }
92 
93 static void
_flow_control_rate_adjust(LogSource * self)94 _flow_control_rate_adjust(LogSource *self)
95 {
96 #ifdef SYSLOG_NG_HAVE_CLOCK_GETTIME
97   guint32 cur_ack_count, last_ack_count;
98   /* NOTE: this is racy. msg_ack may be executing in different writer
99    * threads. I don't want to lock, all we need is an approximate value of
100    * the ACK rate of the last couple of seconds.  */
101 
102   if (accurate_nanosleep && self->threaded)
103     {
104       cur_ack_count = ++self->ack_count;
105       if ((cur_ack_count & 0x3FFF) == 0)
106         {
107           struct timespec now;
108           glong diff;
109 
110           /* do this every once in a while, once in 16k messages should be fine */
111 
112           last_ack_count = self->last_ack_count;
113 
114           /* make sure that we have at least 16k messages to measure the rate
115            * for.  Because of the race we may have last_ack_count ==
116            * cur_ack_count if another thread already measured the same span */
117 
118           if (last_ack_count < cur_ack_count - 16383)
119             {
120               clock_gettime(CLOCK_MONOTONIC, &now);
121               if (now.tv_sec > self->last_ack_rate_time.tv_sec + 6)
122                 {
123                   /* last check was too far apart, this means the rate is quite slow. turn off sleeping. */
124                   self->window_full_sleep_nsec = 0;
125                   self->last_ack_rate_time = now;
126                 }
127               else
128                 {
129                   /* ok, we seem to have a close enough measurement, this means
130                    * we do have a high rate.  Calculate how much we should sleep
131                    * in case the window gets full */
132 
133                   diff = timespec_diff_nsec(&now, &self->last_ack_rate_time);
134                   self->window_full_sleep_nsec = (diff / (cur_ack_count - last_ack_count));
135                   if (self->window_full_sleep_nsec > 1e6)
136                     {
137                       /* in case we'd be waiting for 1msec for another free slot in the window, let's go to background instead */
138                       self->window_full_sleep_nsec = 0;
139                     }
140                   else
141                     {
142                       /* otherwise let's wait for about 8 message to be emptied before going back to the loop, but clamp the maximum time to 0.1msec */
143                       self->window_full_sleep_nsec <<= 3;
144                       if (self->window_full_sleep_nsec > 1e5)
145                         self->window_full_sleep_nsec = 1e5;
146                     }
147                   self->last_ack_count = cur_ack_count;
148                   self->last_ack_rate_time = now;
149                 }
150             }
151         }
152     }
153 #endif
154 }
155 
156 void
log_source_flow_control_adjust(LogSource * self,guint32 window_size_increment)157 log_source_flow_control_adjust(LogSource *self, guint32 window_size_increment)
158 {
159   _flow_control_window_size_adjust(self, window_size_increment, FALSE);
160   _flow_control_rate_adjust(self);
161 }
162 
163 void
log_source_flow_control_adjust_when_suspended(LogSource * self,guint32 window_size_increment)164 log_source_flow_control_adjust_when_suspended(LogSource *self, guint32 window_size_increment)
165 {
166   _flow_control_window_size_adjust(self, window_size_increment, TRUE);
167   _flow_control_rate_adjust(self);
168 }
169 
170 void
log_source_disable_bookmark_saving(LogSource * self)171 log_source_disable_bookmark_saving(LogSource *self)
172 {
173   ack_tracker_disable_bookmark_saving(self->ack_tracker);
174 }
175 
176 /**
177  * log_source_msg_ack:
178  *
179  * This is running in the same thread as the _destination_, thus care must
180  * be taken when manipulating the LogSource data structure.
181  **/
182 static void
log_source_msg_ack(LogMessage * msg,AckType ack_type)183 log_source_msg_ack(LogMessage *msg, AckType ack_type)
184 {
185   AckTracker *ack_tracker = msg->ack_record->tracker;
186   ack_tracker_manage_msg_ack(ack_tracker, msg, ack_type);
187 }
188 
189 void
log_source_flow_control_suspend(LogSource * self)190 log_source_flow_control_suspend(LogSource *self)
191 {
192   msg_debug("Source has been suspended",
193             log_pipe_location_tag(&self->super),
194             evt_tag_str("function", __FUNCTION__));
195 
196   window_size_counter_suspend(&self->window_size);
197 }
198 
199 void
log_source_enable_dynamic_window(LogSource * self,DynamicWindowPool * window_pool)200 log_source_enable_dynamic_window(LogSource *self, DynamicWindowPool *window_pool)
201 {
202   dynamic_window_set_pool(&self->dynamic_window, dynamic_window_pool_ref(window_pool));
203 }
204 
205 gboolean
log_source_is_dynamic_window_enabled(LogSource * self)206 log_source_is_dynamic_window_enabled(LogSource *self)
207 {
208   return dynamic_window_is_enabled(&self->dynamic_window);
209 }
210 
211 void
log_source_dynamic_window_update_statistics(LogSource * self)212 log_source_dynamic_window_update_statistics(LogSource *self)
213 {
214   dynamic_window_stat_update(&self->dynamic_window.stat, window_size_counter_get(&self->window_size, NULL));
215   msg_trace("Updating dynamic window statistic", evt_tag_int("avg window size",
216                                                              dynamic_window_stat_get_avg(&self->dynamic_window.stat)));
217 }
218 
219 static void
_reclaim_dynamic_window(LogSource * self,gsize window_size)220 _reclaim_dynamic_window(LogSource *self, gsize window_size)
221 {
222   g_assert(self->full_window_size - window_size >= self->initial_window_size);
223   atomic_gssize_set(&self->window_size_to_be_reclaimed, window_size);
224 }
225 
226 static void
_release_dynamic_window(LogSource * self)227 _release_dynamic_window(LogSource *self)
228 {
229   g_assert(self->ack_tracker == NULL);
230 
231   gsize dynamic_part = self->full_window_size - self->initial_window_size;
232   msg_trace("Releasing dynamic part of the window", evt_tag_int("dynamic_window_to_be_released", dynamic_part),
233             log_pipe_location_tag(&self->super));
234 
235   self->full_window_size -= dynamic_part;
236   stats_counter_sub(self->stat_full_window, dynamic_part);
237 
238   window_size_counter_sub(&self->window_size, dynamic_part, NULL);
239   stats_counter_sub(self->stat_window_size, dynamic_part);
240   dynamic_window_release(&self->dynamic_window, dynamic_part);
241 
242   dynamic_window_pool_unref(self->dynamic_window.pool);
243 }
244 
245 static void
_inc_balanced(LogSource * self,gsize inc)246 _inc_balanced(LogSource *self, gsize inc)
247 {
248   gsize offered_dynamic = dynamic_window_request(&self->dynamic_window, inc);
249 
250   msg_trace("Balance::increase",
251             log_pipe_location_tag(&self->super),
252             evt_tag_printf("connection", "%p", self),
253             evt_tag_int("old_full_window_size", self->full_window_size),
254             evt_tag_int("new_full_window_size", self->full_window_size + offered_dynamic));
255 
256   self->full_window_size += offered_dynamic;
257   stats_counter_add(self->stat_full_window, offered_dynamic);
258 
259   gsize old_window_size = window_size_counter_add(&self->window_size, offered_dynamic, NULL);
260   stats_counter_add(self->stat_window_size, offered_dynamic);
261   if (old_window_size == 0 && offered_dynamic != 0)
262     log_source_wakeup(self);
263 }
264 
265 static void
_dec_balanced(LogSource * self,gsize dec)266 _dec_balanced(LogSource *self, gsize dec)
267 {
268   gsize new_full_window_size = self->full_window_size - dec;
269 
270   gsize empty_window = window_size_counter_get(&self->window_size, NULL);
271   gsize remaining_sub = 0;
272 
273   if (empty_window <= dec)
274     {
275       remaining_sub = dec - empty_window;
276       if (empty_window == 0)
277         {
278           dec = 0;
279         }
280       else
281         {
282           dec = empty_window - 1;
283         }
284 
285       new_full_window_size = self->full_window_size - dec;
286       _reclaim_dynamic_window(self, remaining_sub);
287     }
288 
289   window_size_counter_sub(&self->window_size, dec, NULL);
290   stats_counter_sub(self->stat_window_size, dec);
291 
292   msg_trace("Balance::decrease",
293             log_pipe_location_tag(&self->super),
294             evt_tag_printf("connection", "%p", self),
295             evt_tag_int("old_full_window_size", self->full_window_size),
296             evt_tag_int("new_full_window_size", new_full_window_size),
297             evt_tag_int("to_be_reclaimed", remaining_sub));
298 
299   self->full_window_size = new_full_window_size;
300   stats_counter_set(self->stat_full_window, new_full_window_size);
301   dynamic_window_release(&self->dynamic_window, dec);
302 }
303 
304 static gboolean
_reclaim_window_instead_of_rebalance(LogSource * self)305 _reclaim_window_instead_of_rebalance(LogSource *self)
306 {
307   //check pending_reclaimed
308   gssize total_reclaim = atomic_gssize_set_and_get(&self->pending_reclaimed, 0);
309   gssize to_be_reclaimed = atomic_gssize_get(&self->window_size_to_be_reclaimed);
310   gboolean reclaim_in_progress = (to_be_reclaimed > 0);
311 
312   if (total_reclaim > 0)
313     {
314       self->full_window_size -= total_reclaim;
315       stats_counter_sub(self->stat_full_window, total_reclaim);
316       dynamic_window_release(&self->dynamic_window, total_reclaim);
317     }
318   else
319     {
320       //to avoid underflow, we need to set a value <= 0
321       if (to_be_reclaimed < 0)
322         atomic_gssize_set(&self->window_size_to_be_reclaimed, 0);
323     }
324 
325   msg_trace("Checking if reclaim is in progress...",
326             log_pipe_location_tag(&self->super),
327             evt_tag_printf("connection", "%p", self),
328             evt_tag_printf("in progress", "%s", reclaim_in_progress ? "yes" : "no"),
329             evt_tag_long("total_reclaim", total_reclaim));
330 
331   return reclaim_in_progress;
332 }
333 
334 static void
_dynamic_window_rebalance(LogSource * self)335 _dynamic_window_rebalance(LogSource *self)
336 {
337   gsize current_dynamic_win = self->full_window_size - self->initial_window_size;
338   gboolean have_to_increase = current_dynamic_win < self->dynamic_window.pool->balanced_window;
339   gboolean have_to_decrease = current_dynamic_win > self->dynamic_window.pool->balanced_window;
340 
341   msg_trace("Rebalance dynamic window",
342             log_pipe_location_tag(&self->super),
343             evt_tag_printf("connection", "%p", self),
344             evt_tag_int("full_window", self->full_window_size),
345             evt_tag_int("dynamic_win", current_dynamic_win),
346             evt_tag_int("static_window", self->initial_window_size),
347             evt_tag_int("balanced_window", self->dynamic_window.pool->balanced_window),
348             evt_tag_int("avg_free", dynamic_window_stat_get_avg(&self->dynamic_window.stat)));
349 
350   if (have_to_increase)
351     _inc_balanced(self, self->dynamic_window.pool->balanced_window - current_dynamic_win);
352   else if (have_to_decrease)
353     _dec_balanced(self, current_dynamic_win - self->dynamic_window.pool->balanced_window);
354 }
355 
356 void
log_source_dynamic_window_realloc(LogSource * self)357 log_source_dynamic_window_realloc(LogSource *self)
358 {
359   /* it is safe to assume that the window size is not decremented while this function runs,
360    * only incrementation is possible by destination threads */
361 
362   if (!_reclaim_window_instead_of_rebalance(self))
363     _dynamic_window_rebalance(self);
364 
365   dynamic_window_stat_reset(&self->dynamic_window.stat);
366 }
367 
368 void
log_source_mangle_hostname(LogSource * self,LogMessage * msg)369 log_source_mangle_hostname(LogSource *self, LogMessage *msg)
370 {
371   const gchar *resolved_name;
372   gsize resolved_name_len;
373   const gchar *orig_host;
374 
375   resolved_name = resolve_sockaddr_to_hostname(&resolved_name_len, msg->saddr, &self->options->host_resolve_options);
376   log_msg_set_value(msg, LM_V_HOST_FROM, resolved_name, resolved_name_len);
377 
378   orig_host = log_msg_get_value(msg, LM_V_HOST, NULL);
379   if (!self->options->keep_hostname || !orig_host || !orig_host[0])
380     {
381       gchar host[256];
382       gint host_len = -1;
383       if (G_UNLIKELY(self->options->chain_hostnames))
384         {
385           msg->flags |= LF_CHAINED_HOSTNAME;
386           if (msg->flags & LF_SIMPLE_HOSTNAME)
387             {
388               /* local without group name */
389               host_len = g_snprintf(host, sizeof(host), "%s", resolved_name);
390             }
391           else if (msg->flags & LF_LOCAL)
392             {
393               /* local */
394               host_len = g_snprintf(host, sizeof(host), "%s@%s", self->options->group_name, resolved_name);
395             }
396           else if (!orig_host || !orig_host[0])
397             {
398               /* remote && no hostname */
399               host_len = g_snprintf(host, sizeof(host), "%s/%s", resolved_name, resolved_name);
400             }
401           else
402             {
403               /* everything else, append source hostname */
404               host_len = g_snprintf(host, sizeof(host), "%s/%s", orig_host, resolved_name);
405             }
406           if (host_len >= sizeof(host))
407             host_len = sizeof(host) - 1;
408           log_msg_set_value(msg, LM_V_HOST, host, host_len);
409         }
410       else
411         {
412           log_msg_set_value(msg, LM_V_HOST, resolved_name, resolved_name_len);
413         }
414     }
415 }
416 
417 static void
_register_window_stats(LogSource * self)418 _register_window_stats(LogSource *self)
419 {
420   if (!stats_check_level(4))
421     return;
422 
423   const gchar *instance_name = self->name ? : self->stats_instance;
424 
425   StatsClusterKey sc_key;
426   stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id,
427                                          instance_name, "free_window");
428   self->stat_window_size_cluster = stats_register_dynamic_counter(4, &sc_key, SC_TYPE_SINGLE_VALUE,
429                                    &self->stat_window_size);
430   stats_counter_set(self->stat_window_size, window_size_counter_get(&self->window_size, NULL));
431 
432 
433   stats_cluster_single_key_set_with_name(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id,
434                                          instance_name, "full_window");
435   self->stat_full_window_cluster = stats_register_dynamic_counter(4, &sc_key, SC_TYPE_SINGLE_VALUE,
436                                    &self->stat_full_window);
437   stats_counter_set(self->stat_full_window, self->full_window_size);
438 
439 }
440 
441 static void
_unregister_window_stats(LogSource * self)442 _unregister_window_stats(LogSource *self)
443 {
444   if (!stats_check_level(4))
445     return;
446 
447   stats_unregister_dynamic_counter(self->stat_window_size_cluster, SC_TYPE_SINGLE_VALUE, &self->stat_window_size);
448   stats_unregister_dynamic_counter(self->stat_full_window_cluster, SC_TYPE_SINGLE_VALUE, &self->stat_full_window);
449 }
450 
451 static inline void
_create_ack_tracker_if_not_exists(LogSource * self)452 _create_ack_tracker_if_not_exists(LogSource *self)
453 {
454   if (!self->ack_tracker)
455     {
456       if (!self->ack_tracker_factory)
457         self->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
458       self->ack_tracker = ack_tracker_factory_create(self->ack_tracker_factory, self);
459     }
460 }
461 
462 gboolean
log_source_init(LogPipe * s)463 log_source_init(LogPipe *s)
464 {
465   LogSource *self = (LogSource *) s;
466 
467   _create_ack_tracker_if_not_exists(self);
468   if (!ack_tracker_init(self->ack_tracker))
469     {
470       msg_error("Failed to initialize AckTracker");
471       return FALSE;
472     }
473 
474   stats_lock();
475   StatsClusterKey sc_key;
476   stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance);
477   stats_register_counter(self->options->stats_level, &sc_key,
478                          SC_TYPE_PROCESSED, &self->recvd_messages);
479   stats_register_counter(self->options->stats_level, &sc_key, SC_TYPE_STAMP, &self->last_message_seen);
480 
481   _register_window_stats(self);
482 
483   stats_unlock();
484 
485   return TRUE;
486 }
487 
488 gboolean
log_source_deinit(LogPipe * s)489 log_source_deinit(LogPipe *s)
490 {
491   LogSource *self = (LogSource *) s;
492   ack_tracker_deinit(self->ack_tracker);
493 
494   stats_lock();
495   StatsClusterKey sc_key;
496   stats_cluster_logpipe_key_set(&sc_key, self->options->stats_source | SCS_SOURCE, self->stats_id, self->stats_instance);
497   stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->recvd_messages);
498   stats_unregister_counter(&sc_key, SC_TYPE_STAMP, &self->last_message_seen);
499 
500   _unregister_window_stats(self);
501 
502   stats_unlock();
503 
504   return TRUE;
505 }
506 
507 void
log_source_post(LogSource * self,LogMessage * msg)508 log_source_post(LogSource *self, LogMessage *msg)
509 {
510   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
511   gint old_window_size;
512 
513   ack_tracker_track_msg(self->ack_tracker, msg);
514 
515   /* NOTE: we start by enabling flow-control, thus we need an acknowledgement */
516   path_options.ack_needed = TRUE;
517   log_msg_ref(msg);
518   log_msg_add_ack(msg, &path_options);
519   msg->ack_func = log_source_msg_ack;
520 
521   old_window_size = window_size_counter_sub(&self->window_size, 1, NULL);
522   stats_counter_sub(self->stat_window_size, 1);
523 
524   if (G_UNLIKELY(old_window_size == 1))
525     {
526       msg_debug("Source has been suspended",
527                 log_pipe_location_tag(&self->super),
528                 evt_tag_str("function", __FUNCTION__));
529     }
530 
531   /*
532    * NOTE: this assertion validates that the source is not overflowing its
533    * own flow-control window size, decreased above, by the atomic statement.
534    *
535    * If the _old_ value is zero, that means that the decrement operation
536    * above has decreased the value to -1.
537    */
538 
539   g_assert(old_window_size > 0);
540 
541   ScratchBuffersMarker mark;
542   scratch_buffers_mark(&mark);
543   log_pipe_queue(&self->super, msg, &path_options);
544   scratch_buffers_reclaim_marked(mark);
545 }
546 
547 static gboolean
_invoke_mangle_callbacks(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)548 _invoke_mangle_callbacks(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
549 {
550   LogSource *self = (LogSource *) s;
551   GList *next_item = g_list_first(self->options->source_queue_callbacks);
552 
553   while(next_item)
554     {
555       if(next_item->data)
556         {
557           if(!((mangle_callback) (next_item->data))(log_pipe_get_config(s), msg, self))
558             {
559               log_msg_drop(msg, path_options, AT_PROCESSED);
560               return FALSE;
561             }
562         }
563       next_item = next_item->next;
564     }
565   return TRUE;
566 }
567 
568 static void
log_source_override_host(LogSource * self,LogMessage * msg)569 log_source_override_host(LogSource *self, LogMessage *msg)
570 {
571   if (self->options->host_override_len < 0)
572     self->options->host_override_len = strlen(self->options->host_override);
573   log_msg_set_value(msg, LM_V_HOST, self->options->host_override, self->options->host_override_len);
574 }
575 
576 static void
log_source_override_program(LogSource * self,LogMessage * msg)577 log_source_override_program(LogSource *self, LogMessage *msg)
578 {
579   if (self->options->program_override_len < 0)
580     self->options->program_override_len = strlen(self->options->program_override);
581   log_msg_set_value(msg, LM_V_PROGRAM, self->options->program_override, self->options->program_override_len);
582 }
583 
584 static gchar *
_get_pid_string(void)585 _get_pid_string(void)
586 {
587 #define MAX_PID_CHAR_COUNT 20 /* max PID on 64 bit systems is 2^64 - 1, which is 19 characters, +1 for terminating 0 */
588 
589   static gchar pid_string[MAX_PID_CHAR_COUNT];
590 
591   if (pid_string[0] == '\0')
592     {
593 #ifdef _WIN32
594       g_snprintf(pid_string, MAX_PID_CHAR_COUNT, "%lu", GetCurrentProcessId());
595 #else
596       g_snprintf(pid_string, MAX_PID_CHAR_COUNT, "%d", getpid());
597 #endif
598     }
599 
600   return pid_string;
601 }
602 
603 static void
log_source_queue(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)604 log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
605 {
606   LogSource *self = (LogSource *) s;
607   gint i;
608 
609   msg_set_context(msg);
610 
611   msg_diagnostics(">>>>>> Source side message processing begin",
612                   evt_tag_str("instance", self->stats_instance ? self->stats_instance : "internal"),
613                   log_pipe_location_tag(s),
614                   evt_tag_printf("msg", "%p", msg));
615 
616   /* $HOST setup */
617   log_source_mangle_hostname(self, msg);
618 
619   if (self->options->use_syslogng_pid)
620     log_msg_set_value(msg, LM_V_PID, _get_pid_string(), -1);
621 
622   /* source specific tags */
623   if (self->options->tags)
624     {
625       for (i = 0; i < self->options->tags->len; i++)
626         {
627           log_msg_set_tag_by_id(msg, g_array_index(self->options->tags, LogTagId, i));
628         }
629     }
630 
631   log_msg_set_tag_by_id(msg, self->options->source_group_tag);
632 
633 
634   if (!_invoke_mangle_callbacks(s, msg, path_options))
635     return;
636 
637   if (self->options->host_override)
638     log_source_override_host(self, msg);
639 
640   if (self->options->program_override)
641     log_source_override_program(self, msg);
642 
643   msg_stats_update_counters(self->stats_id, msg);
644 
645   /* message setup finished, send it out */
646 
647   stats_counter_inc(self->recvd_messages);
648   stats_counter_set(self->last_message_seen, msg->timestamps[LM_TS_RECVD].ut_sec);
649   log_pipe_forward_msg(s, msg, path_options);
650 
651   if (accurate_nanosleep && self->threaded && self->window_full_sleep_nsec > 0 && !log_source_free_to_send(self))
652     {
653       struct timespec ts;
654 
655       /* wait one 0.1msec in the hope that the buffer clears up */
656       ts.tv_sec = 0;
657       ts.tv_nsec = self->window_full_sleep_nsec;
658       nanosleep(&ts, NULL);
659     }
660   msg_diagnostics("<<<<<< Source side message processing finish",
661                   evt_tag_str("instance", self->stats_instance ? self->stats_instance : "internal"),
662                   log_pipe_location_tag(s),
663                   evt_tag_printf("msg", "%p", msg));
664 
665   msg_set_context(NULL);
666 }
667 
668 static void
_initialize_window(LogSource * self,gint init_window_size)669 _initialize_window(LogSource *self, gint init_window_size)
670 {
671   self->window_initialized = TRUE;
672   window_size_counter_set(&self->window_size, init_window_size);
673 
674   self->initial_window_size = init_window_size;
675   self->full_window_size = init_window_size;
676 }
677 
678 static gboolean
_is_window_initialized(LogSource * self)679 _is_window_initialized(LogSource *self)
680 {
681   return self->window_initialized;
682 }
683 
684 void
log_source_set_options(LogSource * self,LogSourceOptions * options,const gchar * stats_id,const gchar * stats_instance,gboolean threaded,LogExprNode * expr_node)685 log_source_set_options(LogSource *self, LogSourceOptions *options,
686                        const gchar *stats_id, const gchar *stats_instance,
687                        gboolean threaded, LogExprNode *expr_node)
688 {
689   /* NOTE: we don't adjust window_size even in case it was changed in the
690    * configuration and we received a SIGHUP.  This means that opened
691    * connections will not have their window_size changed. */
692 
693   if (!_is_window_initialized(self))
694     _initialize_window(self, options->init_window_size);
695 
696   self->options = options;
697   if (self->stats_id)
698     g_free(self->stats_id);
699   self->stats_id = stats_id ? g_strdup(stats_id) : NULL;
700   if (self->stats_instance)
701     g_free(self->stats_instance);
702   self->stats_instance = stats_instance ? g_strdup(stats_instance): NULL;
703   self->threaded = threaded;
704 
705   log_pipe_detach_expr_node(&self->super);
706   log_pipe_attach_expr_node(&self->super, expr_node);
707 }
708 
709 void
log_source_set_ack_tracker_factory(LogSource * self,AckTrackerFactory * factory)710 log_source_set_ack_tracker_factory(LogSource *self, AckTrackerFactory *factory)
711 {
712   ack_tracker_factory_unref(self->ack_tracker_factory);
713   self->ack_tracker_factory = factory;
714 }
715 
716 void
log_source_set_name(LogSource * self,const gchar * name)717 log_source_set_name(LogSource *self, const gchar *name)
718 {
719   g_free(self->name);
720   self->name = name ? g_strdup(name) : NULL;
721 }
722 
723 void
log_source_init_instance(LogSource * self,GlobalConfig * cfg)724 log_source_init_instance(LogSource *self, GlobalConfig *cfg)
725 {
726   log_pipe_init_instance(&self->super, cfg);
727   self->super.queue = log_source_queue;
728   self->super.free_fn = log_source_free;
729   self->super.init = log_source_init;
730   self->super.deinit = log_source_deinit;
731   self->window_initialized = FALSE;
732   self->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
733   self->ack_tracker = NULL;
734 }
735 
736 void
log_source_free(LogPipe * s)737 log_source_free(LogPipe *s)
738 {
739   LogSource *self = (LogSource *) s;
740 
741   ack_tracker_free(self->ack_tracker);
742   self->ack_tracker = NULL;
743 
744   g_free(self->name);
745   g_free(self->stats_id);
746   g_free(self->stats_instance);
747   log_pipe_detach_expr_node(&self->super);
748   log_pipe_free_method(s);
749 
750   ack_tracker_factory_unref(self->ack_tracker_factory);
751   if (G_UNLIKELY(dynamic_window_is_enabled(&self->dynamic_window)))
752     _release_dynamic_window(self);
753 }
754 
755 void
log_source_options_defaults(LogSourceOptions * options)756 log_source_options_defaults(LogSourceOptions *options)
757 {
758   options->init_window_size = -1;
759   options->keep_hostname = -1;
760   options->chain_hostnames = -1;
761   options->keep_timestamp = -1;
762   options->program_override_len = -1;
763   options->host_override_len = -1;
764   options->tags = NULL;
765   options->read_old_records = TRUE;
766   host_resolve_options_defaults(&options->host_resolve_options);
767 }
768 
769 /* NOTE: _init needs to be idempotent when called multiple times w/o invoking _destroy */
770 void
log_source_options_init(LogSourceOptions * options,GlobalConfig * cfg,const gchar * group_name)771 log_source_options_init(LogSourceOptions *options, GlobalConfig *cfg, const gchar *group_name)
772 {
773   gchar *source_group_name;
774 
775   options->source_queue_callbacks = cfg->source_mangle_callback_list;
776 
777   if (options->init_window_size == -1)
778     options->init_window_size = 100;
779   if (options->keep_hostname == -1)
780     options->keep_hostname = cfg->keep_hostname;
781   if (options->chain_hostnames == -1)
782     options->chain_hostnames = cfg->chain_hostnames;
783   if (options->keep_timestamp == -1)
784     options->keep_timestamp = cfg->keep_timestamp;
785   options->group_name = group_name;
786 
787   source_group_name = g_strdup_printf(".source.%s", group_name);
788   options->source_group_tag = log_tags_get_by_name(source_group_name);
789   g_free(source_group_name);
790   host_resolve_options_init(&options->host_resolve_options, &cfg->host_resolve_options);
791 }
792 
793 void
log_source_options_destroy(LogSourceOptions * options)794 log_source_options_destroy(LogSourceOptions *options)
795 {
796   host_resolve_options_destroy(&options->host_resolve_options);
797   if (options->program_override)
798     g_free(options->program_override);
799   if (options->host_override)
800     g_free(options->host_override);
801   if (options->tags)
802     {
803       g_array_free(options->tags, TRUE);
804       options->tags = NULL;
805     }
806 }
807 
808 void
log_source_options_set_tags(LogSourceOptions * options,GList * tags)809 log_source_options_set_tags(LogSourceOptions *options, GList *tags)
810 {
811   LogTagId id;
812 
813   if (!options->tags)
814     options->tags = g_array_new(FALSE, FALSE, sizeof(LogTagId));
815 
816   while (tags)
817     {
818       id = log_tags_get_by_name((gchar *) tags->data);
819       g_array_append_val(options->tags, id);
820 
821       g_free(tags->data);
822       tags = g_list_delete_link(tags, tags);
823     }
824 }
825 
826 void
log_source_global_init(void)827 log_source_global_init(void)
828 {
829   accurate_nanosleep = check_nanosleep();
830   if (!accurate_nanosleep)
831     {
832       msg_debug("nanosleep() is not accurate enough to introduce minor stalls on the reader side, multi-threaded performance may be affected");
833     }
834 }
835