1 /*
2 * Copyright (c) 2013, 2014 Balabit
3 * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #ifndef LOGTHRDESTDRV_H
26 #define LOGTHRDESTDRV_H
27
28 #include "syslog-ng.h"
29 #include "driver.h"
30 #include "stats/stats-registry.h"
31 #include "stats/aggregator/stats-aggregator.h"
32 #include "logqueue.h"
33 #include "mainloop-worker.h"
34 #include "seqnum.h"
35
36 #include <iv.h>
37 #include <iv_event.h>
38
39 typedef enum
40 {
41 /* flush modes */
42
43 /* flush the infligh messages */
44 LTF_FLUSH_NORMAL,
45
46 /* expedite flush, to be used at reload, when the persistency of the queue
47 * contents is ensured */
48 LTF_FLUSH_EXPEDITE,
49 } LogThreadedFlushMode;
50
51 typedef enum
52 {
53 LTR_DROP,
54 LTR_ERROR,
55 LTR_EXPLICIT_ACK_MGMT,
56 LTR_SUCCESS,
57 LTR_QUEUED,
58 LTR_NOT_CONNECTED,
59 LTR_RETRY,
60 LTR_MAX
61 } LogThreadedResult;
62
63 typedef struct _LogThreadedDestDriver LogThreadedDestDriver;
64 typedef struct _LogThreadedDestWorker LogThreadedDestWorker;
65
66 struct _LogThreadedDestWorker
67 {
68 LogQueue *queue;
69 struct iv_task do_work;
70 struct iv_event wake_up_event;
71 struct iv_event shutdown_event;
72 struct iv_timer timer_reopen;
73 struct iv_timer timer_throttle;
74 struct iv_timer timer_flush;
75
76 LogThreadedDestDriver *owner;
77
78 gint worker_index;
79 gboolean connected;
80 gint batch_size;
81 gint rewound_batch_size;
82 gint retries_on_error_counter;
83 guint retries_counter;
84 gint32 seq_num;
85 struct timespec last_flush_time;
86 gboolean enable_batching;
87 gboolean suspended;
88 gboolean startup_finished;
89 gboolean startup_failure;
90 GCond started_up;
91 time_t time_reopen;
92
93 gboolean (*thread_init)(LogThreadedDestWorker *s);
94 void (*thread_deinit)(LogThreadedDestWorker *s);
95 gboolean (*connect)(LogThreadedDestWorker *s);
96 void (*disconnect)(LogThreadedDestWorker *s);
97 LogThreadedResult (*insert)(LogThreadedDestWorker *s, LogMessage *msg);
98 LogThreadedResult (*flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode);
99 void (*free_fn)(LogThreadedDestWorker *s);
100 };
101
102 const gchar *log_threaded_result_to_str(LogThreadedResult self);
103
104 struct _LogThreadedDestDriver
105 {
106 LogDestDriver super;
107 GMutex lock;
108
109 StatsCounterItem *dropped_messages;
110 StatsCounterItem *processed_messages;
111 StatsCounterItem *written_messages;
112 StatsAggregator *max_message_size;
113 StatsAggregator *average_messages_size;
114 StatsAggregator *max_batch_size;
115 StatsAggregator *average_batch_size;
116 StatsAggregator *CPS;
117
118 gint batch_lines;
119 gint batch_timeout;
120 gboolean under_termination;
121 time_t time_reopen;
122 gint retries_on_error_max;
123 guint retries_max;
124
125 struct
126 {
127 LogThreadedDestWorker *(*construct)(LogThreadedDestDriver *s, gint worker_index);
128
129 /* this is a compatibility layer that can be removed once all drivers have
130 * been migrated to the use of LogThreadedDestWorker based interface.
131 * Right now, if a driver is not overriding the Worker instance, we would
132 * be calling these methods from the functions named `_compat_*()`. */
133 LogThreadedDestWorker instance;
134 void (*thread_init)(LogThreadedDestDriver *s);
135 void (*thread_deinit)(LogThreadedDestDriver *s);
136 gboolean (*connect)(LogThreadedDestDriver *s);
137 void (*disconnect)(LogThreadedDestDriver *s);
138 LogThreadedResult (*insert)(LogThreadedDestDriver *s, LogMessage *msg);
139 LogThreadedResult (*flush)(LogThreadedDestDriver *s);
140 } worker;
141
142 LogThreadedDestWorker **workers;
143 gint num_workers;
144 gint created_workers;
145 guint last_worker;
146
147 gint stats_source;
148
149 /* this counter is not thread safe if there are multiple worker threads,
150 * in that case, one needs to use LogThreadedDestWorker->seq_num, which is
151 * static for a single insert() invocation, whereas this might be
152 * increased in parallel by the multiple threads. */
153
154 gint32 shared_seq_num;
155
156 WorkerOptions worker_options;
157 const gchar *(*format_stats_instance)(LogThreadedDestDriver *s);
158 };
159
160 static inline gboolean
log_threaded_dest_worker_thread_init(LogThreadedDestWorker * self)161 log_threaded_dest_worker_thread_init(LogThreadedDestWorker *self)
162 {
163 if (self->thread_init)
164 return self->thread_init(self);
165 return TRUE;
166 }
167
168 static inline void
log_threaded_dest_worker_thread_deinit(LogThreadedDestWorker * self)169 log_threaded_dest_worker_thread_deinit(LogThreadedDestWorker *self)
170 {
171 if (self->thread_deinit)
172 self->thread_deinit(self);
173 }
174
175 static inline gboolean
log_threaded_dest_worker_connect(LogThreadedDestWorker * self)176 log_threaded_dest_worker_connect(LogThreadedDestWorker *self)
177 {
178 if (self->connect)
179 self->connected = self->connect(self);
180 else
181 self->connected = TRUE;
182
183 return self->connected;
184 }
185
186 static inline void
log_threaded_dest_worker_disconnect(LogThreadedDestWorker * self)187 log_threaded_dest_worker_disconnect(LogThreadedDestWorker *self)
188 {
189 if (self->disconnect)
190 self->disconnect(self);
191 self->connected = FALSE;
192 }
193
194 static inline LogThreadedResult
log_threaded_dest_worker_insert(LogThreadedDestWorker * self,LogMessage * msg)195 log_threaded_dest_worker_insert(LogThreadedDestWorker *self, LogMessage *msg)
196 {
197 if (self->owner->num_workers > 1)
198 self->seq_num = step_sequence_number_atomic(&self->owner->shared_seq_num);
199 else
200 self->seq_num = step_sequence_number(&self->owner->shared_seq_num);
201 return self->insert(self, msg);
202 }
203
204 static inline LogThreadedResult
log_threaded_dest_worker_flush(LogThreadedDestWorker * self,LogThreadedFlushMode mode)205 log_threaded_dest_worker_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
206 {
207 LogThreadedResult result = LTR_SUCCESS;
208
209 if (self->flush)
210 result = self->flush(self, mode);
211 iv_validate_now();
212 self->last_flush_time = iv_now;
213 return result;
214 }
215
216 /* function for drivers that are not yet using the worker API */
217 static inline LogThreadedResult
log_threaded_dest_driver_flush(LogThreadedDestDriver * self)218 log_threaded_dest_driver_flush(LogThreadedDestDriver *self)
219 {
220 return log_threaded_dest_worker_flush(&self->worker.instance, LTF_FLUSH_NORMAL);
221 }
222
223 void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size);
224 void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size);
225 void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size);
226 void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self);
227 gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self);
228 void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self);
229 void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self,
230 LogThreadedDestDriver *owner,
231 gint worker_index);
232 void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self);
233 void log_threaded_dest_worker_free(LogThreadedDestWorker *self);
234
235 void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len);
236 void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len);
237 void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self);
238 void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self);
239
240 gboolean log_threaded_dest_driver_deinit_method(LogPipe *s);
241 gboolean log_threaded_dest_driver_init_method(LogPipe *s);
242 gboolean log_threaded_dest_driver_start_workers(LogPipe *s);
243
244 void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg);
245 void log_threaded_dest_driver_free(LogPipe *s);
246
247 void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries);
248 void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers);
249 void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines);
250 void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout);
251 void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen);
252
253 #endif
254