1 /*
2 * Copyright (c) 2018 Balabit
3 * Copyright (c) 2018 László Várady <laszlo.varady@balabit.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logthrsourcedrv.h"
26 #include "mainloop-worker.h"
27 #include "messages.h"
28 #include "apphook.h"
29 #include "ack-tracker/ack_tracker_factory.h"
30
31 #include <iv.h>
32
33 typedef struct _WakeupCondition
34 {
35 GMutex lock;
36 GCond cond;
37 gboolean awoken;
38 } WakeupCondition;
39
40 struct _LogThreadedSourceWorker
41 {
42 LogSource super;
43 LogThreadedSourceDriver *control;
44 WakeupCondition wakeup_cond;
45 WorkerOptions options;
46 gboolean under_termination;
47
48 LogThreadedSourceWorkerRunFunc run;
49 LogThreadedSourceWorkerRequestExitFunc request_exit;
50 LogThreadedSourceWorkerWakeupFunc wakeup;
51 };
52
53 static void
wakeup_cond_init(WakeupCondition * cond)54 wakeup_cond_init(WakeupCondition *cond)
55 {
56 g_mutex_init(&cond->lock);
57 g_cond_init(&cond->cond);
58 cond->awoken = TRUE;
59 }
60
61 static void
wakeup_cond_destroy(WakeupCondition * cond)62 wakeup_cond_destroy(WakeupCondition *cond)
63 {
64 g_cond_clear(&cond->cond);
65 g_mutex_clear(&cond->lock);
66 }
67
68 static inline void
wakeup_cond_lock(WakeupCondition * cond)69 wakeup_cond_lock(WakeupCondition *cond)
70 {
71 g_mutex_lock(&cond->lock);
72 }
73
74 static inline void
wakeup_cond_unlock(WakeupCondition * cond)75 wakeup_cond_unlock(WakeupCondition *cond)
76 {
77 g_mutex_unlock(&cond->lock);
78 }
79
80 /* The wakeup lock must be held before calling this function. */
81 static inline void
wakeup_cond_wait(WakeupCondition * cond)82 wakeup_cond_wait(WakeupCondition *cond)
83 {
84 cond->awoken = FALSE;
85 while (!cond->awoken)
86 g_cond_wait(&cond->cond, &cond->lock);
87 }
88
89 static inline void
wakeup_cond_signal(WakeupCondition * cond)90 wakeup_cond_signal(WakeupCondition *cond)
91 {
92 g_mutex_lock(&cond->lock);
93 cond->awoken = TRUE;
94 g_cond_signal(&cond->cond);
95 g_mutex_unlock(&cond->lock);
96 }
97
98 static LogPipe *
log_threaded_source_worker_logpipe(LogThreadedSourceWorker * self)99 log_threaded_source_worker_logpipe(LogThreadedSourceWorker *self)
100 {
101 return &self->super.super;
102 }
103
104 static void
log_threaded_source_worker_set_options(LogThreadedSourceWorker * self,LogThreadedSourceDriver * control,LogThreadedSourceWorkerOptions * options,const gchar * stats_id,const gchar * stats_instance)105 log_threaded_source_worker_set_options(LogThreadedSourceWorker *self, LogThreadedSourceDriver *control,
106 LogThreadedSourceWorkerOptions *options,
107 const gchar *stats_id, const gchar *stats_instance)
108 {
109 log_source_set_options(&self->super, &options->super, stats_id, stats_instance, TRUE,
110 control->super.super.super.expr_node);
111 log_source_set_ack_tracker_factory(&self->super, ack_tracker_factory_ref(options->ack_tracker_factory));
112
113 log_pipe_unref(&self->control->super.super.super);
114 log_pipe_ref(&control->super.super.super);
115 self->control = control;
116 }
117
118 void
log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions * options)119 log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions *options)
120 {
121 log_source_options_defaults(&options->super);
122 msg_format_options_defaults(&options->parse_options);
123 options->parse_options.flags |= LP_SYSLOG_PROTOCOL;
124 options->ack_tracker_factory = NULL;
125 }
126
127 void
log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions * options,GlobalConfig * cfg,const gchar * group_name)128 log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions *options, GlobalConfig *cfg,
129 const gchar *group_name)
130 {
131 log_source_options_init(&options->super, cfg, group_name);
132 msg_format_options_init(&options->parse_options, cfg);
133 }
134
135 void
log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions * options)136 log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions *options)
137 {
138 log_source_options_destroy(&options->super);
139 msg_format_options_destroy(&options->parse_options);
140 ack_tracker_factory_unref(options->ack_tracker_factory);
141 }
142
143 /* The wakeup lock must be held before calling this function. */
144 static void
log_threaded_source_suspend(LogThreadedSourceDriver * self)145 log_threaded_source_suspend(LogThreadedSourceDriver *self)
146 {
147 LogThreadedSourceWorker *worker = self->worker;
148
149 while (!log_threaded_source_free_to_send(self) && !worker->under_termination)
150 wakeup_cond_wait(&worker->wakeup_cond);
151 }
152
153 static void
log_threaded_source_wakeup(LogThreadedSourceDriver * self)154 log_threaded_source_wakeup(LogThreadedSourceDriver *self)
155 {
156 LogThreadedSourceWorker *worker = self->worker;
157
158 wakeup_cond_signal(&worker->wakeup_cond);
159 }
160
161 static void
log_threaded_source_worker_run(LogThreadedSourceWorker * self)162 log_threaded_source_worker_run(LogThreadedSourceWorker *self)
163 {
164 msg_debug("Worker thread started", evt_tag_str("driver", self->control->super.super.id));
165
166 /* ivykis is not used here, but mark-freq() requires all source threads to be iv-initialized. */
167 iv_init();
168
169 self->run(self->control);
170
171 iv_deinit();
172
173 msg_debug("Worker thread finished", evt_tag_str("driver", self->control->super.super.id));
174 }
175
176 static void
log_threaded_source_worker_request_exit(LogThreadedSourceWorker * self)177 log_threaded_source_worker_request_exit(LogThreadedSourceWorker *self)
178 {
179 msg_debug("Requesting worker thread exit", evt_tag_str("driver", self->control->super.super.id));
180 self->under_termination = TRUE;
181 self->request_exit(self->control);
182 log_threaded_source_wakeup(self->control);
183 }
184
185 static void
_worker_wakeup(LogSource * s)186 _worker_wakeup(LogSource *s)
187 {
188 LogThreadedSourceWorker *self = (LogThreadedSourceWorker *) s;
189
190 self->wakeup(self->control);
191 }
192
193 static gboolean
log_threaded_source_worker_init(LogPipe * s)194 log_threaded_source_worker_init(LogPipe *s)
195 {
196 if (!log_source_init(s))
197 return FALSE;
198
199 return TRUE;
200 }
201
202 static void
log_threaded_source_worker_free(LogPipe * s)203 log_threaded_source_worker_free(LogPipe *s)
204 {
205 LogThreadedSourceWorker *self = (LogThreadedSourceWorker *) s;
206
207 wakeup_cond_destroy(&self->wakeup_cond);
208
209 log_pipe_unref(&self->control->super.super.super);
210 self->control = NULL;
211
212 log_source_free(s);
213 }
214
215 static LogThreadedSourceWorker *
log_threaded_source_worker_new(GlobalConfig * cfg)216 log_threaded_source_worker_new(GlobalConfig *cfg)
217 {
218 LogThreadedSourceWorker *self = g_new0(LogThreadedSourceWorker, 1);
219 log_source_init_instance(&self->super, cfg);
220
221 wakeup_cond_init(&self->wakeup_cond);
222
223 self->options.is_external_input = TRUE;
224
225 self->super.super.init = log_threaded_source_worker_init;
226 self->super.super.free_fn = log_threaded_source_worker_free;
227 self->super.wakeup = _worker_wakeup;
228
229 return self;
230 }
231
232 gboolean
log_threaded_source_driver_init_method(LogPipe * s)233 log_threaded_source_driver_init_method(LogPipe *s)
234 {
235 LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
236 GlobalConfig *cfg = log_pipe_get_config(s);
237
238 self->worker = log_threaded_source_worker_new(cfg);
239 self->worker->wakeup = log_threaded_source_wakeup;
240
241 if (!log_src_driver_init_method(s))
242 return FALSE;
243
244 g_assert(self->format_stats_instance);
245
246 log_threaded_source_worker_options_init(&self->worker_options, cfg, self->super.super.group);
247 log_threaded_source_worker_set_options(self->worker, self, &self->worker_options,
248 self->super.super.id, self->format_stats_instance(self));
249
250 LogPipe *worker_pipe = log_threaded_source_worker_logpipe(self->worker);
251 log_pipe_append(worker_pipe, s);
252 if (!log_pipe_init(worker_pipe))
253 {
254 log_pipe_unref(worker_pipe);
255 self->worker = NULL;
256 return FALSE;
257 }
258
259 return TRUE;
260 }
261
262 gboolean
log_threaded_source_driver_deinit_method(LogPipe * s)263 log_threaded_source_driver_deinit_method(LogPipe *s)
264 {
265 LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
266 LogPipe *worker_pipe = log_threaded_source_worker_logpipe(self->worker);
267
268 log_pipe_deinit(worker_pipe);
269 log_pipe_unref(worker_pipe);
270
271 return log_src_driver_deinit_method(s);
272 }
273
274 void
log_threaded_source_driver_free_method(LogPipe * s)275 log_threaded_source_driver_free_method(LogPipe *s)
276 {
277 LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
278
279 log_threaded_source_worker_options_destroy(&self->worker_options);
280
281 log_src_driver_free(s);
282 }
283
284 gboolean
log_threaded_source_driver_start_worker(LogPipe * s)285 log_threaded_source_driver_start_worker(LogPipe *s)
286 {
287 LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
288
289 main_loop_create_worker_thread((WorkerThreadFunc) log_threaded_source_worker_run,
290 (WorkerExitNotificationFunc) log_threaded_source_worker_request_exit,
291 self->worker, &self->worker->options);
292
293 return TRUE;
294 }
295
296 void
log_threaded_source_driver_set_worker_run_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerRunFunc run)297 log_threaded_source_driver_set_worker_run_func(LogThreadedSourceDriver *self, LogThreadedSourceWorkerRunFunc run)
298 {
299 self->worker->run = run;
300 }
301
302 void
log_threaded_source_driver_set_worker_request_exit_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerRequestExitFunc request_exit)303 log_threaded_source_driver_set_worker_request_exit_func(LogThreadedSourceDriver *self,
304 LogThreadedSourceWorkerRequestExitFunc request_exit)
305 {
306 self->worker->request_exit = request_exit;
307 }
308
309 void
log_threaded_source_set_wakeup_func(LogThreadedSourceDriver * self,LogThreadedSourceWorkerWakeupFunc wakeup)310 log_threaded_source_set_wakeup_func(LogThreadedSourceDriver *self, LogThreadedSourceWorkerWakeupFunc wakeup)
311 {
312 self->worker->wakeup = wakeup;
313 }
314
315 static gboolean
_is_default_priority_or_facility_set(MsgFormatOptions * parse_options)316 _is_default_priority_or_facility_set(MsgFormatOptions *parse_options)
317 {
318 return parse_options->default_pri != 0xFFFF;
319 }
320
321 static void
_apply_default_priority_and_facility(LogThreadedSourceDriver * self,LogMessage * msg)322 _apply_default_priority_and_facility(LogThreadedSourceDriver *self, LogMessage *msg)
323 {
324 MsgFormatOptions *parse_options = &self->worker_options.parse_options;
325 if (!_is_default_priority_or_facility_set(parse_options))
326 return;
327 msg->pri = parse_options->default_pri;
328 }
329
330 void
log_threaded_source_post(LogThreadedSourceDriver * self,LogMessage * msg)331 log_threaded_source_post(LogThreadedSourceDriver *self, LogMessage *msg)
332 {
333 msg_debug("Incoming log message", evt_tag_str("msg", log_msg_get_value(msg, LM_V_MESSAGE, NULL)));
334 _apply_default_priority_and_facility(self, msg);
335 log_source_post(&self->worker->super, msg);
336 }
337
338 gboolean
log_threaded_source_free_to_send(LogThreadedSourceDriver * self)339 log_threaded_source_free_to_send(LogThreadedSourceDriver *self)
340 {
341 return log_source_free_to_send(&self->worker->super);
342 }
343
344 void
log_threaded_source_blocking_post(LogThreadedSourceDriver * self,LogMessage * msg)345 log_threaded_source_blocking_post(LogThreadedSourceDriver *self, LogMessage *msg)
346 {
347 LogThreadedSourceWorker *worker = self->worker;
348
349 log_threaded_source_post(self, msg);
350
351 /*
352 * The wakeup lock must be held before calling free_to_send() and suspend(),
353 * otherwise g_cond_signal() might be called between free_to_send() and
354 * suspend(). We'd hang in that case.
355 *
356 * LogReader does not have such a lock, but this is because it runs an ivykis
357 * loop with a _synchronized_ event queue, where suspend() and the
358 * "schedule_wakeup" event are guaranteed to be scheduled in the right order.
359 */
360
361 wakeup_cond_lock(&worker->wakeup_cond);
362 if (!log_threaded_source_free_to_send(self))
363 log_threaded_source_suspend(self);
364 wakeup_cond_unlock(&worker->wakeup_cond);
365 }
366
367 void
log_threaded_source_driver_init_instance(LogThreadedSourceDriver * self,GlobalConfig * cfg)368 log_threaded_source_driver_init_instance(LogThreadedSourceDriver *self, GlobalConfig *cfg)
369 {
370 log_src_driver_init_instance(&self->super, cfg);
371
372 log_threaded_source_worker_options_defaults(&self->worker_options);
373
374 self->super.super.super.init = log_threaded_source_driver_init_method;
375 self->super.super.super.deinit = log_threaded_source_driver_deinit_method;
376 self->super.super.super.free_fn = log_threaded_source_driver_free_method;
377 self->super.super.super.on_config_inited = log_threaded_source_driver_start_worker;
378 }
379