/*
 * Copyright (c) 2002-2011 Balabit
 * Copyright (c) 1998-2011 Balázs Scheidler
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */

#include "logqueue.h"
#include "stats/stats-registry.h"
#include "messages.h"
#include "timeutils/misc.h"

gint log_queue_max_threads = 0;

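/*
 * The counter helpers below update two things in lockstep: the externally
 * registered stats counter (if any) and a queue-local atomic cache in
 * self->stats_cache.  The cache preserves the running totals while no
 * counter is registered, which is what lets _register_common_counters()
 * seed freshly attached counters and _unregister_common_counters()
 * subtract this queue's contribution when detaching.
 */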
void
log_queue_memory_usage_add(LogQueue *self, gsize value)
{
  stats_counter_add(self->memory_usage, value);
  atomic_gssize_add(&self->stats_cache.memory_usage, value);
}

void
log_queue_memory_usage_sub(LogQueue *self, gsize value)
{
  stats_counter_sub(self->memory_usage, value);
  atomic_gssize_sub(&self->stats_cache.memory_usage, value);
}

void
log_queue_queued_messages_add(LogQueue *self, gsize value)
{
  stats_counter_add(self->queued_messages, value);
  atomic_gssize_add(&self->stats_cache.queued_messages, value);
}

void
log_queue_queued_messages_sub(LogQueue *self, gsize value)
{
  stats_counter_sub(self->queued_messages, value);
  atomic_gssize_sub(&self->stats_cache.queued_messages, value);
}

void
log_queue_queued_messages_inc(LogQueue *self)
{
  stats_counter_inc(self->queued_messages);
  atomic_gssize_inc(&self->stats_cache.queued_messages);
}

void
log_queue_queued_messages_dec(LogQueue *self)
{
  stats_counter_dec(self->queued_messages);
  atomic_gssize_dec(&self->stats_cache.queued_messages);
}

/*
 * When this is called, it is assumed that the output thread is currently
 * not running (since this is the function that wakes it up), thus we can
 * access the length of the output queue without acquiring a lock.  Memory
 * barriers are covered by the use of the self->lock mutex, since
 * push_notify is registered under the protection of self->lock and after
 * that the length of the output queue will not change (since parallel_push
 * is only registered if it has less than enough items).
 *
 * NOTE: self->lock must have been acquired before calling this function.
 */
void
log_queue_push_notify(LogQueue *self)
{
  if (self->parallel_push_notify)
    {
      /* make sure the callback can call log_queue_check_items() again */
      GDestroyNotify destroy = self->parallel_push_data_destroy;
      gpointer user_data = self->parallel_push_data;
      LogQueuePushNotifyFunc func = self->parallel_push_notify;

      self->parallel_push_data = NULL;
      self->parallel_push_data_destroy = NULL;
      self->parallel_push_notify = NULL;

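      /* drop the lock for the duration of the callback: it may re-enter
       * the queue, e.g. by calling log_queue_check_items() */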
      g_mutex_unlock(&self->lock);

      func(user_data);
      if (destroy && user_data)
        destroy(user_data);

      g_mutex_lock(&self->lock);
    }
}

void
log_queue_reset_parallel_push(LogQueue *self)
{
  g_mutex_lock(&self->lock);
  self->parallel_push_notify = NULL;
  self->parallel_push_data = NULL;
  g_mutex_unlock(&self->lock);
}

void
log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
                            GDestroyNotify user_data_destroy)
{
  g_mutex_lock(&self->lock);
  self->parallel_push_notify = parallel_push_notify;
  self->parallel_push_data = user_data;
  self->parallel_push_data_destroy = user_data_destroy;
  g_mutex_unlock(&self->lock);
}

/*
 * Checks whether the queue has items ready for consumption.
 *
 * @timeout: if the queue is throttled, set to the number of milliseconds
 *   the consumer needs to wait before at least one message may be sent
 * @parallel_push_notify: callback registered (together with @user_data and
 *   @user_data_destroy) to wake up the consumer once items arrive; it is
 *   only registered when the queue is currently empty
 *
 * Returns TRUE if items are available for consumption, FALSE if the queue
 * is empty (in which case the notify callback was registered) or the
 * throttle budget is exhausted.
 */
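/*
 * Usage sketch (hypothetical caller, not part of this file): an output
 * thread would typically drive the queue roughly like this, where
 * wakeup_cb, consume_batch and schedule_wakeup_msec are illustrative
 * names only:
 *
 *   gint timeout = 0;
 *   if (log_queue_check_items(queue, &timeout, wakeup_cb, self, NULL))
 *     consume_batch(queue);           // items can be consumed right away
 *   else if (timeout > 0)
 *     schedule_wakeup_msec(timeout);  // throttled, retry after the delay
 *   // otherwise wakeup_cb fires via log_queue_push_notify() on new items
 */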
gboolean
log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
                      GDestroyNotify user_data_destroy)
{
  gint64 num_elements;

  g_mutex_lock(&self->lock);

  /* drop reference to the previous callback/userdata */
  if (self->parallel_push_data && self->parallel_push_data_destroy)
    self->parallel_push_data_destroy(self->parallel_push_data);

  num_elements = log_queue_get_length(self);
  if (num_elements == 0)
    {
      self->parallel_push_notify = parallel_push_notify;
      self->parallel_push_data = user_data;
      self->parallel_push_data_destroy = user_data_destroy;
      g_mutex_unlock(&self->lock);
      return FALSE;
    }

  /* consume the user_data reference as we won't use the callback */
  if (user_data && user_data_destroy)
    user_data_destroy(user_data);

  self->parallel_push_notify = NULL;
  self->parallel_push_data = NULL;

  g_mutex_unlock(&self->lock);

  /* recalculate buckets, throttle is only running in the output thread, no need to lock it. */

  if (self->throttle > 0)
    {
      gint64 diff;
      gint new_buckets;
      GTimeVal now;

      g_get_current_time(&now);
      /* throttling is enabled, calculate new buckets */
      if (self->last_throttle_check.tv_sec != 0)
        {
          diff = g_time_val_diff(&now, &self->last_throttle_check);
        }
      else
        {
          diff = 0;
          self->last_throttle_check = now;
        }
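      /* Example: with throttle = 100 msg/sec and diff = 250000 usec (250 ms
       * since the last check), 100 * 250000 / 1000000 = 25 new buckets of
       * send budget are replenished. */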
      new_buckets = (self->throttle * diff) / G_USEC_PER_SEC;
      if (new_buckets)
        {
          /* if new_buckets is zero, we don't save the current time as
           * last_throttle_check. The reason is that new_buckets could be
           * rounded to zero when only a minimal interval passes between
           * poll iterations.
           */
          self->throttle_buckets = MIN(self->throttle, self->throttle_buckets + new_buckets);
          self->last_throttle_check = now;
        }
      if (num_elements && self->throttle_buckets == 0)
        {
          if (timeout)
            {
              /* we are unable to send because of throttling, make sure that we
               * wake up when the rate limit lets us send at least 1 message */
              *timeout = (1000 / self->throttle) + 1;
              msg_debug("Throttling output",
                        evt_tag_int("wait", *timeout));
            }
          return FALSE;
        }
    }

  return TRUE;
}

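/*
 * Counters may be registered while the queue already holds messages (e.g.
 * when a non-empty queue is attached to counters after a reload), so the
 * freshly registered counters are seeded from the cached per-queue totals.
 */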
static void
_register_common_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key)
{
  stats_register_counter(stats_level, sc_key, SC_TYPE_QUEUED, &self->queued_messages);
  stats_register_counter(stats_level, sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
  stats_register_counter_and_index(STATS_LEVEL1, sc_key, SC_TYPE_MEMORY_USAGE, &self->memory_usage);
  atomic_gssize_set(&self->stats_cache.queued_messages, log_queue_get_length(self));
  stats_counter_add(self->queued_messages, atomic_gssize_get_unsigned(&self->stats_cache.queued_messages));
  stats_counter_add(self->memory_usage, atomic_gssize_get_unsigned(&self->stats_cache.memory_usage));
}

void
log_queue_register_stats_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key)
{
  _register_common_counters(self, stats_level, sc_key);

  if (self->register_stats_counters)
    self->register_stats_counters(self, stats_level, sc_key);
}

static void
_unregister_common_counters(LogQueue *self, const StatsClusterKey *sc_key)
{
  stats_counter_sub(self->queued_messages, atomic_gssize_get(&self->stats_cache.queued_messages));
  stats_counter_sub(self->memory_usage, atomic_gssize_get(&self->stats_cache.memory_usage));
  stats_unregister_counter(sc_key, SC_TYPE_QUEUED, &self->queued_messages);
  stats_unregister_counter(sc_key, SC_TYPE_MEMORY_USAGE, &self->memory_usage);
  stats_unregister_counter(sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
}

void
log_queue_unregister_stats_counters(LogQueue *self, const StatsClusterKey *sc_key)
{
  _unregister_common_counters(self, sc_key);

  if (self->unregister_stats_counters)
    self->unregister_stats_counters(self, sc_key);
}

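/*
 * Initializes the fields common to all LogQueue implementations: the
 * reference count, the default free_fn (log_queue_free_method() below),
 * the optional persist name and the queue lock.
 */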
void
log_queue_init_instance(LogQueue *self, const gchar *persist_name)
{
  g_atomic_counter_set(&self->ref_cnt, 1);
  self->free_fn = log_queue_free_method;

  self->persist_name = persist_name ? g_strdup(persist_name) : NULL;
  g_mutex_init(&self->lock);
}

void
log_queue_free_method(LogQueue *self)
{
  g_mutex_clear(&self->lock);
  g_free(self->persist_name);
  g_free(self);
}

void
log_queue_set_max_threads(gint max_threads)
{
  log_queue_max_threads = max_threads;
}