1 /*
2 * Copyright (c) 2002-2011 Balabit
3 * Copyright (c) 1998-2011 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #ifndef LOGQUEUE_H_INCLUDED
26 #define LOGQUEUE_H_INCLUDED
27
28 #include "logmsg/logmsg.h"
29 #include "stats/stats-registry.h"
30
31 extern gint log_queue_max_threads;
32
33 typedef void (*LogQueuePushNotifyFunc)(gpointer user_data);
34
35 typedef struct _LogQueue LogQueue;
36
37 typedef const char *QueueType;
38
39 struct _LogQueue
40 {
41 QueueType type;
42 GAtomicCounter ref_cnt;
43 gboolean use_backlog;
44
45 gint throttle;
46 gint throttle_buckets;
47 GTimeVal last_throttle_check;
48
49 gchar *persist_name;
50 StatsCounterItem *queued_messages;
51 StatsCounterItem *dropped_messages;
52 StatsCounterItem *memory_usage;
53
54 struct
55 {
56 atomic_gssize memory_usage;
57 atomic_gssize queued_messages;
58 } stats_cache;
59
60 GMutex lock;
61 LogQueuePushNotifyFunc parallel_push_notify;
62 gpointer parallel_push_data;
63 GDestroyNotify parallel_push_data_destroy;
64
65 /* queue management */
66 gboolean (*keep_on_reload)(LogQueue *self);
67 gint64 (*get_length)(LogQueue *self);
68 gboolean (*is_empty_racy)(LogQueue *self);
69 void (*push_tail)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options);
70 void (*push_head)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options);
71 LogMessage *(*pop_head)(LogQueue *self, LogPathOptions *path_options);
72 void (*ack_backlog)(LogQueue *self, gint n);
73 void (*rewind_backlog)(LogQueue *self, guint rewind_count);
74 void (*rewind_backlog_all)(LogQueue *self);
75 void (*register_stats_counters)(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key);
76 void (*unregister_stats_counters)(LogQueue *self, const StatsClusterKey *sc_key);
77
78 void (*free_fn)(LogQueue *self);
79 };
80
81 static inline gboolean
log_queue_keep_on_reload(LogQueue * self)82 log_queue_keep_on_reload(LogQueue *self)
83 {
84 if (self->keep_on_reload)
85 return self->keep_on_reload(self);
86 return TRUE;
87 }
88
89 static inline gint64
log_queue_get_length(LogQueue * self)90 log_queue_get_length(LogQueue *self)
91 {
92 return self->get_length(self);
93 }
94
95 static inline gboolean
log_queue_is_empty_racy(LogQueue * self)96 log_queue_is_empty_racy(LogQueue *self)
97 {
98 if (self->is_empty_racy)
99 return self->is_empty_racy(self);
100 else
101 return (self->get_length(self) == 0);
102 }
103
104 static inline void
log_queue_push_tail(LogQueue * self,LogMessage * msg,const LogPathOptions * path_options)105 log_queue_push_tail(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
106 {
107 self->push_tail(self, msg, path_options);
108 }
109
110 static inline void
log_queue_push_head(LogQueue * self,LogMessage * msg,const LogPathOptions * path_options)111 log_queue_push_head(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
112 {
113 self->push_head(self, msg, path_options);
114 }
115
116 static inline LogMessage *
log_queue_pop_head(LogQueue * self,LogPathOptions * path_options)117 log_queue_pop_head(LogQueue *self, LogPathOptions *path_options)
118 {
119 LogMessage *msg = NULL;
120
121 if (self->throttle && self->throttle_buckets == 0)
122 return NULL;
123
124 msg = self->pop_head(self, path_options);
125
126 if (msg && self->throttle_buckets > 0)
127 self->throttle_buckets--;
128
129 return msg;
130 }
131
132 static inline LogMessage *
log_queue_pop_head_ignore_throttle(LogQueue * self,LogPathOptions * path_options)133 log_queue_pop_head_ignore_throttle(LogQueue *self, LogPathOptions *path_options)
134 {
135 return self->pop_head(self, path_options);
136 }
137
138 static inline void
log_queue_rewind_backlog(LogQueue * self,guint rewind_count)139 log_queue_rewind_backlog(LogQueue *self, guint rewind_count)
140 {
141 if (!self->use_backlog)
142 return;
143
144 self->rewind_backlog(self, rewind_count);
145 }
146
147 static inline void
log_queue_rewind_backlog_all(LogQueue * self)148 log_queue_rewind_backlog_all(LogQueue *self)
149 {
150 if (!self->use_backlog)
151 return;
152
153 self->rewind_backlog_all(self);
154 }
155
156 static inline void
log_queue_ack_backlog(LogQueue * self,guint rewind_count)157 log_queue_ack_backlog(LogQueue *self, guint rewind_count)
158 {
159 if (!self->use_backlog)
160 return;
161
162 self->ack_backlog(self, rewind_count);
163 }
164
165 static inline LogQueue *
log_queue_ref(LogQueue * self)166 log_queue_ref(LogQueue *self)
167 {
168 g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
169
170 if (self)
171 {
172 g_atomic_counter_inc(&self->ref_cnt);
173 }
174 return self;
175 }
176
177 static inline void
log_queue_unref(LogQueue * self)178 log_queue_unref(LogQueue *self)
179 {
180 g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
181
182 if (self && g_atomic_counter_dec_and_test(&self->ref_cnt))
183 {
184 if (self->free_fn)
185 self->free_fn(self);
186 }
187 }
188
189 static inline void
log_queue_set_throttle(LogQueue * self,gint throttle)190 log_queue_set_throttle(LogQueue *self, gint throttle)
191 {
192 self->throttle = throttle;
193 self->throttle_buckets = throttle;
194 }
195
196 static inline void
log_queue_set_use_backlog(LogQueue * self,gboolean use_backlog)197 log_queue_set_use_backlog(LogQueue *self, gboolean use_backlog)
198 {
199 if (self)
200 self->use_backlog = use_backlog;
201 }
202
203 static inline gboolean
log_queue_has_type(LogQueue * self,QueueType type)204 log_queue_has_type(LogQueue *self, QueueType type)
205 {
206 return g_strcmp0(self->type, type) == 0;
207 }
208
209 void log_queue_memory_usage_add(LogQueue *self, gsize value);
210 void log_queue_memory_usage_sub(LogQueue *self, gsize value);
211
212 void log_queue_queued_messages_add(LogQueue *self, gsize value);
213 void log_queue_queued_messages_sub(LogQueue *self, gsize value);
214 void log_queue_queued_messages_inc(LogQueue *self);
215 void log_queue_queued_messages_dec(LogQueue *self);
216
217 void log_queue_push_notify(LogQueue *self);
218 void log_queue_reset_parallel_push(LogQueue *self);
219 void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
220 GDestroyNotify user_data_destroy);
221 gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify,
222 gpointer user_data, GDestroyNotify user_data_destroy);
223 void log_queue_init_instance(LogQueue *self, const gchar *persist_name);
224 void log_queue_register_stats_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key);
225 void log_queue_unregister_stats_counters(LogQueue *self, const StatsClusterKey *sc_key);
226
227 void log_queue_free_method(LogQueue *self);
228
229 void log_queue_set_max_threads(gint max_threads);
230
231 #endif
232