1 /*
2  * Copyright (c) 2002-2011 Balabit
3  * Copyright (c) 1998-2011 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGQUEUE_H_INCLUDED
26 #define LOGQUEUE_H_INCLUDED
27 
28 #include "logmsg/logmsg.h"
29 #include "stats/stats-registry.h"
30 
31 extern gint log_queue_max_threads;
32 
33 typedef void (*LogQueuePushNotifyFunc)(gpointer user_data);
34 
35 typedef struct _LogQueue LogQueue;
36 
37 typedef const char *QueueType;
38 
39 struct _LogQueue
40 {
41   QueueType type;
42   GAtomicCounter ref_cnt;
43   gboolean use_backlog;
44 
45   gint throttle;
46   gint throttle_buckets;
47   GTimeVal last_throttle_check;
48 
49   gchar *persist_name;
50   StatsCounterItem *queued_messages;
51   StatsCounterItem *dropped_messages;
52   StatsCounterItem *memory_usage;
53 
54   struct
55   {
56     atomic_gssize memory_usage;
57     atomic_gssize queued_messages;
58   } stats_cache;
59 
60   GMutex lock;
61   LogQueuePushNotifyFunc parallel_push_notify;
62   gpointer parallel_push_data;
63   GDestroyNotify parallel_push_data_destroy;
64 
65   /* queue management */
66   gboolean (*keep_on_reload)(LogQueue *self);
67   gint64 (*get_length)(LogQueue *self);
68   gboolean (*is_empty_racy)(LogQueue *self);
69   void (*push_tail)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options);
70   void (*push_head)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options);
71   LogMessage *(*pop_head)(LogQueue *self, LogPathOptions *path_options);
72   void (*ack_backlog)(LogQueue *self, gint n);
73   void (*rewind_backlog)(LogQueue *self, guint rewind_count);
74   void (*rewind_backlog_all)(LogQueue *self);
75   void (*register_stats_counters)(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key);
76   void (*unregister_stats_counters)(LogQueue *self, const StatsClusterKey *sc_key);
77 
78   void (*free_fn)(LogQueue *self);
79 };
80 
81 static inline gboolean
log_queue_keep_on_reload(LogQueue * self)82 log_queue_keep_on_reload(LogQueue *self)
83 {
84   if (self->keep_on_reload)
85     return self->keep_on_reload(self);
86   return TRUE;
87 }
88 
89 static inline gint64
log_queue_get_length(LogQueue * self)90 log_queue_get_length(LogQueue *self)
91 {
92   return self->get_length(self);
93 }
94 
95 static inline gboolean
log_queue_is_empty_racy(LogQueue * self)96 log_queue_is_empty_racy(LogQueue *self)
97 {
98   if (self->is_empty_racy)
99     return self->is_empty_racy(self);
100   else
101     return (self->get_length(self) == 0);
102 }
103 
104 static inline void
log_queue_push_tail(LogQueue * self,LogMessage * msg,const LogPathOptions * path_options)105 log_queue_push_tail(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
106 {
107   self->push_tail(self, msg, path_options);
108 }
109 
110 static inline void
log_queue_push_head(LogQueue * self,LogMessage * msg,const LogPathOptions * path_options)111 log_queue_push_head(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
112 {
113   self->push_head(self, msg, path_options);
114 }
115 
116 static inline LogMessage *
log_queue_pop_head(LogQueue * self,LogPathOptions * path_options)117 log_queue_pop_head(LogQueue *self, LogPathOptions *path_options)
118 {
119   LogMessage *msg = NULL;
120 
121   if (self->throttle && self->throttle_buckets == 0)
122     return NULL;
123 
124   msg = self->pop_head(self, path_options);
125 
126   if (msg && self->throttle_buckets > 0)
127     self->throttle_buckets--;
128 
129   return msg;
130 }
131 
132 static inline LogMessage *
log_queue_pop_head_ignore_throttle(LogQueue * self,LogPathOptions * path_options)133 log_queue_pop_head_ignore_throttle(LogQueue *self, LogPathOptions *path_options)
134 {
135   return self->pop_head(self, path_options);
136 }
137 
138 static inline void
log_queue_rewind_backlog(LogQueue * self,guint rewind_count)139 log_queue_rewind_backlog(LogQueue *self, guint rewind_count)
140 {
141   if (!self->use_backlog)
142     return;
143 
144   self->rewind_backlog(self, rewind_count);
145 }
146 
147 static inline void
log_queue_rewind_backlog_all(LogQueue * self)148 log_queue_rewind_backlog_all(LogQueue *self)
149 {
150   if (!self->use_backlog)
151     return;
152 
153   self->rewind_backlog_all(self);
154 }
155 
156 static inline void
log_queue_ack_backlog(LogQueue * self,guint rewind_count)157 log_queue_ack_backlog(LogQueue *self, guint rewind_count)
158 {
159   if (!self->use_backlog)
160     return;
161 
162   self->ack_backlog(self, rewind_count);
163 }
164 
165 static inline LogQueue *
log_queue_ref(LogQueue * self)166 log_queue_ref(LogQueue *self)
167 {
168   g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
169 
170   if (self)
171     {
172       g_atomic_counter_inc(&self->ref_cnt);
173     }
174   return self;
175 }
176 
177 static inline void
log_queue_unref(LogQueue * self)178 log_queue_unref(LogQueue *self)
179 {
180   g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
181 
182   if (self && g_atomic_counter_dec_and_test(&self->ref_cnt))
183     {
184       if (self->free_fn)
185         self->free_fn(self);
186     }
187 }
188 
189 static inline void
log_queue_set_throttle(LogQueue * self,gint throttle)190 log_queue_set_throttle(LogQueue *self, gint throttle)
191 {
192   self->throttle = throttle;
193   self->throttle_buckets = throttle;
194 }
195 
196 static inline void
log_queue_set_use_backlog(LogQueue * self,gboolean use_backlog)197 log_queue_set_use_backlog(LogQueue *self, gboolean use_backlog)
198 {
199   if (self)
200     self->use_backlog = use_backlog;
201 }
202 
203 static inline gboolean
log_queue_has_type(LogQueue * self,QueueType type)204 log_queue_has_type(LogQueue *self, QueueType type)
205 {
206   return g_strcmp0(self->type, type) == 0;
207 }
208 
209 void log_queue_memory_usage_add(LogQueue *self, gsize value);
210 void log_queue_memory_usage_sub(LogQueue *self, gsize value);
211 
212 void log_queue_queued_messages_add(LogQueue *self, gsize value);
213 void log_queue_queued_messages_sub(LogQueue *self, gsize value);
214 void log_queue_queued_messages_inc(LogQueue *self);
215 void log_queue_queued_messages_dec(LogQueue *self);
216 
217 void log_queue_push_notify(LogQueue *self);
218 void log_queue_reset_parallel_push(LogQueue *self);
219 void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
220                                  GDestroyNotify user_data_destroy);
221 gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify,
222                                gpointer user_data, GDestroyNotify user_data_destroy);
223 void log_queue_init_instance(LogQueue *self, const gchar *persist_name);
224 void log_queue_register_stats_counters(LogQueue *self, gint stats_level, const StatsClusterKey *sc_key);
225 void log_queue_unregister_stats_counters(LogQueue *self, const StatsClusterKey *sc_key);
226 
227 void log_queue_free_method(LogQueue *self);
228 
229 void log_queue_set_max_threads(gint max_threads);
230 
231 #endif
232