1 /*
2 * Copyright (c) 2002-2011 Balabit
3 * Copyright (c) 1998-2011 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logqueue.h"
26 #include "stats/stats-registry.h"
27 #include "messages.h"
28 #include "timeutils/misc.h"
29
/*
 * Global thread-count setting for log queues. The only writer visible in this
 * file is log_queue_set_max_threads(); how the value is consumed is not shown
 * here.
 */
gint log_queue_max_threads = 0;
31
32 void
log_queue_memory_usage_add(LogQueue * self,gsize value)33 log_queue_memory_usage_add(LogQueue *self, gsize value)
34 {
35 stats_counter_add(self->memory_usage, value);
36 atomic_gssize_add(&self->stats_cache.memory_usage, value);
37 }
38
39 void
log_queue_memory_usage_sub(LogQueue * self,gsize value)40 log_queue_memory_usage_sub(LogQueue *self, gsize value)
41 {
42 stats_counter_sub(self->memory_usage, value);
43 atomic_gssize_sub(&self->stats_cache.memory_usage, value);
44 }
45
46 void
log_queue_queued_messages_add(LogQueue * self,gsize value)47 log_queue_queued_messages_add(LogQueue *self, gsize value)
48 {
49 stats_counter_add(self->queued_messages, value);
50 atomic_gssize_add(&self->stats_cache.queued_messages, value);
51 }
52
53 void
log_queue_queued_messages_sub(LogQueue * self,gsize value)54 log_queue_queued_messages_sub(LogQueue *self, gsize value)
55 {
56 stats_counter_sub(self->queued_messages, value);
57 atomic_gssize_sub(&self->stats_cache.queued_messages, value);
58 }
59
60 void
log_queue_queued_messages_inc(LogQueue * self)61 log_queue_queued_messages_inc(LogQueue *self)
62 {
63 stats_counter_inc(self->queued_messages);
64 atomic_gssize_inc(&self->stats_cache.queued_messages);
65 }
66
67 void
log_queue_queued_messages_dec(LogQueue * self)68 log_queue_queued_messages_dec(LogQueue *self)
69 {
70 stats_counter_dec(self->queued_messages);
71 atomic_gssize_dec(&self->stats_cache.queued_messages);
72 }
73
74 /*
75 * When this is called, it is assumed that the output thread is currently
76 * not running (since this is the function that wakes it up), thus we can
77 * access the length of the output queue without acquiring a lock. Memory
78 * barriers are covered by the use of the self->lock mutex, since
79 * push_notify is registered under the protection of self->lock and after
80 * that the length of the output queue will not change (since parallel_push
81 * is only registered if it has less than enough items).
82 *
83 * NOTE: self->lock must have been acquired before calling this function.
84 */
85 void
log_queue_push_notify(LogQueue * self)86 log_queue_push_notify(LogQueue *self)
87 {
88 if (self->parallel_push_notify)
89 {
90 /* make sure the callback can call log_queue_check_items() again */
91 GDestroyNotify destroy = self->parallel_push_data_destroy;
92 gpointer user_data = self->parallel_push_data;
93 LogQueuePushNotifyFunc func = self->parallel_push_notify;
94
95 self->parallel_push_data = NULL;
96 self->parallel_push_data_destroy = NULL;
97 self->parallel_push_notify = NULL;
98
99 g_mutex_unlock(&self->lock);
100
101 func(user_data);
102 if (destroy && user_data)
103 destroy(user_data);
104
105 g_mutex_lock(&self->lock);
106 }
107 }
108
109 void
log_queue_reset_parallel_push(LogQueue * self)110 log_queue_reset_parallel_push(LogQueue *self)
111 {
112 g_mutex_lock(&self->lock);
113 self->parallel_push_notify = NULL;
114 self->parallel_push_data = NULL;
115 g_mutex_unlock(&self->lock);
116 }
117
118 void
log_queue_set_parallel_push(LogQueue * self,LogQueuePushNotifyFunc parallel_push_notify,gpointer user_data,GDestroyNotify user_data_destroy)119 log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
120 GDestroyNotify user_data_destroy)
121 {
122 g_mutex_lock(&self->lock);
123 self->parallel_push_notify = parallel_push_notify;
124 self->parallel_push_data = user_data;
125 self->parallel_push_data_destroy = user_data_destroy;
126 g_mutex_unlock(&self->lock);
127 }
128
129 /*
130 *
131 * @batch_items: the number of items processed in a batch (e.g. the number of items the consumer is preferred to process at a single invocation)
132 * @partial_batch: true is returned if some elements (but less than batch_items) are already buffered
133 * @timeout: the number of milliseconds that the consumer needs to wait before we can possibly proceed
134 */
135 gboolean
log_queue_check_items(LogQueue * self,gint * timeout,LogQueuePushNotifyFunc parallel_push_notify,gpointer user_data,GDestroyNotify user_data_destroy)136 log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
137 GDestroyNotify user_data_destroy)
138 {
139 gint64 num_elements;
140
141 g_mutex_lock(&self->lock);
142
143 /* drop reference to the previous callback/userdata */
144 if (self->parallel_push_data && self->parallel_push_data_destroy)
145 self->parallel_push_data_destroy(self->parallel_push_data);
146
147 num_elements = log_queue_get_length(self);
148 if (num_elements == 0)
149 {
150 self->parallel_push_notify = parallel_push_notify;
151 self->parallel_push_data = user_data;
152 self->parallel_push_data_destroy = user_data_destroy;
153 g_mutex_unlock(&self->lock);
154 return FALSE;
155 }
156
157 /* consume the user_data reference as we won't use the callback */
158 if (user_data && user_data_destroy)
159 user_data_destroy(user_data);
160
161 self->parallel_push_notify = NULL;
162 self->parallel_push_data = NULL;
163
164 g_mutex_unlock(&self->lock);
165
166 /* recalculate buckets, throttle is only running in the output thread, no need to lock it. */
167
168 if (self->throttle > 0)
169 {
170 gint64 diff;
171 gint new_buckets;
172 GTimeVal now;
173
174 g_get_current_time(&now);
175 /* throttling is enabled, calculate new buckets */
176 if (self->last_throttle_check.tv_sec != 0)
177 {
178 diff = g_time_val_diff(&now, &self->last_throttle_check);
179 }
180 else
181 {
182 diff = 0;
183 self->last_throttle_check = now;
184 }
185 new_buckets = (self->throttle * diff) / G_USEC_PER_SEC;
186 if (new_buckets)
187 {
188
189 /* if new_buckets is zero, we don't save the current time as
190 * last_throttle_check. The reason is that new_buckets could be
191 * rounded to zero when only a minimal interval passes between
192 * poll iterations.
193 */
194 self->throttle_buckets = MIN(self->throttle, self->throttle_buckets + new_buckets);
195 self->last_throttle_check = now;
196 }
197 if (num_elements && self->throttle_buckets == 0)
198 {
199 if (timeout)
200 {
201 /* we are unable to send because of throttling, make sure that we
202 * wake up when the rate limits lets us send at least 1 message */
203 *timeout = (1000 / self->throttle) + 1;
204 msg_debug("Throttling output",
205 evt_tag_int("wait", *timeout));
206 }
207 return FALSE;
208 }
209 }
210
211 return TRUE;
212 }
213
214 static void
_register_common_counters(LogQueue * self,gint stats_level,const StatsClusterKey * sc_key)215 _register_common_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key)
216 {
217 stats_register_counter(stats_level, sc_key, SC_TYPE_QUEUED, &self->queued_messages);
218 stats_register_counter(stats_level, sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
219 stats_register_counter_and_index(STATS_LEVEL1, sc_key, SC_TYPE_MEMORY_USAGE, &self->memory_usage);
220 atomic_gssize_set(&self->stats_cache.queued_messages, log_queue_get_length(self));
221 stats_counter_add(self->queued_messages, atomic_gssize_get_unsigned(&self->stats_cache.queued_messages));
222 stats_counter_add(self->memory_usage, atomic_gssize_get_unsigned(&self->stats_cache.memory_usage));
223 }
224
225 void
log_queue_register_stats_counters(LogQueue * self,gint stats_level,const StatsClusterKey * sc_key)226 log_queue_register_stats_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key)
227 {
228 _register_common_counters(self, stats_level, sc_key);
229
230 if (self->register_stats_counters)
231 self->register_stats_counters(self, stats_level, sc_key);
232 }
233
234 static void
_unregister_common_counters(LogQueue * self,const StatsClusterKey * sc_key)235 _unregister_common_counters(LogQueue *self, const StatsClusterKey *sc_key)
236 {
237 stats_counter_sub(self->queued_messages, atomic_gssize_get(&self->stats_cache.queued_messages));
238 stats_counter_sub(self->memory_usage, atomic_gssize_get(&self->stats_cache.memory_usage));
239 stats_unregister_counter(sc_key, SC_TYPE_QUEUED, &self->queued_messages);
240 stats_unregister_counter(sc_key, SC_TYPE_MEMORY_USAGE, &self->memory_usage);
241 stats_unregister_counter(sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
242 }
243
244 void
log_queue_unregister_stats_counters(LogQueue * self,const StatsClusterKey * sc_key)245 log_queue_unregister_stats_counters(LogQueue *self, const StatsClusterKey *sc_key)
246 {
247 _unregister_common_counters(self, sc_key);
248
249 if (self->unregister_stats_counters)
250 self->unregister_stats_counters(self, sc_key);
251 }
252
253 void
log_queue_init_instance(LogQueue * self,const gchar * persist_name)254 log_queue_init_instance(LogQueue *self, const gchar *persist_name)
255 {
256 g_atomic_counter_set(&self->ref_cnt, 1);
257 self->free_fn = log_queue_free_method;
258
259 self->persist_name = persist_name ? g_strdup(persist_name) : NULL;
260 g_mutex_init(&self->lock);
261 }
262
263 void
log_queue_free_method(LogQueue * self)264 log_queue_free_method(LogQueue *self)
265 {
266 g_mutex_clear(&self->lock);
267 g_free(self->persist_name);
268 g_free(self);
269 }
270
271 void
log_queue_set_max_threads(gint max_threads)272 log_queue_set_max_threads(gint max_threads)
273 {
274 log_queue_max_threads = max_threads;
275 }
276