1 /*
2  * Copyright (c) 2013, 2014 Balabit
3  * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGTHRDESTDRV_H
26 #define LOGTHRDESTDRV_H
27 
28 #include "syslog-ng.h"
29 #include "driver.h"
30 #include "stats/stats-registry.h"
31 #include "stats/aggregator/stats-aggregator.h"
32 #include "logqueue.h"
33 #include "mainloop-worker.h"
34 #include "seqnum.h"
35 
36 #include <iv.h>
37 #include <iv_event.h>
38 
39 typedef enum
40 {
41   /* flush modes */
42 
43   /* flush the infligh messages */
44   LTF_FLUSH_NORMAL,
45 
46   /* expedite flush, to be used at reload, when the persistency of the queue
47    * contents is ensured */
48   LTF_FLUSH_EXPEDITE,
49 } LogThreadedFlushMode;
50 
51 typedef enum
52 {
53   LTR_DROP,
54   LTR_ERROR,
55   LTR_EXPLICIT_ACK_MGMT,
56   LTR_SUCCESS,
57   LTR_QUEUED,
58   LTR_NOT_CONNECTED,
59   LTR_RETRY,
60   LTR_MAX
61 } LogThreadedResult;
62 
63 typedef struct _LogThreadedDestDriver LogThreadedDestDriver;
64 typedef struct _LogThreadedDestWorker LogThreadedDestWorker;
65 
66 struct _LogThreadedDestWorker
67 {
68   LogQueue *queue;
69   struct iv_task  do_work;
70   struct iv_event wake_up_event;
71   struct iv_event shutdown_event;
72   struct iv_timer timer_reopen;
73   struct iv_timer timer_throttle;
74   struct iv_timer timer_flush;
75 
76   LogThreadedDestDriver *owner;
77 
78   gint worker_index;
79   gboolean connected;
80   gint batch_size;
81   gint rewound_batch_size;
82   gint retries_on_error_counter;
83   guint retries_counter;
84   gint32 seq_num;
85   struct timespec last_flush_time;
86   gboolean enable_batching;
87   gboolean suspended;
88   gboolean startup_finished;
89   gboolean startup_failure;
90   GCond started_up;
91   time_t time_reopen;
92 
93   gboolean (*thread_init)(LogThreadedDestWorker *s);
94   void (*thread_deinit)(LogThreadedDestWorker *s);
95   gboolean (*connect)(LogThreadedDestWorker *s);
96   void (*disconnect)(LogThreadedDestWorker *s);
97   LogThreadedResult (*insert)(LogThreadedDestWorker *s, LogMessage *msg);
98   LogThreadedResult (*flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode);
99   void (*free_fn)(LogThreadedDestWorker *s);
100 };
101 
102 const gchar *log_threaded_result_to_str(LogThreadedResult self);
103 
104 struct _LogThreadedDestDriver
105 {
106   LogDestDriver super;
107   GMutex lock;
108 
109   StatsCounterItem *dropped_messages;
110   StatsCounterItem *processed_messages;
111   StatsCounterItem *written_messages;
112   StatsAggregator *max_message_size;
113   StatsAggregator *average_messages_size;
114   StatsAggregator *max_batch_size;
115   StatsAggregator *average_batch_size;
116   StatsAggregator *CPS;
117 
118   gint batch_lines;
119   gint batch_timeout;
120   gboolean under_termination;
121   time_t time_reopen;
122   gint retries_on_error_max;
123   guint retries_max;
124 
125   struct
126   {
127     LogThreadedDestWorker *(*construct)(LogThreadedDestDriver *s, gint worker_index);
128 
129     /* this is a compatibility layer that can be removed once all drivers have
130      * been migrated to the use of LogThreadedDestWorker based interface.
131      * Right now, if a driver is not overriding the Worker instance, we would
132      * be calling these methods from the functions named `_compat_*()`. */
133     LogThreadedDestWorker instance;
134     void (*thread_init)(LogThreadedDestDriver *s);
135     void (*thread_deinit)(LogThreadedDestDriver *s);
136     gboolean (*connect)(LogThreadedDestDriver *s);
137     void (*disconnect)(LogThreadedDestDriver *s);
138     LogThreadedResult (*insert)(LogThreadedDestDriver *s, LogMessage *msg);
139     LogThreadedResult (*flush)(LogThreadedDestDriver *s);
140   } worker;
141 
142   LogThreadedDestWorker **workers;
143   gint num_workers;
144   gint created_workers;
145   guint last_worker;
146 
147   gint stats_source;
148 
149   /* this counter is not thread safe if there are multiple worker threads,
150    * in that case, one needs to use LogThreadedDestWorker->seq_num, which is
151    * static for a single insert() invocation, whereas this might be
152    * increased in parallel by the multiple threads. */
153 
154   gint32 shared_seq_num;
155 
156   WorkerOptions worker_options;
157   const gchar *(*format_stats_instance)(LogThreadedDestDriver *s);
158 };
159 
160 static inline gboolean
log_threaded_dest_worker_thread_init(LogThreadedDestWorker * self)161 log_threaded_dest_worker_thread_init(LogThreadedDestWorker *self)
162 {
163   if (self->thread_init)
164     return self->thread_init(self);
165   return TRUE;
166 }
167 
168 static inline void
log_threaded_dest_worker_thread_deinit(LogThreadedDestWorker * self)169 log_threaded_dest_worker_thread_deinit(LogThreadedDestWorker *self)
170 {
171   if (self->thread_deinit)
172     self->thread_deinit(self);
173 }
174 
175 static inline gboolean
log_threaded_dest_worker_connect(LogThreadedDestWorker * self)176 log_threaded_dest_worker_connect(LogThreadedDestWorker *self)
177 {
178   if (self->connect)
179     self->connected = self->connect(self);
180   else
181     self->connected = TRUE;
182 
183   return self->connected;
184 }
185 
186 static inline void
log_threaded_dest_worker_disconnect(LogThreadedDestWorker * self)187 log_threaded_dest_worker_disconnect(LogThreadedDestWorker *self)
188 {
189   if (self->disconnect)
190     self->disconnect(self);
191   self->connected = FALSE;
192 }
193 
194 static inline LogThreadedResult
log_threaded_dest_worker_insert(LogThreadedDestWorker * self,LogMessage * msg)195 log_threaded_dest_worker_insert(LogThreadedDestWorker *self, LogMessage *msg)
196 {
197   if (self->owner->num_workers > 1)
198     self->seq_num = step_sequence_number_atomic(&self->owner->shared_seq_num);
199   else
200     self->seq_num = step_sequence_number(&self->owner->shared_seq_num);
201   return self->insert(self, msg);
202 }
203 
204 static inline LogThreadedResult
log_threaded_dest_worker_flush(LogThreadedDestWorker * self,LogThreadedFlushMode mode)205 log_threaded_dest_worker_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
206 {
207   LogThreadedResult result = LTR_SUCCESS;
208 
209   if (self->flush)
210     result = self->flush(self, mode);
211   iv_validate_now();
212   self->last_flush_time = iv_now;
213   return result;
214 }
215 
216 /* function for drivers that are not yet using the worker API */
217 static inline LogThreadedResult
log_threaded_dest_driver_flush(LogThreadedDestDriver * self)218 log_threaded_dest_driver_flush(LogThreadedDestDriver *self)
219 {
220   return log_threaded_dest_worker_flush(&self->worker.instance, LTF_FLUSH_NORMAL);
221 }
222 
223 void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size);
224 void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size);
225 void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size);
226 void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self);
227 gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self);
228 void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self);
229 void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self,
230                                             LogThreadedDestDriver *owner,
231                                             gint worker_index);
232 void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self);
233 void log_threaded_dest_worker_free(LogThreadedDestWorker *self);
234 
235 void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len);
236 void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len);
237 void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self);
238 void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self);
239 
240 gboolean log_threaded_dest_driver_deinit_method(LogPipe *s);
241 gboolean log_threaded_dest_driver_init_method(LogPipe *s);
242 gboolean log_threaded_dest_driver_start_workers(LogPipe *s);
243 
244 void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg);
245 void log_threaded_dest_driver_free(LogPipe *s);
246 
247 void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries);
248 void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers);
249 void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines);
250 void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout);
251 void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen);
252 
253 #endif
254