1 /*
2  * Copyright (c) 2018 Balabit
3  * Copyright (c) 2018 László Várady <laszlo.varady@balabit.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logthrfetcherdrv.h"
26 #include "messages.h"
27 #include "timeutils/misc.h"
28 
/* Scale a duration expressed in seconds to milliseconds. */
#define SEC_TO_MSEC(sec) ((sec) * 1000)
30 
31 void
log_threaded_fetcher_driver_set_fetch_no_data_delay(LogDriver * s,gdouble no_data_delay)32 log_threaded_fetcher_driver_set_fetch_no_data_delay(LogDriver *s, gdouble no_data_delay)
33 {
34   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
35   self->no_data_delay = (gint64) SEC_TO_MSEC(no_data_delay);
36 }
37 
38 void
log_threaded_fetcher_driver_set_time_reopen(LogDriver * s,time_t time_reopen)39 log_threaded_fetcher_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
40 {
41   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
42   self->time_reopen = time_reopen;
43 }
44 
45 static EVTTAG *
_tag_driver(LogThreadedFetcherDriver * f)46 _tag_driver(LogThreadedFetcherDriver *f)
47 {
48   return evt_tag_str("driver", f->super.super.super.id);
49 }
50 
51 static inline void
_thread_init(LogThreadedFetcherDriver * self)52 _thread_init(LogThreadedFetcherDriver *self)
53 {
54   msg_trace("Fetcher thread_init()", _tag_driver(self));
55   if (self->thread_init)
56     self->thread_init(self);
57 }
58 
59 static inline void
_thread_deinit(LogThreadedFetcherDriver * self)60 _thread_deinit(LogThreadedFetcherDriver *self)
61 {
62   msg_trace("Fetcher thread_deinit()", _tag_driver(self));
63   if (self->thread_deinit)
64     self->thread_deinit(self);
65 }
66 
67 static inline gboolean
_connect(LogThreadedFetcherDriver * self)68 _connect(LogThreadedFetcherDriver *self)
69 {
70   msg_trace("Fetcher connect()", _tag_driver(self));
71   if (!self->connect)
72     return TRUE;
73 
74   if (!self->connect(self))
75     {
76       msg_debug("Error establishing connection", _tag_driver(self));
77       return FALSE;
78     }
79 
80   return TRUE;
81 }
82 
83 static inline void
_disconnect(LogThreadedFetcherDriver * self)84 _disconnect(LogThreadedFetcherDriver *self)
85 {
86   msg_trace("Fetcher disconnect()", _tag_driver(self));
87   if (self->disconnect)
88     self->disconnect(self);
89 }
90 
91 static void
_start_reconnect_timer(LogThreadedFetcherDriver * self)92 _start_reconnect_timer(LogThreadedFetcherDriver *self)
93 {
94   iv_validate_now();
95   self->reconnect_timer.expires  = iv_now;
96   self->reconnect_timer.expires.tv_sec += self->time_reopen;
97   iv_timer_register(&self->reconnect_timer);
98 }
99 
100 static void
_start_no_data_timer(LogThreadedFetcherDriver * self)101 _start_no_data_timer(LogThreadedFetcherDriver *self)
102 {
103   iv_validate_now();
104   self->no_data_timer.expires  = iv_now;
105   timespec_add_msec(&self->no_data_timer.expires, self->no_data_delay);
106   iv_timer_register(&self->no_data_timer);
107 }
108 
109 static void
_worker_run(LogThreadedSourceDriver * s)110 _worker_run(LogThreadedSourceDriver *s)
111 {
112   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
113 
114   /* iv_init() and iv_deinit() are called by LogThreadedSourceDriver */
115 
116   iv_event_register(&self->wakeup_event);
117   iv_event_register(&self->shutdown_event);
118 
119   _thread_init(self);
120   if (_connect(self))
121     iv_task_register(&self->fetch_task);
122   else
123     _start_reconnect_timer(self);
124 
125   iv_main();
126 
127   _disconnect(self);
128   _thread_deinit(self);
129 }
130 
131 static void
_worker_request_exit(LogThreadedSourceDriver * s)132 _worker_request_exit(LogThreadedSourceDriver *s)
133 {
134   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
135 
136   self->under_termination = TRUE;
137 
138   iv_event_post(&self->shutdown_event);
139 
140   if (self->request_exit)
141     self->request_exit(self);
142 }
143 
144 static void
_wakeup(LogThreadedSourceDriver * s)145 _wakeup(LogThreadedSourceDriver *s)
146 {
147   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
148 
149   if (!self->under_termination)
150     iv_event_post(&self->wakeup_event);
151 }
152 
153 static inline void
_schedule_next_fetch_if_free_to_send(LogThreadedFetcherDriver * self)154 _schedule_next_fetch_if_free_to_send(LogThreadedFetcherDriver *self)
155 {
156   if (log_threaded_source_free_to_send(&self->super))
157     iv_task_register(&self->fetch_task);
158   else
159     self->suspended = TRUE;
160 }
161 
162 static void
_on_fetch_error(LogThreadedFetcherDriver * self)163 _on_fetch_error(LogThreadedFetcherDriver *self)
164 {
165   msg_error("Error during fetching messages", _tag_driver(self));
166   _disconnect(self);
167   _start_reconnect_timer(self);
168 }
169 
170 static void
_on_not_connected(LogThreadedFetcherDriver * self)171 _on_not_connected(LogThreadedFetcherDriver *self)
172 {
173   msg_info("Fetcher disconnected while receiving messages, reconnecting", _tag_driver(self));
174   _start_reconnect_timer(self);
175 }
176 
177 static void
_on_fetch_success(LogThreadedFetcherDriver * self,LogMessage * msg)178 _on_fetch_success(LogThreadedFetcherDriver *self, LogMessage *msg)
179 {
180   log_threaded_source_post(&self->super, msg);
181   _schedule_next_fetch_if_free_to_send(self);
182 }
183 
184 static void
_on_fetch_try_again(LogThreadedFetcherDriver * self)185 _on_fetch_try_again(LogThreadedFetcherDriver *self)
186 {
187   msg_debug("Try again when fetching messages", _tag_driver(self));
188   iv_task_register(&self->fetch_task);
189 }
190 
191 static void
_on_fetch_no_data(LogThreadedFetcherDriver * self)192 _on_fetch_no_data(LogThreadedFetcherDriver *self)
193 {
194   msg_debug("No data during fetching messages", _tag_driver(self));
195   _start_no_data_timer(self);
196 }
197 
198 
199 static void
_fetch(gpointer data)200 _fetch(gpointer data)
201 {
202   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
203 
204   msg_trace("Fetcher fetch()", _tag_driver(self));
205 
206   LogThreadedFetchResult fetch_result = self->fetch(self);
207 
208   switch (fetch_result.result)
209     {
210     case THREADED_FETCH_ERROR:
211       _on_fetch_error(self);
212       break;
213 
214     case THREADED_FETCH_NOT_CONNECTED:
215       _on_not_connected(self);
216       break;
217 
218     case THREADED_FETCH_SUCCESS:
219       _on_fetch_success(self, fetch_result.msg);
220       break;
221 
222     case THREADED_FETCH_TRY_AGAIN:
223       _on_fetch_try_again(self);
224       break;
225 
226     case THREADED_FETCH_NO_DATA:
227       _on_fetch_no_data(self);
228       break;
229 
230     default:
231       g_assert_not_reached();
232     }
233 }
234 
235 static void
_wakeup_event_handler(gpointer data)236 _wakeup_event_handler(gpointer data)
237 {
238   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
239 
240   if (self->suspended && log_threaded_source_free_to_send(&self->super))
241     {
242       self->suspended = FALSE;
243 
244       if (!iv_task_registered(&self->fetch_task))
245         iv_task_register(&self->fetch_task);
246     }
247 }
248 
249 static void
_stop_watches(LogThreadedFetcherDriver * self)250 _stop_watches(LogThreadedFetcherDriver *self)
251 {
252   iv_event_unregister(&self->wakeup_event);
253   iv_event_unregister(&self->shutdown_event);
254 
255   if (iv_task_registered(&self->fetch_task))
256     iv_task_unregister(&self->fetch_task);
257 
258   if (iv_timer_registered(&self->reconnect_timer))
259     iv_timer_unregister(&self->reconnect_timer);
260 
261   if (iv_timer_registered(&self->no_data_timer))
262     iv_timer_unregister(&self->no_data_timer);
263 }
264 
265 static void
_shutdown_event_handler(gpointer data)266 _shutdown_event_handler(gpointer data)
267 {
268   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
269 
270   _stop_watches(self);
271 
272   iv_quit();
273 }
274 
275 static void
_reconnect(gpointer data)276 _reconnect(gpointer data)
277 {
278   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
279 
280   if (_connect(self))
281     _schedule_next_fetch_if_free_to_send(self);
282   else
283     _start_reconnect_timer(self);
284 }
285 
286 static void
_no_data(gpointer data)287 _no_data(gpointer data)
288 {
289   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) data;
290 
291   iv_task_register(&self->fetch_task);
292 }
293 
294 static void
_init_watches(LogThreadedFetcherDriver * self)295 _init_watches(LogThreadedFetcherDriver *self)
296 {
297   IV_TASK_INIT(&self->fetch_task);
298   self->fetch_task.cookie = self;
299   self->fetch_task.handler = _fetch;
300 
301   IV_EVENT_INIT(&self->wakeup_event);
302   self->wakeup_event.cookie = self;
303   self->wakeup_event.handler = _wakeup_event_handler;
304 
305   IV_EVENT_INIT(&self->shutdown_event);
306   self->shutdown_event.cookie = self;
307   self->shutdown_event.handler = _shutdown_event_handler;
308 
309   IV_TIMER_INIT(&self->reconnect_timer);
310   self->reconnect_timer.cookie = self;
311   self->reconnect_timer.handler = _reconnect;
312 
313   IV_TIMER_INIT(&self->no_data_timer);
314   self->no_data_timer.cookie = self;
315   self->no_data_timer.handler = _no_data;
316 
317 }
318 
319 gboolean
log_threaded_fetcher_driver_init_method(LogPipe * s)320 log_threaded_fetcher_driver_init_method(LogPipe *s)
321 {
322   LogThreadedFetcherDriver *self = (LogThreadedFetcherDriver *) s;
323   GlobalConfig *cfg = log_pipe_get_config(s);
324 
325   if (!log_threaded_source_driver_init_method(s))
326     return FALSE;
327 
328   log_threaded_source_set_wakeup_func(&self->super, _wakeup);
329   log_threaded_source_driver_set_worker_run_func(&self->super, _worker_run);
330   log_threaded_source_driver_set_worker_request_exit_func(&self->super, _worker_request_exit);
331 
332   g_assert(self->fetch);
333 
334   if (self->time_reopen == -1)
335     self->time_reopen = cfg->time_reopen;
336 
337   if (self->no_data_delay == -1)
338     log_threaded_fetcher_driver_set_fetch_no_data_delay(&self->super.super.super, cfg->time_reopen);
339 
340   return TRUE;
341 }
342 
343 gboolean
log_threaded_fetcher_driver_deinit_method(LogPipe * s)344 log_threaded_fetcher_driver_deinit_method(LogPipe *s)
345 {
346   return log_threaded_source_driver_deinit_method(s);
347 }
348 
349 void
log_threaded_fetcher_driver_free_method(LogPipe * s)350 log_threaded_fetcher_driver_free_method(LogPipe *s)
351 {
352   log_threaded_source_driver_free_method(s);
353 }
354 
355 void
log_threaded_fetcher_driver_init_instance(LogThreadedFetcherDriver * self,GlobalConfig * cfg)356 log_threaded_fetcher_driver_init_instance(LogThreadedFetcherDriver *self, GlobalConfig *cfg)
357 {
358   log_threaded_source_driver_init_instance(&self->super, cfg);
359 
360   self->time_reopen = -1;
361   self->no_data_delay = -1;
362 
363   _init_watches(self);
364 
365   self->super.super.super.super.init = log_threaded_fetcher_driver_init_method;
366   self->super.super.super.super.deinit = log_threaded_fetcher_driver_deinit_method;
367   self->super.super.super.super.free_fn = log_threaded_fetcher_driver_free_method;
368 }
369