1 /*
2  * Copyright (c) 2018 Balabit
3  * Copyright (c) 2018 László Várady <laszlo.varady@balabit.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logthrsourcedrv.h"
26 #include "mainloop-worker.h"
27 #include "messages.h"
28 #include "apphook.h"
29 #include "ack-tracker/ack_tracker_factory.h"
30 
31 #include <iv.h>
32 
33 typedef struct _WakeupCondition
34 {
35   GMutex lock;
36   GCond cond;
37   gboolean awoken;
38 } WakeupCondition;
39 
40 struct _LogThreadedSourceWorker
41 {
42   LogSource super;
43   LogThreadedSourceDriver *control;
44   WakeupCondition wakeup_cond;
45   WorkerOptions options;
46   gboolean under_termination;
47 
48   LogThreadedSourceWorkerRunFunc run;
49   LogThreadedSourceWorkerRequestExitFunc request_exit;
50   LogThreadedSourceWorkerWakeupFunc wakeup;
51 };
52 
53 static void
wakeup_cond_init(WakeupCondition * cond)54 wakeup_cond_init(WakeupCondition *cond)
55 {
56   g_mutex_init(&cond->lock);
57   g_cond_init(&cond->cond);
58   cond->awoken = TRUE;
59 }
60 
61 static void
wakeup_cond_destroy(WakeupCondition * cond)62 wakeup_cond_destroy(WakeupCondition *cond)
63 {
64   g_cond_clear(&cond->cond);
65   g_mutex_clear(&cond->lock);
66 }
67 
68 static inline void
wakeup_cond_lock(WakeupCondition * cond)69 wakeup_cond_lock(WakeupCondition *cond)
70 {
71   g_mutex_lock(&cond->lock);
72 }
73 
74 static inline void
wakeup_cond_unlock(WakeupCondition * cond)75 wakeup_cond_unlock(WakeupCondition *cond)
76 {
77   g_mutex_unlock(&cond->lock);
78 }
79 
80 /* The wakeup lock must be held before calling this function. */
81 static inline void
wakeup_cond_wait(WakeupCondition * cond)82 wakeup_cond_wait(WakeupCondition *cond)
83 {
84   cond->awoken = FALSE;
85   while (!cond->awoken)
86     g_cond_wait(&cond->cond, &cond->lock);
87 }
88 
89 static inline void
wakeup_cond_signal(WakeupCondition * cond)90 wakeup_cond_signal(WakeupCondition *cond)
91 {
92   g_mutex_lock(&cond->lock);
93   cond->awoken = TRUE;
94   g_cond_signal(&cond->cond);
95   g_mutex_unlock(&cond->lock);
96 }
97 
98 static LogPipe *
log_threaded_source_worker_logpipe(LogThreadedSourceWorker * self)99 log_threaded_source_worker_logpipe(LogThreadedSourceWorker *self)
100 {
101   return &self->super.super;
102 }
103 
104 static void
log_threaded_source_worker_set_options(LogThreadedSourceWorker * self,LogThreadedSourceDriver * control,LogThreadedSourceWorkerOptions * options,const gchar * stats_id,const gchar * stats_instance)105 log_threaded_source_worker_set_options(LogThreadedSourceWorker *self, LogThreadedSourceDriver *control,
106                                        LogThreadedSourceWorkerOptions *options,
107                                        const gchar *stats_id, const gchar *stats_instance)
108 {
109   log_source_set_options(&self->super, &options->super, stats_id, stats_instance, TRUE,
110                          control->super.super.super.expr_node);
111   log_source_set_ack_tracker_factory(&self->super, ack_tracker_factory_ref(options->ack_tracker_factory));
112 
113   log_pipe_unref(&self->control->super.super.super);
114   log_pipe_ref(&control->super.super.super);
115   self->control = control;
116 }
117 
118 void
log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions * options)119 log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions *options)
120 {
121   log_source_options_defaults(&options->super);
122   msg_format_options_defaults(&options->parse_options);
123   options->parse_options.flags |= LP_SYSLOG_PROTOCOL;
124   options->ack_tracker_factory = NULL;
125 }
126 
127 void
log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions * options,GlobalConfig * cfg,const gchar * group_name)128 log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions *options, GlobalConfig *cfg,
129                                         const gchar *group_name)
130 {
131   log_source_options_init(&options->super, cfg, group_name);
132   msg_format_options_init(&options->parse_options, cfg);
133 }
134 
135 void
log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions * options)136 log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions *options)
137 {
138   log_source_options_destroy(&options->super);
139   msg_format_options_destroy(&options->parse_options);
140   ack_tracker_factory_unref(options->ack_tracker_factory);
141 }
142 
143 /* The wakeup lock must be held before calling this function. */
144 static void
log_threaded_source_suspend(LogThreadedSourceDriver * self)145 log_threaded_source_suspend(LogThreadedSourceDriver *self)
146 {
147   LogThreadedSourceWorker *worker = self->worker;
148 
149   while (!log_threaded_source_free_to_send(self) && !worker->under_termination)
150     wakeup_cond_wait(&worker->wakeup_cond);
151 }
152 
153 static void
log_threaded_source_wakeup(LogThreadedSourceDriver * self)154 log_threaded_source_wakeup(LogThreadedSourceDriver *self)
155 {
156   LogThreadedSourceWorker *worker = self->worker;
157 
158   wakeup_cond_signal(&worker->wakeup_cond);
159 }
160 
161 static void
log_threaded_source_worker_run(LogThreadedSourceWorker * self)162 log_threaded_source_worker_run(LogThreadedSourceWorker *self)
163 {
164   msg_debug("Worker thread started", evt_tag_str("driver", self->control->super.super.id));
165 
166   /* ivykis is not used here, but mark-freq() requires all source threads to be iv-initialized. */
167   iv_init();
168 
169   self->run(self->control);
170 
171   iv_deinit();
172 
173   msg_debug("Worker thread finished", evt_tag_str("driver", self->control->super.super.id));
174 }
175 
176 static void
log_threaded_source_worker_request_exit(LogThreadedSourceWorker * self)177 log_threaded_source_worker_request_exit(LogThreadedSourceWorker *self)
178 {
179   msg_debug("Requesting worker thread exit", evt_tag_str("driver", self->control->super.super.id));
180   self->under_termination = TRUE;
181   self->request_exit(self->control);
182   log_threaded_source_wakeup(self->control);
183 }
184 
185 static void
_worker_wakeup(LogSource * s)186 _worker_wakeup(LogSource *s)
187 {
188   LogThreadedSourceWorker *self = (LogThreadedSourceWorker *) s;
189 
190   self->wakeup(self->control);
191 }
192 
193 static gboolean
log_threaded_source_worker_init(LogPipe * s)194 log_threaded_source_worker_init(LogPipe *s)
195 {
196   if (!log_source_init(s))
197     return FALSE;
198 
199   return TRUE;
200 }
201 
202 static void
log_threaded_source_worker_free(LogPipe * s)203 log_threaded_source_worker_free(LogPipe *s)
204 {
205   LogThreadedSourceWorker *self = (LogThreadedSourceWorker *) s;
206 
207   wakeup_cond_destroy(&self->wakeup_cond);
208 
209   log_pipe_unref(&self->control->super.super.super);
210   self->control = NULL;
211 
212   log_source_free(s);
213 }
214 
215 static LogThreadedSourceWorker *
log_threaded_source_worker_new(GlobalConfig * cfg)216 log_threaded_source_worker_new(GlobalConfig *cfg)
217 {
218   LogThreadedSourceWorker *self = g_new0(LogThreadedSourceWorker, 1);
219   log_source_init_instance(&self->super, cfg);
220 
221   wakeup_cond_init(&self->wakeup_cond);
222 
223   self->options.is_external_input = TRUE;
224 
225   self->super.super.init = log_threaded_source_worker_init;
226   self->super.super.free_fn = log_threaded_source_worker_free;
227   self->super.wakeup = _worker_wakeup;
228 
229   return self;
230 }
231 
232 gboolean
log_threaded_source_driver_init_method(LogPipe * s)233 log_threaded_source_driver_init_method(LogPipe *s)
234 {
235   LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
236   GlobalConfig *cfg = log_pipe_get_config(s);
237 
238   self->worker = log_threaded_source_worker_new(cfg);
239   self->worker->wakeup = log_threaded_source_wakeup;
240 
241   if (!log_src_driver_init_method(s))
242     return FALSE;
243 
244   g_assert(self->format_stats_instance);
245 
246   log_threaded_source_worker_options_init(&self->worker_options, cfg, self->super.super.group);
247   log_threaded_source_worker_set_options(self->worker, self, &self->worker_options,
248                                          self->super.super.id, self->format_stats_instance(self));
249 
250   LogPipe *worker_pipe = log_threaded_source_worker_logpipe(self->worker);
251   log_pipe_append(worker_pipe, s);
252   if (!log_pipe_init(worker_pipe))
253     {
254       log_pipe_unref(worker_pipe);
255       self->worker = NULL;
256       return FALSE;
257     }
258 
259   return TRUE;
260 }
261 
262 gboolean
log_threaded_source_driver_deinit_method(LogPipe * s)263 log_threaded_source_driver_deinit_method(LogPipe *s)
264 {
265   LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
266   LogPipe *worker_pipe = log_threaded_source_worker_logpipe(self->worker);
267 
268   log_pipe_deinit(worker_pipe);
269   log_pipe_unref(worker_pipe);
270 
271   return log_src_driver_deinit_method(s);
272 }
273 
274 void
log_threaded_source_driver_free_method(LogPipe * s)275 log_threaded_source_driver_free_method(LogPipe *s)
276 {
277   LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
278 
279   log_threaded_source_worker_options_destroy(&self->worker_options);
280 
281   log_src_driver_free(s);
282 }
283 
284 gboolean
log_threaded_source_driver_start_worker(LogPipe * s)285 log_threaded_source_driver_start_worker(LogPipe *s)
286 {
287   LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
288 
289   main_loop_create_worker_thread((WorkerThreadFunc) log_threaded_source_worker_run,
290                                  (WorkerExitNotificationFunc) log_threaded_source_worker_request_exit,
291                                  self->worker, &self->worker->options);
292 
293   return TRUE;
294 }
295 
296 void
log_threaded_source_driver_set_worker_run_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerRunFunc run)297 log_threaded_source_driver_set_worker_run_func(LogThreadedSourceDriver *self, LogThreadedSourceWorkerRunFunc run)
298 {
299   self->worker->run = run;
300 }
301 
302 void
log_threaded_source_driver_set_worker_request_exit_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerRequestExitFunc request_exit)303 log_threaded_source_driver_set_worker_request_exit_func(LogThreadedSourceDriver *self,
304                                                         LogThreadedSourceWorkerRequestExitFunc request_exit)
305 {
306   self->worker->request_exit = request_exit;
307 }
308 
309 void
log_threaded_source_set_wakeup_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerWakeupFunc wakeup)310 log_threaded_source_set_wakeup_func(LogThreadedSourceDriver *self, LogThreadedSourceWorkerWakeupFunc wakeup)
311 {
312   self->worker->wakeup = wakeup;
313 }
314 
315 static gboolean
_is_default_priority_or_facility_set(MsgFormatOptions * parse_options)316 _is_default_priority_or_facility_set(MsgFormatOptions *parse_options)
317 {
318   return parse_options->default_pri != 0xFFFF;
319 }
320 
321 static void
_apply_default_priority_and_facility(LogThreadedSourceDriver * self,LogMessage * msg)322 _apply_default_priority_and_facility(LogThreadedSourceDriver *self, LogMessage *msg)
323 {
324   MsgFormatOptions *parse_options = &self->worker_options.parse_options;
325   if (!_is_default_priority_or_facility_set(parse_options))
326     return;
327   msg->pri = parse_options->default_pri;
328 }
329 
330 void
log_threaded_source_post(LogThreadedSourceDriver * self,LogMessage * msg)331 log_threaded_source_post(LogThreadedSourceDriver *self, LogMessage *msg)
332 {
333   msg_debug("Incoming log message", evt_tag_str("msg", log_msg_get_value(msg, LM_V_MESSAGE, NULL)));
334   _apply_default_priority_and_facility(self, msg);
335   log_source_post(&self->worker->super, msg);
336 }
337 
338 gboolean
log_threaded_source_free_to_send(LogThreadedSourceDriver * self)339 log_threaded_source_free_to_send(LogThreadedSourceDriver *self)
340 {
341   return log_source_free_to_send(&self->worker->super);
342 }
343 
344 void
log_threaded_source_blocking_post(LogThreadedSourceDriver * self,LogMessage * msg)345 log_threaded_source_blocking_post(LogThreadedSourceDriver *self, LogMessage *msg)
346 {
347   LogThreadedSourceWorker *worker = self->worker;
348 
349   log_threaded_source_post(self, msg);
350 
351   /*
352    * The wakeup lock must be held before calling free_to_send() and suspend(),
353    * otherwise g_cond_signal() might be called between free_to_send() and
354    * suspend(). We'd hang in that case.
355    *
356    * LogReader does not have such a lock, but this is because it runs an ivykis
357    * loop with a _synchronized_ event queue, where suspend() and the
358    * "schedule_wakeup" event are guaranteed to be scheduled in the right order.
359    */
360 
361   wakeup_cond_lock(&worker->wakeup_cond);
362   if (!log_threaded_source_free_to_send(self))
363     log_threaded_source_suspend(self);
364   wakeup_cond_unlock(&worker->wakeup_cond);
365 }
366 
367 void
log_threaded_source_driver_init_instance(LogThreadedSourceDriver * self,GlobalConfig * cfg)368 log_threaded_source_driver_init_instance(LogThreadedSourceDriver *self, GlobalConfig *cfg)
369 {
370   log_src_driver_init_instance(&self->super, cfg);
371 
372   log_threaded_source_worker_options_defaults(&self->worker_options);
373 
374   self->super.super.super.init = log_threaded_source_driver_init_method;
375   self->super.super.super.deinit = log_threaded_source_driver_deinit_method;
376   self->super.super.super.free_fn = log_threaded_source_driver_free_method;
377   self->super.super.super.on_config_inited = log_threaded_source_driver_start_worker;
378 }
379