1 /*
2 * Copyright (c) 2018 Balabit
3 * Copyright (c) 2018 László Várady <laszlo.varady@balabit.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logthrfetcherdrv.h"
26 #include "messages.h"
27 #include "timeutils/misc.h"
28
29 #define SEC_TO_MSEC(x) ((x) * 1000)
30
31 void
log_threaded_fetcher_driver_set_fetch_no_data_delay(LogDriver * s,gdouble no_data_delay)32 log_threaded_fetcher_driver_set_fetch_no_data_delay(LogDriver *s, gdouble no_data_delay)
33 {
34 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
35 self->no_data_delay = (gint64) SEC_TO_MSEC(no_data_delay);
36 }
37
38 void
log_threaded_fetcher_driver_set_time_reopen(LogDriver * s,time_t time_reopen)39 log_threaded_fetcher_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
40 {
41 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
42 self->time_reopen = time_reopen;
43 }
44
45 static EVTTAG *
_tag_driver(LogThreadedFetcherDriver * f)46 _tag_driver(LogThreadedFetcherDriver *f)
47 {
48 return evt_tag_str("driver", f->super.super.super.id);
49 }
50
51 static inline void
_thread_init(LogThreadedFetcherDriver * self)52 _thread_init(LogThreadedFetcherDriver *self)
53 {
54 msg_trace("Fetcher thread_init()", _tag_driver(self));
55 if (self->thread_init)
56 self->thread_init(self);
57 }
58
59 static inline void
_thread_deinit(LogThreadedFetcherDriver * self)60 _thread_deinit(LogThreadedFetcherDriver *self)
61 {
62 msg_trace("Fetcher thread_deinit()", _tag_driver(self));
63 if (self->thread_deinit)
64 self->thread_deinit(self);
65 }
66
67 static inline gboolean
_connect(LogThreadedFetcherDriver * self)68 _connect(LogThreadedFetcherDriver *self)
69 {
70 msg_trace("Fetcher connect()", _tag_driver(self));
71 if (!self->connect)
72 return TRUE;
73
74 if (!self->connect(self))
75 {
76 msg_debug("Error establishing connection", _tag_driver(self));
77 return FALSE;
78 }
79
80 return TRUE;
81 }
82
83 static inline void
_disconnect(LogThreadedFetcherDriver * self)84 _disconnect(LogThreadedFetcherDriver *self)
85 {
86 msg_trace("Fetcher disconnect()", _tag_driver(self));
87 if (self->disconnect)
88 self->disconnect(self);
89 }
90
91 static void
_start_reconnect_timer(LogThreadedFetcherDriver * self)92 _start_reconnect_timer(LogThreadedFetcherDriver *self)
93 {
94 iv_validate_now();
95 self->reconnect_timer.expires = iv_now;
96 self->reconnect_timer.expires.tv_sec += self->time_reopen;
97 iv_timer_register(&self->reconnect_timer);
98 }
99
100 static void
_start_no_data_timer(LogThreadedFetcherDriver * self)101 _start_no_data_timer(LogThreadedFetcherDriver *self)
102 {
103 iv_validate_now();
104 self->no_data_timer.expires = iv_now;
105 timespec_add_msec(&self->no_data_timer.expires, self->no_data_delay);
106 iv_timer_register(&self->no_data_timer);
107 }
108
109 static void
_worker_run(LogThreadedSourceDriver * s)110 _worker_run(LogThreadedSourceDriver *s)
111 {
112 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
113
114 /* iv_init() and iv_deinit() are called by LogThreadedSourceDriver */
115
116 iv_event_register(&self->wakeup_event);
117 iv_event_register(&self->shutdown_event);
118
119 _thread_init(self);
120 if (_connect(self))
121 iv_task_register(&self->fetch_task);
122 else
123 _start_reconnect_timer(self);
124
125 iv_main();
126
127 _disconnect(self);
128 _thread_deinit(self);
129 }
130
131 static void
_worker_request_exit(LogThreadedSourceDriver * s)132 _worker_request_exit(LogThreadedSourceDriver *s)
133 {
134 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
135
136 self->under_termination = TRUE;
137
138 iv_event_post(&self->shutdown_event);
139
140 if (self->request_exit)
141 self->request_exit(self);
142 }
143
144 static void
_wakeup(LogThreadedSourceDriver * s)145 _wakeup(LogThreadedSourceDriver *s)
146 {
147 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
148
149 if (!self->under_termination)
150 iv_event_post(&self->wakeup_event);
151 }
152
153 static inline void
_schedule_next_fetch_if_free_to_send(LogThreadedFetcherDriver * self)154 _schedule_next_fetch_if_free_to_send(LogThreadedFetcherDriver *self)
155 {
156 if (log_threaded_source_free_to_send(&self->super))
157 iv_task_register(&self->fetch_task);
158 else
159 self->suspended = TRUE;
160 }
161
162 static void
_on_fetch_error(LogThreadedFetcherDriver * self)163 _on_fetch_error(LogThreadedFetcherDriver *self)
164 {
165 msg_error("Error during fetching messages", _tag_driver(self));
166 _disconnect(self);
167 _start_reconnect_timer(self);
168 }
169
170 static void
_on_not_connected(LogThreadedFetcherDriver * self)171 _on_not_connected(LogThreadedFetcherDriver *self)
172 {
173 msg_info("Fetcher disconnected while receiving messages, reconnecting", _tag_driver(self));
174 _start_reconnect_timer(self);
175 }
176
177 static void
_on_fetch_success(LogThreadedFetcherDriver * self,LogMessage * msg)178 _on_fetch_success(LogThreadedFetcherDriver *self, LogMessage *msg)
179 {
180 log_threaded_source_post(&self->super, msg);
181 _schedule_next_fetch_if_free_to_send(self);
182 }
183
184 static void
_on_fetch_try_again(LogThreadedFetcherDriver * self)185 _on_fetch_try_again(LogThreadedFetcherDriver *self)
186 {
187 msg_debug("Try again when fetching messages", _tag_driver(self));
188 iv_task_register(&self->fetch_task);
189 }
190
191 static void
_on_fetch_no_data(LogThreadedFetcherDriver * self)192 _on_fetch_no_data(LogThreadedFetcherDriver *self)
193 {
194 msg_debug("No data during fetching messages", _tag_driver(self));
195 _start_no_data_timer(self);
196 }
197
198
199 static void
_fetch(gpointer data)200 _fetch(gpointer data)
201 {
202 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
203
204 msg_trace("Fetcher fetch()", _tag_driver(self));
205
206 LogThreadedFetchResult fetch_result = self->fetch(self);
207
208 switch (fetch_result.result)
209 {
210 case THREADED_FETCH_ERROR:
211 _on_fetch_error(self);
212 break;
213
214 case THREADED_FETCH_NOT_CONNECTED:
215 _on_not_connected(self);
216 break;
217
218 case THREADED_FETCH_SUCCESS:
219 _on_fetch_success(self, fetch_result.msg);
220 break;
221
222 case THREADED_FETCH_TRY_AGAIN:
223 _on_fetch_try_again(self);
224 break;
225
226 case THREADED_FETCH_NO_DATA:
227 _on_fetch_no_data(self);
228 break;
229
230 default:
231 g_assert_not_reached();
232 }
233 }
234
235 static void
_wakeup_event_handler(gpointer data)236 _wakeup_event_handler(gpointer data)
237 {
238 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
239
240 if (self->suspended && log_threaded_source_free_to_send(&self->super))
241 {
242 self->suspended = FALSE;
243
244 if (!iv_task_registered(&self->fetch_task))
245 iv_task_register(&self->fetch_task);
246 }
247 }
248
249 static void
_stop_watches(LogThreadedFetcherDriver * self)250 _stop_watches(LogThreadedFetcherDriver *self)
251 {
252 iv_event_unregister(&self->wakeup_event);
253 iv_event_unregister(&self->shutdown_event);
254
255 if (iv_task_registered(&self->fetch_task))
256 iv_task_unregister(&self->fetch_task);
257
258 if (iv_timer_registered(&self->reconnect_timer))
259 iv_timer_unregister(&self->reconnect_timer);
260
261 if (iv_timer_registered(&self->no_data_timer))
262 iv_timer_unregister(&self->no_data_timer);
263 }
264
265 static void
_shutdown_event_handler(gpointer data)266 _shutdown_event_handler(gpointer data)
267 {
268 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
269
270 _stop_watches(self);
271
272 iv_quit();
273 }
274
275 static void
_reconnect(gpointer data)276 _reconnect(gpointer data)
277 {
278 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
279
280 if (_connect(self))
281 _schedule_next_fetch_if_free_to_send(self);
282 else
283 _start_reconnect_timer(self);
284 }
285
286 static void
_no_data(gpointer data)287 _no_data(gpointer data)
288 {
289 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
290
291 iv_task_register(&self->fetch_task);
292 }
293
294 static void
_init_watches(LogThreadedFetcherDriver * self)295 _init_watches(LogThreadedFetcherDriver *self)
296 {
297 IV_TASK_INIT(&self->fetch_task);
298 self->fetch_task.cookie = self;
299 self->fetch_task.handler = _fetch;
300
301 IV_EVENT_INIT(&self->wakeup_event);
302 self->wakeup_event.cookie = self;
303 self->wakeup_event.handler = _wakeup_event_handler;
304
305 IV_EVENT_INIT(&self->shutdown_event);
306 self->shutdown_event.cookie = self;
307 self->shutdown_event.handler = _shutdown_event_handler;
308
309 IV_TIMER_INIT(&self->reconnect_timer);
310 self->reconnect_timer.cookie = self;
311 self->reconnect_timer.handler = _reconnect;
312
313 IV_TIMER_INIT(&self->no_data_timer);
314 self->no_data_timer.cookie = self;
315 self->no_data_timer.handler = _no_data;
316
317 }
318
319 gboolean
log_threaded_fetcher_driver_init_method(LogPipe * s)320 log_threaded_fetcher_driver_init_method(LogPipe *s)
321 {
322 LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
323 GlobalConfig *cfg = log_pipe_get_config(s);
324
325 if (!log_threaded_source_driver_init_method(s))
326 return FALSE;
327
328 log_threaded_source_set_wakeup_func(&self->super, _wakeup);
329 log_threaded_source_driver_set_worker_run_func(&self->super, _worker_run);
330 log_threaded_source_driver_set_worker_request_exit_func(&self->super, _worker_request_exit);
331
332 g_assert(self->fetch);
333
334 if (self->time_reopen == -1)
335 self->time_reopen = cfg->time_reopen;
336
337 if (self->no_data_delay == -1)
338 log_threaded_fetcher_driver_set_fetch_no_data_delay(&self->super.super.super, cfg->time_reopen);
339
340 return TRUE;
341 }
342
343 gboolean
log_threaded_fetcher_driver_deinit_method(LogPipe * s)344 log_threaded_fetcher_driver_deinit_method(LogPipe *s)
345 {
346 return log_threaded_source_driver_deinit_method(s);
347 }
348
349 void
log_threaded_fetcher_driver_free_method(LogPipe * s)350 log_threaded_fetcher_driver_free_method(LogPipe *s)
351 {
352 log_threaded_source_driver_free_method(s);
353 }
354
355 void
log_threaded_fetcher_driver_init_instance(LogThreadedFetcherDriver * self,GlobalConfig * cfg)356 log_threaded_fetcher_driver_init_instance(LogThreadedFetcherDriver *self, GlobalConfig *cfg)
357 {
358 log_threaded_source_driver_init_instance(&self->super, cfg);
359
360 self->time_reopen = -1;
361 self->no_data_delay = -1;
362
363 _init_watches(self);
364
365 self->super.super.super.super.init = log_threaded_fetcher_driver_init_method;
366 self->super.super.super.super.deinit = log_threaded_fetcher_driver_deinit_method;
367 self->super.super.super.super.free_fn = log_threaded_fetcher_driver_free_method;
368 }
369