1 /*
2 * Copyright (c) 2013, 2014 Balabit
3 * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "stats/stats-cluster-logpipe.h"
26 #include "stats/stats-cluster-single.h"
27 #include "stats/aggregator/stats-aggregator-registry.h"
28 #include "logthrdestdrv.h"
29 #include "seqnum.h"
30 #include "scratch-buffers.h"
31 #include "timeutils/misc.h"
32
33 #define MAX_RETRIES_ON_ERROR_DEFAULT 3
34 #define MAX_RETRIES_BEFORE_SUSPEND_DEFAULT 3
35
36 static void _init_stats_key(LogThreadedDestDriver *self, StatsClusterKey *sc_key);
37
38 const gchar *
log_threaded_result_to_str(LogThreadedResult self)39 log_threaded_result_to_str(LogThreadedResult self)
40 {
41 g_assert(self <= LTR_MAX);
42
43 static const gchar *as_str[] = { "DROP",
44 "ERROR",
45 "EXPLICIT_ACK_MGMT",
46 "SUCCESS",
47 "QUEUED",
48 "NOT_CONNECTED",
49 "RETRY",
50 "MAX"
51 };
52
53 return as_str[self];
54 }
55
56 /* LogThreadedDestWorker */
57
58 void
log_threaded_dest_driver_set_batch_lines(LogDriver * s,gint batch_lines)59 log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines)
60 {
61 LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;
62
63 self->batch_lines = batch_lines;
64 }
65
66 void
log_threaded_dest_driver_set_batch_timeout(LogDriver * s,gint batch_timeout)67 log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout)
68 {
69 LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;
70
71 self->batch_timeout = batch_timeout;
72 }
73
74 void
log_threaded_dest_driver_set_time_reopen(LogDriver * s,time_t time_reopen)75 log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
76 {
77 LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;
78
79 self->time_reopen = time_reopen;
80 }
81
82 /* this should be used in combination with LTR_EXPLICIT_ACK_MGMT to actually confirm message delivery. */
83 void
log_threaded_dest_worker_ack_messages(LogThreadedDestWorker * self,gint batch_size)84 log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size)
85 {
86 log_queue_ack_backlog(self->queue, batch_size);
87 stats_counter_add(self->owner->written_messages, batch_size);
88 self->retries_on_error_counter = 0;
89 self->batch_size -= batch_size;
90 }
91
92 void
log_threaded_dest_worker_drop_messages(LogThreadedDestWorker * self,gint batch_size)93 log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size)
94 {
95 log_queue_ack_backlog(self->queue, batch_size);
96 stats_counter_add(self->owner->dropped_messages, batch_size);
97 self->retries_on_error_counter = 0;
98 self->batch_size -= batch_size;
99 }
100
101 void
log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker * self,gint batch_size)102 log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size)
103 {
104 log_queue_rewind_backlog(self->queue, batch_size);
105 self->rewound_batch_size = self->batch_size;
106 self->batch_size -= batch_size;
107 }
108
109 static gchar *
_format_queue_persist_name(LogThreadedDestWorker * self)110 _format_queue_persist_name(LogThreadedDestWorker *self)
111 {
112 LogPipe *owner = &self->owner->super.super.super;
113
114 if (self->worker_index == 0)
115 {
116 /* the first worker uses the legacy persist name, e.g. to be able to
117 * recover the queue previously used. */
118 return g_strdup(log_pipe_get_persist_name(owner));
119 }
120 else
121 {
122 return g_strdup_printf("%s.%d.queue",
123 log_pipe_get_persist_name(owner),
124 self->worker_index);
125 }
126 }
127
128
129 static gboolean
_should_flush_now(LogThreadedDestWorker * self)130 _should_flush_now(LogThreadedDestWorker *self)
131 {
132 struct timespec now;
133 glong diff;
134
135 if (self->owner->batch_timeout <= 0 ||
136 self->owner->batch_lines <= 1 ||
137 !self->enable_batching)
138 return TRUE;
139
140 iv_validate_now();
141 now = iv_now;
142 diff = timespec_diff_msec(&now, &self->last_flush_time);
143
144 return (diff >= self->owner->batch_timeout);
145 }
146
147 static void
_stop_watches(LogThreadedDestWorker * self)148 _stop_watches(LogThreadedDestWorker *self)
149 {
150 if (iv_task_registered(&self->do_work))
151 {
152 iv_task_unregister(&self->do_work);
153 }
154 if (iv_timer_registered(&self->timer_reopen))
155 {
156 iv_timer_unregister(&self->timer_reopen);
157 }
158 if (iv_timer_registered(&self->timer_throttle))
159 {
160 iv_timer_unregister(&self->timer_throttle);
161 }
162 if (iv_timer_registered(&self->timer_flush))
163 {
164 iv_timer_unregister(&self->timer_flush);
165 }
166 }
167
168 /* NOTE: runs in the worker thread in response to a wakeup event being
169 * posted, which happens if a new element is added to our queue while we
170 * were sleeping */
171 static void
_wakeup_event_callback(gpointer data)172 _wakeup_event_callback(gpointer data)
173 {
174 LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
175
176 if (!iv_task_registered(&self->do_work))
177 {
178 iv_task_register(&self->do_work);
179 }
180 }
181
182 /* NOTE: runs in the worker thread in response to the shutdown event being
183 * posted. The shutdown event is initiated by the mainloop when the
184 * configuration is deinited */
185 static void
_shutdown_event_callback(gpointer data)186 _shutdown_event_callback(gpointer data)
187 {
188 LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
189
190 log_queue_reset_parallel_push(self->queue);
191 _stop_watches(self);
192 iv_quit();
193 }
194
195 /* NOTE: runs in the worker thread */
196 static void
_suspend(LogThreadedDestWorker * self)197 _suspend(LogThreadedDestWorker *self)
198 {
199 self->suspended = TRUE;
200 }
201
202 /* NOTE: runs in the worker thread */
203 static void
_connect(LogThreadedDestWorker * self)204 _connect(LogThreadedDestWorker *self)
205 {
206 if (!log_threaded_dest_worker_connect(self))
207 {
208 msg_debug("Error establishing connection to server",
209 evt_tag_str("driver", self->owner->super.super.id),
210 evt_tag_int("worker_index", self->worker_index),
211 log_expr_node_location_tag(self->owner->super.super.super.expr_node));
212 _suspend(self);
213 }
214 }
215
216 /* NOTE: runs in the worker thread */
217 static void
_disconnect(LogThreadedDestWorker * self)218 _disconnect(LogThreadedDestWorker *self)
219 {
220 log_threaded_dest_worker_disconnect(self);
221 }
222
223 /* NOTE: runs in the worker thread */
224 static void
_disconnect_and_suspend(LogThreadedDestWorker * self)225 _disconnect_and_suspend(LogThreadedDestWorker *self)
226 {
227 _disconnect(self);
228 _suspend(self);
229 }
230
231 /* Accepts the current batch including the current message by acking it back
232 * to the source.
233 *
234 * NOTE: runs in the worker thread */
235 static void
_accept_batch(LogThreadedDestWorker * self)236 _accept_batch(LogThreadedDestWorker *self)
237 {
238 log_threaded_dest_worker_ack_messages(self, self->batch_size);
239 }
240
241 /* NOTE: runs in the worker thread */
242 static void
_drop_batch(LogThreadedDestWorker * self)243 _drop_batch(LogThreadedDestWorker *self)
244 {
245 log_threaded_dest_worker_drop_messages(self, self->batch_size);
246 }
247
248 /* NOTE: runs in the worker thread */
249 static void
_rewind_batch(LogThreadedDestWorker * self)250 _rewind_batch(LogThreadedDestWorker *self)
251 {
252 log_threaded_dest_worker_rewind_messages(self, self->batch_size);
253 }
254
255 static void
_process_result_drop(LogThreadedDestWorker * self)256 _process_result_drop(LogThreadedDestWorker *self)
257 {
258 msg_error("Message(s) dropped while sending message to destination",
259 evt_tag_str("driver", self->owner->super.super.id),
260 evt_tag_int("worker_index", self->worker_index),
261 evt_tag_int("time_reopen", self->time_reopen),
262 evt_tag_int("batch_size", self->batch_size));
263
264 _drop_batch(self);
265 _disconnect_and_suspend(self);
266 }
267
268 static void
_process_result_error(LogThreadedDestWorker * self)269 _process_result_error(LogThreadedDestWorker *self)
270 {
271 self->retries_on_error_counter++;
272
273 if (self->retries_on_error_counter >= self->owner->retries_on_error_max)
274 {
275 msg_error("Multiple failures while sending message(s) to destination, message(s) dropped",
276 evt_tag_str("driver", self->owner->super.super.id),
277 log_expr_node_location_tag(self->owner->super.super.super.expr_node),
278 evt_tag_int("worker_index", self->worker_index),
279 evt_tag_int("retries", self->retries_on_error_counter),
280 evt_tag_int("batch_size", self->batch_size));
281
282 _drop_batch(self);
283 }
284 else
285 {
286 msg_error("Error occurred while trying to send a message, trying again",
287 evt_tag_str("driver", self->owner->super.super.id),
288 log_expr_node_location_tag(self->owner->super.super.super.expr_node),
289 evt_tag_int("worker_index", self->worker_index),
290 evt_tag_int("retries", self->retries_on_error_counter),
291 evt_tag_int("time_reopen", self->time_reopen),
292 evt_tag_int("batch_size", self->batch_size));
293 _rewind_batch(self);
294 _disconnect_and_suspend(self);
295 }
296 }
297
298 static void
_process_result_not_connected(LogThreadedDestWorker * self)299 _process_result_not_connected(LogThreadedDestWorker *self)
300 {
301 msg_info("Server disconnected while preparing messages for sending, trying again",
302 evt_tag_str("driver", self->owner->super.super.id),
303 log_expr_node_location_tag(self->owner->super.super.super.expr_node),
304 evt_tag_int("worker_index", self->worker_index),
305 evt_tag_int("time_reopen", self->time_reopen),
306 evt_tag_int("batch_size", self->batch_size));
307 self->retries_counter = 0;
308 _rewind_batch(self);
309 _disconnect_and_suspend(self);
310 }
311
312 static void
_process_result_success(LogThreadedDestWorker * self)313 _process_result_success(LogThreadedDestWorker *self)
314 {
315 _accept_batch(self);
316 }
317
318 static void
_process_result_queued(LogThreadedDestWorker * self)319 _process_result_queued(LogThreadedDestWorker *self)
320 {
321 self->enable_batching = TRUE;
322 }
323
324 static void
_process_result_retry(LogThreadedDestWorker * self)325 _process_result_retry(LogThreadedDestWorker *self)
326 {
327 self->retries_counter++;
328 if (self->retries_counter >= self->owner->retries_max)
329 _process_result_not_connected(self);
330 else
331 _rewind_batch(self);
332 }
333
334 static void
_process_result(LogThreadedDestWorker * self,gint result)335 _process_result(LogThreadedDestWorker *self, gint result)
336 {
337 switch (result)
338 {
339 case LTR_DROP:
340 _process_result_drop(self);
341 break;
342
343 case LTR_ERROR:
344 _process_result_error(self);
345 break;
346
347 case LTR_NOT_CONNECTED:
348 _process_result_not_connected(self);
349 break;
350
351 case LTR_EXPLICIT_ACK_MGMT:
352 /* we require the instance to use explicit calls to ack_messages/rewind_messages */
353 break;
354
355 case LTR_SUCCESS:
356 _process_result_success(self);
357 break;
358
359 case LTR_QUEUED:
360 _process_result_queued(self);
361 break;
362
363 case LTR_RETRY:
364 _process_result_retry(self);
365 break;
366
367 default:
368 break;
369 }
370
371 }
372
373 static void
_perform_flush(LogThreadedDestWorker * self)374 _perform_flush(LogThreadedDestWorker *self)
375 {
376 /* NOTE: earlier we had a condition on only calling flush() if batch_size
377 * is non-zero. This was removed, as the language bindings that were done
378 * _before_ the batching support in LogThreadedDestDriver relies on
379 * flush() being called always, even if LTR_SUCCESS is
380 * returned, in which case batch_size is already zero at this point.
381 */
382 if (!self->suspended)
383 {
384 msg_trace("Flushing batch",
385 evt_tag_str("driver", self->owner->super.super.id),
386 evt_tag_int("worker_index", self->worker_index),
387 evt_tag_int("batch_size", self->batch_size));
388
389 LogThreadedResult result = log_threaded_dest_worker_flush(self, LTF_FLUSH_NORMAL);
390 _process_result(self, result);
391 }
392
393 iv_invalidate_now();
394 }
395
/* NOTE: runs in the worker thread, whenever items on our queue are
 * available. It iterates all elements on the queue, however will terminate
 * if the mainloop requests that we exit. */
static void
_perform_inserts(LogThreadedDestWorker *self)
{
  LogMessage *msg;
  LogThreadedResult result;
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

  if (self->batch_size == 0)
    {
      /* first message in the batch sets the last_flush_time, so we
       * won't expedite the flush even if the previous one was a long
       * time ago */

      iv_validate_now();
      self->last_flush_time = iv_now;
    }

  /* loop terminates on: driver termination, suspension (error paths set
   * self->suspended via _process_result()), or an empty queue */
  while (G_LIKELY(!self->owner->under_termination) &&
         !self->suspended &&
         (msg = log_queue_pop_head(self->queue, &path_options)) != NULL)
    {
      msg_set_context(msg);
      log_msg_refcache_start_consumer(msg, &path_options);

      /* the message is counted into the batch BEFORE insert, so batch
       * accounting is consistent when _process_result() acks/rewinds */
      self->batch_size++;
      ScratchBuffersMarker mark;
      scratch_buffers_mark(&mark);

      result = log_threaded_dest_worker_insert(self, msg);
      /* scratch buffers allocated by insert() are only valid until here */
      scratch_buffers_reclaim_marked(mark);

      _process_result(self, result);

      if (self->enable_batching && self->batch_size >= self->owner->batch_lines)
        _perform_flush(self);

      log_msg_unref(msg);
      msg_set_context(NULL);
      log_msg_refcache_stop();

      /* after a rewind, re-send exactly the rewound messages, then break
       * so flushing/scheduling decisions are re-evaluated */
      if (self->rewound_batch_size)
        {
          self->rewound_batch_size--;
          if (self->rewound_batch_size == 0)
            break;
        }

      iv_invalidate_now();
    }
  self->rewound_batch_size = 0;
}
450
451 /* this callback is invoked by LogQueue and is registered using
452 * log_queue_check_items(). This only gets registered if at that point
453 * we've decided to wait for the queue, e.g. the work_task is not running.
454 *
455 * This callback is invoked from the source thread, e.g. it is not safe to
456 * do anything, but ensure that our thread is woken up in response.
457 */
458 static void
_message_became_available_callback(gpointer user_data)459 _message_became_available_callback(gpointer user_data)
460 {
461 LogThreadedDestWorker *self = (LogThreadedDestWorker *) user_data;
462
463 if (!self->owner->under_termination)
464 iv_event_post(&self->wake_up_event);
465 }
466
467 static void
_schedule_restart_on_suspend_timeout(LogThreadedDestWorker * self)468 _schedule_restart_on_suspend_timeout(LogThreadedDestWorker *self)
469 {
470 iv_validate_now();
471 self->timer_reopen.expires = iv_now;
472 self->timer_reopen.expires.tv_sec += self->time_reopen;
473 iv_timer_register(&self->timer_reopen);
474 }
475
476 static void
_schedule_restart_on_batch_timeout(LogThreadedDestWorker * self)477 _schedule_restart_on_batch_timeout(LogThreadedDestWorker *self)
478 {
479 self->timer_flush.expires = self->last_flush_time;
480 timespec_add_msec(&self->timer_flush.expires, self->owner->batch_timeout);
481 iv_timer_register(&self->timer_flush);
482 }
483
484 static void
_schedule_restart(LogThreadedDestWorker * self)485 _schedule_restart(LogThreadedDestWorker *self)
486 {
487 if (self->suspended)
488 _schedule_restart_on_suspend_timeout(self);
489 else
490 iv_task_register(&self->do_work);
491 }
492
493 static void
_schedule_restart_on_next_flush(LogThreadedDestWorker * self)494 _schedule_restart_on_next_flush(LogThreadedDestWorker *self)
495 {
496 if (self->suspended)
497 _schedule_restart_on_suspend_timeout(self);
498 else if (!_should_flush_now(self))
499 _schedule_restart_on_batch_timeout(self);
500 else
501 iv_task_register(&self->do_work);
502 }
503
504 static void
_schedule_restart_on_throttle_timeout(LogThreadedDestWorker * self,gint timeout_msec)505 _schedule_restart_on_throttle_timeout(LogThreadedDestWorker *self, gint timeout_msec)
506 {
507 iv_validate_now();
508 self->timer_throttle.expires = iv_now;
509 timespec_add_msec(&self->timer_throttle.expires, timeout_msec);
510 iv_timer_register(&self->timer_throttle);
511 }
512
/* The worker's central state machine, invoked from the do_work task and
 * from all reopen/throttle/flush timers.  Decides whether to connect,
 * insert queued messages, flush a partial batch, wait out a throttle
 * delay, or sleep until the queue signals new data. */
static void
_perform_work(gpointer data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
  gint timeout_msec = 0;

  /* clear suspension: reaching this point means the back-off has expired
   * (or we were woken explicitly), so give the work another go */
  self->suspended = FALSE;
  main_loop_worker_run_gc();
  /* disarm everything; exactly one continuation is re-armed below */
  _stop_watches(self);

  if (!self->connected)
    {
      /* try to connect and come back if successful, would be suspended otherwise. */
      _connect(self);
      _schedule_restart(self);
    }
  else if (log_queue_check_items(self->queue, &timeout_msec,
                                 _message_became_available_callback,
                                 self, NULL))
    {

      msg_trace("Message(s) available in queue, starting inserts",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      /* Something is in the queue, buffer them up and flush (if needed) */
      _perform_inserts(self);
      if (_should_flush_now(self))
        _perform_flush(self);
      _schedule_restart(self);
    }
  else if (self->batch_size > 0)
    {
      /* nothing in the queue, but there are pending elements in the buffer
       * (e.g. batch size != 0). perform a round of flushing. We might
       * get back here, as the flush() routine doesn't have to flush
       * everything. We are awoken either by the
       * _message_became_available_callback() or if the next flush time has
       * arrived. */
      msg_trace("Queue empty, flushing previously buffered data",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      if (_should_flush_now(self))
        _perform_flush(self);
      _schedule_restart_on_next_flush(self);
    }
  else if (timeout_msec != 0)
    {
      /* We probably have some items in the queue, but timeout_msec is set,
       * indicating a throttle being active.
       * _message_became_available_callback() is not set up in this case.
       * we need to wake up after timeout_msec time.
       *
       * We are processing throttle delays _after_ we finished flushing, as
       * items in the queue were already accepted by throttling, so they can
       * be flushed.
       */
      msg_trace("Delaying output due to throttling",
                evt_tag_int("timeout_msec", timeout_msec),
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      _schedule_restart_on_throttle_timeout(self, timeout_msec);

    }
  else
    {
      /* NOTE: at this point we are not doing anything but keep the
       * parallel_push callback alive, which will call
       * _message_became_available_callback(), which in turn wakes us up by
       * posting an event which causes this function to be run again
       *
       * NOTE/2: the parallel_push callback may need to be cancelled if we
       * need to exit. That happens in the shutdown_event_callback(), or
       * here in this very function, as log_queue_check_items() will cancel
       * outstanding parallel push callbacks automatically.
       */
    }
}
593
594 void
log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker * self)595 log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self)
596 {
597 if (self->suspended)
598 _perform_work(self);
599 }
600
601 static void
_flush_timer_cb(gpointer data)602 _flush_timer_cb(gpointer data)
603 {
604 LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
605 msg_trace("Flush timer expired",
606 evt_tag_str("driver", self->owner->super.super.id),
607 evt_tag_int("worker_index", self->worker_index),
608 evt_tag_int("batch_size", self->batch_size));
609 _perform_work(data);
610 }
611
612 /* these are events of the _worker_ thread and are not registered to the
613 * actual main thread. We basically run our workload in the handler of the
614 * do_work task, which might be invoked in a number of ways.
615 *
616 * Basic states:
617 * 1) disconnected state: _perform_work() will try to connect periodically
618 * using the suspend() mechanism, which uses a timer to get up periodically.
619 *
620 * 2) once connected:
621 * - if messages are already on the queue: flush them
622 *
623 * - if no messages are on the queue: schedule
624 * _message_became_available_callback() to be called by the LogQueue.
625 *
626 * - if there's an error, disconnect go back to the #1 state above.
627 *
628 */
629 static void
_init_watches(LogThreadedDestWorker * self)630 _init_watches(LogThreadedDestWorker *self)
631 {
632 IV_EVENT_INIT(&self->wake_up_event);
633 self->wake_up_event.cookie = self;
634 self->wake_up_event.handler = _wakeup_event_callback;
635
636 IV_EVENT_INIT(&self->shutdown_event);
637 self->shutdown_event.cookie = self;
638 self->shutdown_event.handler = _shutdown_event_callback;
639
640 IV_TIMER_INIT(&self->timer_reopen);
641 self->timer_reopen.cookie = self;
642 self->timer_reopen.handler = _perform_work;
643
644 IV_TIMER_INIT(&self->timer_throttle);
645 self->timer_throttle.cookie = self;
646 self->timer_throttle.handler = _perform_work;
647
648 IV_TIMER_INIT(&self->timer_flush);
649 self->timer_flush.cookie = self;
650 self->timer_flush.handler = _flush_timer_cb;
651
652 IV_TASK_INIT(&self->do_work);
653 self->do_work.cookie = self;
654 self->do_work.handler = _perform_work;
655 }
656
657 static void
_signal_startup_finished(LogThreadedDestWorker * self,gboolean thread_failure)658 _signal_startup_finished(LogThreadedDestWorker *self, gboolean thread_failure)
659 {
660 g_mutex_lock(&self->owner->lock);
661 self->startup_finished = TRUE;
662 self->startup_failure |= thread_failure;
663 g_cond_signal(&self->started_up);
664 g_mutex_unlock(&self->owner->lock);
665 }
666
667 static void
_signal_startup_success(LogThreadedDestWorker * self)668 _signal_startup_success(LogThreadedDestWorker *self)
669 {
670 _signal_startup_finished(self, FALSE);
671 }
672
673 static void
_signal_startup_failure(LogThreadedDestWorker * self)674 _signal_startup_failure(LogThreadedDestWorker *self)
675 {
676 _signal_startup_finished(self, TRUE);
677 }
678
/* Block the calling (main) thread until the worker thread signals that its
 * startup completed; see _signal_startup_finished().  The while loop is
 * the standard GCond idiom guarding against spurious wakeups. */
static void
_wait_for_startup_finished(LogThreadedDestWorker *self)
{
  g_mutex_lock(&self->owner->lock);
  while (!self->startup_finished)
    g_cond_wait(&self->started_up, &self->owner->lock);
  g_mutex_unlock(&self->owner->lock);
}
687
688 static void
_register_worker_stats(LogThreadedDestWorker * self)689 _register_worker_stats(LogThreadedDestWorker *self)
690 {
691 StatsClusterKey sc_key;
692
693 stats_lock();
694 _init_stats_key(self->owner, &sc_key);
695 log_queue_register_stats_counters(self->queue, 0, &sc_key);
696 stats_unlock();
697 }
698
699 static void
_unregister_worker_stats(LogThreadedDestWorker * self)700 _unregister_worker_stats(LogThreadedDestWorker *self)
701 {
702 StatsClusterKey sc_key;
703
704 stats_lock();
705 _init_stats_key(self->owner, &sc_key);
706 log_queue_unregister_stats_counters(self->queue, &sc_key);
707 stats_unlock();
708 }
709
710 static void
_perform_final_flush(LogThreadedDestWorker * self)711 _perform_final_flush(LogThreadedDestWorker *self)
712 {
713 GlobalConfig *cfg = log_pipe_get_config(&self->owner->super.super.super);
714 LogThreadedResult result;
715 LogThreadedFlushMode mode = LTF_FLUSH_NORMAL;
716
717 if (!cfg_is_shutting_down(cfg))
718 mode = LTF_FLUSH_EXPEDITE;
719
720 result = log_threaded_dest_worker_flush(self, mode);
721 _process_result(self, result);
722 log_queue_rewind_backlog_all(self->queue);
723 }
724
/* Worker thread entry point: sets up a per-thread ivykis loop, runs
 * thread_init(), then iterates _perform_work() via iv_main() until the
 * shutdown event quits the loop; finally flushes, disconnects and tears
 * everything down.  On thread_init() failure only the startup failure is
 * signalled and the events unregistered. */
static void
_worker_thread(gpointer arg)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) arg;

  iv_init();

  msg_debug("Dedicated worker thread started",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));

  /* events must be registered before the main thread may post to them */
  iv_event_register(&self->wake_up_event);
  iv_event_register(&self->shutdown_event);

  if (!log_threaded_dest_worker_thread_init(self))
    goto error;

  /* unblocks the main thread waiting in _wait_for_startup_finished() */
  _signal_startup_success(self);

  /* if we have anything on the backlog, that was a partial, potentially
   * not-flushed batch. Rewind it, so we start with that */

  log_queue_rewind_backlog_all(self->queue);

  _schedule_restart(self);
  /* runs until _shutdown_event_callback() calls iv_quit() */
  iv_main();

  _perform_final_flush(self);

  _disconnect(self);

  log_threaded_dest_worker_thread_deinit(self);

  msg_debug("Dedicated worker thread finished",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));

  goto ok;

error:
  _signal_startup_failure(self);
ok:
  iv_event_unregister(&self->wake_up_event);
  iv_event_unregister(&self->shutdown_event);
  iv_deinit();
}
773
774 static gboolean
_acquire_worker_queue(LogThreadedDestWorker * self)775 _acquire_worker_queue(LogThreadedDestWorker *self)
776 {
777 gchar *persist_name = _format_queue_persist_name(self);
778 self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name);
779 g_free(persist_name);
780
781 if (!self->queue)
782 return FALSE;
783
784 log_queue_set_use_backlog(self->queue, TRUE);
785
786 return TRUE;
787 }
788
789 gboolean
log_threaded_dest_worker_init_method(LogThreadedDestWorker * self)790 log_threaded_dest_worker_init_method(LogThreadedDestWorker *self)
791 {
792 if (self->time_reopen == -1)
793 self->time_reopen = self->owner->time_reopen;
794
795 _register_worker_stats(self);
796
797 return TRUE;
798 }
799
800 void
log_threaded_dest_worker_deinit_method(LogThreadedDestWorker * self)801 log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self)
802 {
803 _unregister_worker_stats(self);
804 }
805
806 void
log_threaded_dest_worker_free_method(LogThreadedDestWorker * self)807 log_threaded_dest_worker_free_method(LogThreadedDestWorker *self)
808 {
809 g_cond_clear(&self->started_up);
810 }
811
812 void
log_threaded_dest_worker_init_instance(LogThreadedDestWorker * self,LogThreadedDestDriver * owner,gint worker_index)813 log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self, LogThreadedDestDriver *owner, gint worker_index)
814 {
815 self->worker_index = worker_index;
816 self->thread_init = log_threaded_dest_worker_init_method;
817 self->thread_deinit = log_threaded_dest_worker_deinit_method;
818 self->free_fn = log_threaded_dest_worker_free_method;
819 self->owner = owner;
820 self->time_reopen = -1;
821 g_cond_init(&self->started_up);
822 _init_watches(self);
823 }
824
825 void
log_threaded_dest_worker_free(LogThreadedDestWorker * self)826 log_threaded_dest_worker_free(LogThreadedDestWorker *self)
827 {
828 if (self->free_fn)
829 self->free_fn(self);
830 g_free(self);
831 }
832
833 /* LogThreadedDestDriver */
834
835 void
log_threaded_dest_driver_set_num_workers(LogDriver * s,gint num_workers)836 log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers)
837 {
838 LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;
839
840 self->num_workers = num_workers;
841 }
842
843 /* compatibility bridge between LogThreadedDestWorker */
844
845 static gboolean
_compat_thread_init(LogThreadedDestWorker * self)846 _compat_thread_init(LogThreadedDestWorker *self)
847 {
848 if (!log_threaded_dest_worker_init_method(self))
849 return FALSE;
850
851 /* NOTE: driver level thread_init() didn't have a gboolean return */
852 if (self->owner->worker.thread_init)
853 self->owner->worker.thread_init(self->owner);
854 return TRUE;
855 }
856
857 static void
_compat_thread_deinit(LogThreadedDestWorker * self)858 _compat_thread_deinit(LogThreadedDestWorker *self)
859 {
860 if (self->owner->worker.thread_deinit)
861 self->owner->worker.thread_deinit(self->owner);
862 log_threaded_dest_worker_deinit_method(self);
863 }
864
865 static gboolean
_compat_connect(LogThreadedDestWorker * self)866 _compat_connect(LogThreadedDestWorker *self)
867 {
868 if (self->owner->worker.connect)
869 return self->owner->worker.connect(self->owner);
870 return TRUE;
871 }
872
873 static void
_compat_disconnect(LogThreadedDestWorker * self)874 _compat_disconnect(LogThreadedDestWorker *self)
875 {
876 if (self->owner->worker.disconnect)
877 self->owner->worker.disconnect(self->owner);
878 }
879
880 static LogThreadedResult
_compat_insert(LogThreadedDestWorker * self,LogMessage * msg)881 _compat_insert(LogThreadedDestWorker *self, LogMessage *msg)
882 {
883 return self->owner->worker.insert(self->owner, msg);
884 }
885
886 static LogThreadedResult
_compat_flush(LogThreadedDestWorker * self,LogThreadedFlushMode mode)887 _compat_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
888 {
889 if (self->owner->worker.flush)
890 return self->owner->worker.flush(self->owner);
891 return LTR_SUCCESS;
892 }
893
894 static void
_init_worker_compat_layer(LogThreadedDestWorker * self)895 _init_worker_compat_layer(LogThreadedDestWorker *self)
896 {
897 self->thread_init = _compat_thread_init;
898 self->thread_deinit = _compat_thread_deinit;
899 self->connect = _compat_connect;
900 self->disconnect = _compat_disconnect;
901 self->insert = _compat_insert;
902 self->flush = _compat_flush;
903 }
904
905 static gboolean
_is_worker_compat_mode(LogThreadedDestDriver * self)906 _is_worker_compat_mode(LogThreadedDestDriver *self)
907 {
908 return !self->worker.construct;
909 }
910
911 /* temporary function until proper LogThreadedDestWorker allocation logic is
912 * created. Right now it is just using a singleton within the driver */
913 static LogThreadedDestWorker *
_construct_worker(LogThreadedDestDriver * self,gint worker_index)914 _construct_worker(LogThreadedDestDriver *self, gint worker_index)
915 {
916 if (_is_worker_compat_mode(self))
917 {
918 /* kick in the compat layer, this case self->worker.instance is the
919 * single worker we have and all Worker related state is in the
920 * (derived) Driver class. */
921
922 return &self->worker.instance;
923 }
924 return self->worker.construct(self, worker_index);
925 }
926
927 static void
_request_worker_exit(gpointer s)928 _request_worker_exit(gpointer s)
929 {
930 LogThreadedDestWorker *self = (LogThreadedDestWorker *) s;
931
932 msg_debug("Shutting down dedicated worker thread",
933 evt_tag_int("worker_index", self->worker_index),
934 evt_tag_str("driver", self->owner->super.super.id),
935 log_expr_node_location_tag(self->owner->super.super.super.expr_node));
936 self->owner->under_termination = TRUE;
937 iv_event_post(&self->shutdown_event);
938 }
939
940 static gboolean
_start_worker_thread(LogThreadedDestWorker * self)941 _start_worker_thread(LogThreadedDestWorker *self)
942 {
943 msg_debug("Starting dedicated worker thread",
944 evt_tag_int("worker_index", self->worker_index),
945 evt_tag_str("driver", self->owner->super.super.id),
946 log_expr_node_location_tag(self->owner->super.super.super.expr_node));
947
948 main_loop_create_worker_thread(_worker_thread,
949 _request_worker_exit,
950 self, &self->owner->worker_options);
951 _wait_for_startup_finished(self);
952 return !self->startup_failure;
953 }
954
/* setter for the maximum number of retries performed for a message after an
 * error, before it is given up on */
void
log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

  self->retries_on_error_max = max_retries;
}
962
963 LogThreadedDestWorker *
_lookup_worker(LogThreadedDestDriver * self,LogMessage * msg)964 _lookup_worker(LogThreadedDestDriver *self, LogMessage *msg)
965 {
966 gint worker_index = self->last_worker % self->num_workers;
967 self->last_worker++;
968
969 /* here would come the lookup mechanism that maps msg -> worker that doesn't exist yet. */
970 return self->workers[worker_index];
971 }
972
/* the feeding side of the driver, runs in the source thread and puts an
 * incoming message to the associated queue.
 */
static void
log_threaded_dest_driver_queue(LogPipe *s, LogMessage *msg,
                               const LogPathOptions *path_options)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;
  LogThreadedDestWorker *dw = _lookup_worker(self, msg);
  LogPathOptions local_options;

  /* if the path did not request flow control, detach the message from the
   * upstream ack chain so queueing here does not exert backpressure */
  if (!path_options->flow_control_requested)
    path_options = log_msg_break_ack(msg, path_options, &local_options);

  /* the queue takes its own reference; it is acked once the worker is done */
  log_msg_add_ack(msg, path_options);
  log_queue_push_tail(dw->queue, log_msg_ref(msg), path_options);

  stats_counter_inc(self->processed_messages);

  log_dest_driver_queue_method(s, msg, path_options);
}
994
995 static void
_init_stats_key(LogThreadedDestDriver * self,StatsClusterKey * sc_key)996 _init_stats_key(LogThreadedDestDriver *self, StatsClusterKey *sc_key)
997 {
998 stats_cluster_logpipe_key_set(sc_key, self->stats_source | SCS_DESTINATION,
999 self->super.super.id,
1000 self->format_stats_instance(self));
1001 }
1002
1003 void
log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver * self,gsize len)1004 log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len)
1005 {
1006 stats_aggregator_insert_data(self->max_message_size, len);
1007 stats_aggregator_insert_data(self->average_messages_size, len);
1008 }
1009
1010 void
log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver * self,gsize len)1011 log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len)
1012 {
1013 stats_aggregator_insert_data(self->max_batch_size, len);
1014 stats_aggregator_insert_data(self->average_batch_size, len);
1015 }
1016
1017 void
log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver * self)1018 log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self)
1019 {
1020 StatsClusterKey sc_key_eps_input;
1021 _init_stats_key(self, &sc_key_eps_input);
1022 stats_aggregator_lock();
1023 StatsClusterKey sc_key;
1024
1025 stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
1026 self->format_stats_instance(self), "msg_size_max");
1027 stats_register_aggregator_maximum(0, &sc_key, &self->max_message_size);
1028
1029 stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
1030 self->format_stats_instance(self), "msg_size_avg");
1031 stats_register_aggregator_average(0, &sc_key, &self->average_messages_size);
1032
1033 stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
1034 self->format_stats_instance(self), "batch_size_max");
1035 stats_register_aggregator_maximum(0, &sc_key, &self->max_batch_size);
1036
1037 stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
1038 self->format_stats_instance(self), "batch_size_avg");
1039 stats_register_aggregator_average(0, &sc_key, &self->average_batch_size);
1040
1041 stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
1042 self->format_stats_instance(self), "eps");
1043 stats_register_aggregator_cps(0, &sc_key, &sc_key_eps_input, SC_TYPE_WRITTEN, &self->CPS);
1044
1045 stats_aggregator_unlock();
1046 }
1047
1048 void
log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver * self)1049 log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self)
1050 {
1051 stats_aggregator_lock();
1052
1053 stats_unregister_aggregator_maximum(&self->max_message_size);
1054 stats_unregister_aggregator_average(&self->average_messages_size);
1055 stats_unregister_aggregator_maximum(&self->max_batch_size);
1056 stats_unregister_aggregator_average(&self->average_batch_size);
1057 stats_unregister_aggregator_cps(&self->CPS);
1058
1059 stats_aggregator_unlock();
1060 }
1061
1062 static void
_register_stats(LogThreadedDestDriver * self)1063 _register_stats(LogThreadedDestDriver *self)
1064 {
1065 stats_lock();
1066 {
1067 StatsClusterKey sc_key;
1068
1069 _init_stats_key(self, &sc_key);
1070 stats_register_counter(0, &sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1071 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1072 stats_register_counter(0, &sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1073
1074 }
1075 stats_unlock();
1076 }
1077
1078 static void
_unregister_stats(LogThreadedDestDriver * self)1079 _unregister_stats(LogThreadedDestDriver *self)
1080 {
1081 stats_lock();
1082 {
1083 StatsClusterKey sc_key;
1084
1085 _init_stats_key(self, &sc_key);
1086 stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
1087 stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
1088 stats_unregister_counter(&sc_key, SC_TYPE_WRITTEN, &self->written_messages);
1089
1090 }
1091 stats_unlock();
1092 }
1093
1094 static gchar *
_format_seqnum_persist_name(LogThreadedDestDriver * self)1095 _format_seqnum_persist_name(LogThreadedDestDriver *self)
1096 {
1097 static gchar persist_name[256];
1098
1099 g_snprintf(persist_name, sizeof(persist_name), "%s.seqnum",
1100 self->super.super.super.generate_persist_name((const LogPipe *)self));
1101
1102 return persist_name;
1103 }
1104
1105 static gboolean
_create_workers(LogThreadedDestDriver * self)1106 _create_workers(LogThreadedDestDriver *self)
1107 {
1108 /* free previous workers array if set to cope with num_workers change */
1109 g_free(self->workers);
1110 self->workers = g_new0(LogThreadedDestWorker *, self->num_workers);
1111
1112 for (self->created_workers = 0; self->created_workers < self->num_workers; self->created_workers++)
1113 {
1114 LogThreadedDestWorker *dw = _construct_worker(self, self->created_workers);
1115
1116 self->workers[self->created_workers] = dw;
1117 if (!_acquire_worker_queue(dw))
1118 return FALSE;
1119 }
1120
1121 return TRUE;
1122 }
1123
1124 gboolean
log_threaded_dest_driver_init_method(LogPipe * s)1125 log_threaded_dest_driver_init_method(LogPipe *s)
1126 {
1127 LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;
1128 GlobalConfig *cfg = log_pipe_get_config(s);
1129
1130 if (!log_dest_driver_init_method(&self->super.super.super))
1131 return FALSE;
1132
1133 self->under_termination = FALSE;
1134
1135 if (self->time_reopen == -1)
1136 self->time_reopen = cfg->time_reopen;
1137
1138 self->shared_seq_num = GPOINTER_TO_INT(cfg_persist_config_fetch(cfg,
1139 _format_seqnum_persist_name(self)));
1140 if (!self->shared_seq_num)
1141 init_sequence_number(&self->shared_seq_num);
1142
1143 _register_stats(self);
1144
1145 if (!_create_workers(self))
1146 return FALSE;
1147
1148 return TRUE;
1149 }
1150
1151 /* This method is only used when a LogThreadedDestDriver is directly used
1152 * without overriding its on_config_inited method. If there's an overridden
1153 * method, the caller is responsible for explicitly calling _start_workers() at
1154 * the end of on_config_inited(). */
1155 gboolean
log_threaded_dest_driver_start_workers(LogPipe * s)1156 log_threaded_dest_driver_start_workers(LogPipe *s)
1157 {
1158 LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;
1159
1160 for (gint worker_index = 0; worker_index < self->num_workers; worker_index++)
1161 {
1162 if (!_start_worker_thread(self->workers[worker_index]))
1163 return FALSE;
1164 }
1165 return TRUE;
1166 }
1167
/* LogPipe::deinit for threaded destination drivers: persists the sequence
 * number, unregisters stats and frees the constructed workers */
gboolean
log_threaded_dest_driver_deinit_method(LogPipe *s)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

  /* NOTE: workers are shut down by the time we get here, through the
   * request_exit mechanism of main loop worker threads */

  /* preserve the shared sequence number across config reloads */
  cfg_persist_config_add(log_pipe_get_config(s),
                         _format_seqnum_persist_name(self),
                         GINT_TO_POINTER(self->shared_seq_num), NULL, FALSE);

  _unregister_stats(self);

  /* in compat mode the single worker is embedded in the driver struct and is
   * released by log_threaded_dest_driver_free() instead */
  if (!_is_worker_compat_mode(self))
    {
      for (int i = 0; i < self->created_workers; i++)
        log_threaded_dest_worker_free(self->workers[i]);
    }

  return log_dest_driver_deinit_method(s);
}
1190
1191
1192 void
log_threaded_dest_driver_free(LogPipe * s)1193 log_threaded_dest_driver_free(LogPipe *s)
1194 {
1195 LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;
1196
1197 log_threaded_dest_worker_free_method(&self->worker.instance);
1198 g_mutex_clear(&self->lock);
1199 g_free(self->workers);
1200 log_dest_driver_free((LogPipe *)self);
1201 }
1202
/* initialize a LogThreadedDestDriver instance: wires up the LogPipe virtual
 * methods, sets option defaults and prepares the embedded compat worker */
void
log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg)
{
  log_dest_driver_init_instance(&self->super, cfg);

  self->worker_options.is_output_thread = TRUE;

  self->super.super.super.init = log_threaded_dest_driver_init_method;
  self->super.super.super.deinit = log_threaded_dest_driver_deinit_method;
  self->super.super.super.queue = log_threaded_dest_driver_queue;
  self->super.super.super.free_fn = log_threaded_dest_driver_free;
  self->super.super.super.on_config_inited = log_threaded_dest_driver_start_workers;
  /* -1 means "not set", init resolves it from the GlobalConfig */
  self->time_reopen = -1;
  self->batch_lines = -1;
  self->batch_timeout = -1;
  self->num_workers = 1;
  self->last_worker = 0;

  self->retries_on_error_max = MAX_RETRIES_ON_ERROR_DEFAULT;
  self->retries_max = MAX_RETRIES_BEFORE_SUSPEND_DEFAULT;
  g_mutex_init(&self->lock);
  /* set up the single embedded worker used by the compat layer */
  log_threaded_dest_worker_init_instance(&self->worker.instance, self, 0);
  _init_worker_compat_layer(&self->worker.instance);
}
1227