/*
 * Copyright (c) 2013, 2014 Balabit
 * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */

#include "stats/stats-cluster-logpipe.h"
#include "stats/stats-cluster-single.h"
#include "stats/aggregator/stats-aggregator-registry.h"
#include "logthrdestdrv.h"
#include "seqnum.h"
#include "scratch-buffers.h"
#include "timeutils/misc.h"

#define MAX_RETRIES_ON_ERROR_DEFAULT 3
#define MAX_RETRIES_BEFORE_SUSPEND_DEFAULT 3

static void _init_stats_key(LogThreadedDestDriver *self, StatsClusterKey *sc_key);

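/* NOTE: the order of the entries in as_str[] below must match the numeric
 * order of the LogThreadedResult enum values, as the result value is used
 * directly as an index into the array. */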
const gchar *
log_threaded_result_to_str(LogThreadedResult self)
{
  g_assert(self <= LTR_MAX);

  static const gchar *as_str[] = { "DROP",
                                   "ERROR",
                                   "EXPLICIT_ACK_MGMT",
                                   "SUCCESS",
                                   "QUEUED",
                                   "NOT_CONNECTED",
                                   "RETRY",
                                   "MAX"
                                 };

  return as_str[self];
}

/* LogThreadedDestWorker */

void
log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;

  self->batch_lines = batch_lines;
}

void
log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;

  self->batch_timeout = batch_timeout;
}

void
log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;

  self->time_reopen = time_reopen;
}

/* this should be used in combination with LTR_EXPLICIT_ACK_MGMT to actually confirm message delivery. */
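/* A rough usage sketch (not taken from any specific driver): an insert()
 * implementation that completes deliveries asynchronously would return
 * LTR_EXPLICIT_ACK_MGMT and later, from its own completion handling, call
 *
 *   log_threaded_dest_worker_ack_messages(worker, n);      on success
 *   log_threaded_dest_worker_rewind_messages(worker, n);   on failure
 *
 * where n is the number of messages whose fate has just become known. */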
void
log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size)
{
  log_queue_ack_backlog(self->queue, batch_size);
  stats_counter_add(self->owner->written_messages, batch_size);
  self->retries_on_error_counter = 0;
  self->batch_size -= batch_size;
}

void
log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size)
{
  log_queue_ack_backlog(self->queue, batch_size);
  stats_counter_add(self->owner->dropped_messages, batch_size);
  self->retries_on_error_counter = 0;
  self->batch_size -= batch_size;
}

void
log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size)
{
  log_queue_rewind_backlog(self->queue, batch_size);
  self->rewound_batch_size = self->batch_size;
  self->batch_size -= batch_size;
}

static gchar *
_format_queue_persist_name(LogThreadedDestWorker *self)
{
  LogPipe *owner = &self->owner->super.super.super;

  if (self->worker_index == 0)
    {
      /* the first worker uses the legacy persist name, so that it can
       * recover the queue used previously */
      return g_strdup(log_pipe_get_persist_name(owner));
    }
  else
    {
      return g_strdup_printf("%s.%d.queue",
                             log_pipe_get_persist_name(owner),
                             self->worker_index);
    }
}

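/* Decide whether the accumulated batch should be flushed right away: this
 * is the case if batching is effectively not in use (no batch_timeout,
 * batches of at most one line, or the driver never returned LTR_QUEUED),
 * or if batch_timeout has already elapsed since the last flush. */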
static gboolean
_should_flush_now(LogThreadedDestWorker *self)
{
  struct timespec now;
  glong diff;

  if (self->owner->batch_timeout <= 0 ||
      self->owner->batch_lines <= 1 ||
      !self->enable_batching)
    return TRUE;

  iv_validate_now();
  now = iv_now;
  diff = timespec_diff_msec(&now, &self->last_flush_time);

  return (diff >= self->owner->batch_timeout);
}

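/* Unregister every pending ivykis task and timer of this worker, so that
 * the next scheduling decision can re-arm exactly one of them from a clean
 * state. */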
static void
_stop_watches(LogThreadedDestWorker *self)
{
  if (iv_task_registered(&self->do_work))
    {
      iv_task_unregister(&self->do_work);
    }
  if (iv_timer_registered(&self->timer_reopen))
    {
      iv_timer_unregister(&self->timer_reopen);
    }
  if (iv_timer_registered(&self->timer_throttle))
    {
      iv_timer_unregister(&self->timer_throttle);
    }
  if (iv_timer_registered(&self->timer_flush))
    {
      iv_timer_unregister(&self->timer_flush);
    }
}

/* NOTE: runs in the worker thread in response to a wakeup event being
 * posted, which happens if a new element is added to our queue while we
 * were sleeping */
static void
_wakeup_event_callback(gpointer data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;

  if (!iv_task_registered(&self->do_work))
    {
      iv_task_register(&self->do_work);
    }
}

/* NOTE: runs in the worker thread in response to the shutdown event being
 * posted.  The shutdown event is initiated by the mainloop when the
 * configuration is deinited */
static void
_shutdown_event_callback(gpointer data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;

  log_queue_reset_parallel_push(self->queue);
  _stop_watches(self);
  iv_quit();
}

/* NOTE: runs in the worker thread */
static void
_suspend(LogThreadedDestWorker *self)
{
  self->suspended = TRUE;
}

/* NOTE: runs in the worker thread */
static void
_connect(LogThreadedDestWorker *self)
{
  if (!log_threaded_dest_worker_connect(self))
    {
      msg_debug("Error establishing connection to server",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index),
                log_expr_node_location_tag(self->owner->super.super.super.expr_node));
      _suspend(self);
    }
}

/* NOTE: runs in the worker thread */
static void
_disconnect(LogThreadedDestWorker *self)
{
  log_threaded_dest_worker_disconnect(self);
}

/* NOTE: runs in the worker thread */
static void
_disconnect_and_suspend(LogThreadedDestWorker *self)
{
  _disconnect(self);
  _suspend(self);
}

/* Accepts the current batch including the current message by acking it back
 * to the source.
 *
 * NOTE: runs in the worker thread */
static void
_accept_batch(LogThreadedDestWorker *self)
{
  log_threaded_dest_worker_ack_messages(self, self->batch_size);
}

/* NOTE: runs in the worker thread */
static void
_drop_batch(LogThreadedDestWorker *self)
{
  log_threaded_dest_worker_drop_messages(self, self->batch_size);
}

/* NOTE: runs in the worker thread */
static void
_rewind_batch(LogThreadedDestWorker *self)
{
  log_threaded_dest_worker_rewind_messages(self, self->batch_size);
}

static void
_process_result_drop(LogThreadedDestWorker *self)
{
  msg_error("Message(s) dropped while sending message to destination",
            evt_tag_str("driver", self->owner->super.super.id),
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_int("time_reopen", self->time_reopen),
            evt_tag_int("batch_size", self->batch_size));

  _drop_batch(self);
  _disconnect_and_suspend(self);
}

static void
_process_result_error(LogThreadedDestWorker *self)
{
  self->retries_on_error_counter++;

  if (self->retries_on_error_counter >= self->owner->retries_on_error_max)
    {
      msg_error("Multiple failures while sending message(s) to destination, message(s) dropped",
                evt_tag_str("driver", self->owner->super.super.id),
                log_expr_node_location_tag(self->owner->super.super.super.expr_node),
                evt_tag_int("worker_index", self->worker_index),
                evt_tag_int("retries", self->retries_on_error_counter),
                evt_tag_int("batch_size", self->batch_size));

      _drop_batch(self);
    }
  else
    {
      msg_error("Error occurred while trying to send a message, trying again",
                evt_tag_str("driver", self->owner->super.super.id),
                log_expr_node_location_tag(self->owner->super.super.super.expr_node),
                evt_tag_int("worker_index", self->worker_index),
                evt_tag_int("retries", self->retries_on_error_counter),
                evt_tag_int("time_reopen", self->time_reopen),
                evt_tag_int("batch_size", self->batch_size));
      _rewind_batch(self);
      _disconnect_and_suspend(self);
    }
}

static void
_process_result_not_connected(LogThreadedDestWorker *self)
{
  msg_info("Server disconnected while preparing messages for sending, trying again",
           evt_tag_str("driver", self->owner->super.super.id),
           log_expr_node_location_tag(self->owner->super.super.super.expr_node),
           evt_tag_int("worker_index", self->worker_index),
           evt_tag_int("time_reopen", self->time_reopen),
           evt_tag_int("batch_size", self->batch_size));
  self->retries_counter = 0;
  _rewind_batch(self);
  _disconnect_and_suspend(self);
}

static void
_process_result_success(LogThreadedDestWorker *self)
{
  _accept_batch(self);
}

static void
_process_result_queued(LogThreadedDestWorker *self)
{
  self->enable_batching = TRUE;
}

static void
_process_result_retry(LogThreadedDestWorker *self)
{
  self->retries_counter++;
  if (self->retries_counter >= self->owner->retries_max)
    _process_result_not_connected(self);
  else
    _rewind_batch(self);
}

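/* Central dispatch on the LogThreadedResult value returned by insert() or
 * flush(): each result maps to one of the batch accept/drop/rewind actions
 * above, possibly combined with disconnecting and suspending the worker. */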
static void
_process_result(LogThreadedDestWorker *self, gint result)
{
  switch (result)
    {
    case LTR_DROP:
      _process_result_drop(self);
      break;

    case LTR_ERROR:
      _process_result_error(self);
      break;

    case LTR_NOT_CONNECTED:
      _process_result_not_connected(self);
      break;

    case LTR_EXPLICIT_ACK_MGMT:
      /* we require the instance to use explicit calls to ack_messages/rewind_messages */
      break;

    case LTR_SUCCESS:
      _process_result_success(self);
      break;

    case LTR_QUEUED:
      _process_result_queued(self);
      break;

    case LTR_RETRY:
      _process_result_retry(self);
      break;

    default:
      break;
    }
}

static void
_perform_flush(LogThreadedDestWorker *self)
{
  /* NOTE: earlier we had a condition to only call flush() if batch_size
   * is non-zero.  This was removed, as the language bindings that were done
   * _before_ the batching support in LogThreadedDestDriver rely on
   * flush() always being called, even if LTR_SUCCESS is
   * returned, in which case batch_size is already zero at this point.
   */
  if (!self->suspended)
    {
      msg_trace("Flushing batch",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index),
                evt_tag_int("batch_size", self->batch_size));

      LogThreadedResult result = log_threaded_dest_worker_flush(self, LTF_FLUSH_NORMAL);
      _process_result(self, result);
    }

  iv_invalidate_now();
}

/* NOTE: runs in the worker thread whenever items on our queue are
 * available. It iterates over all elements on the queue, but will terminate
 * early if the mainloop requests that we exit. */
static void
_perform_inserts(LogThreadedDestWorker *self)
{
  LogMessage *msg;
  LogThreadedResult result;
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

  if (self->batch_size == 0)
    {
      /* first message in the batch sets the last_flush_time, so we
       * won't expedite the flush even if the previous one was a long
       * time ago */

      iv_validate_now();
      self->last_flush_time = iv_now;
    }

  while (G_LIKELY(!self->owner->under_termination) &&
         !self->suspended &&
         (msg = log_queue_pop_head(self->queue, &path_options)) != NULL)
    {
      msg_set_context(msg);
      log_msg_refcache_start_consumer(msg, &path_options);

      self->batch_size++;
      ScratchBuffersMarker mark;
      scratch_buffers_mark(&mark);

      result = log_threaded_dest_worker_insert(self, msg);
      scratch_buffers_reclaim_marked(mark);

      _process_result(self, result);

      if (self->enable_batching && self->batch_size >= self->owner->batch_lines)
        _perform_flush(self);

      log_msg_unref(msg);
      msg_set_context(NULL);
      log_msg_refcache_stop();

      if (self->rewound_batch_size)
        {
          self->rewound_batch_size--;
          if (self->rewound_batch_size == 0)
            break;
        }

      iv_invalidate_now();
    }
  self->rewound_batch_size = 0;
}

/* this callback is invoked by LogQueue and is registered using
 * log_queue_check_items().  This only gets registered if at that point
 * we've decided to wait for the queue, i.e.  the work_task is not running.
 *
 * This callback is invoked from the source thread, i.e.  it is not safe to
 * do anything here except ensure that our thread is woken up in response.
 */
static void
_message_became_available_callback(gpointer user_data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) user_data;

  if (!self->owner->under_termination)
    iv_event_post(&self->wake_up_event);
}

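/* The _schedule_restart*() helpers below decide how the worker gets to run
 * again: immediately via the do_work task, after time_reopen seconds when
 * suspended, at the next batch_timeout expiry, or after a throttling delay. */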
static void
_schedule_restart_on_suspend_timeout(LogThreadedDestWorker *self)
{
  iv_validate_now();
  self->timer_reopen.expires = iv_now;
  self->timer_reopen.expires.tv_sec += self->time_reopen;
  iv_timer_register(&self->timer_reopen);
}

static void
_schedule_restart_on_batch_timeout(LogThreadedDestWorker *self)
{
  self->timer_flush.expires = self->last_flush_time;
  timespec_add_msec(&self->timer_flush.expires, self->owner->batch_timeout);
  iv_timer_register(&self->timer_flush);
}

static void
_schedule_restart(LogThreadedDestWorker *self)
{
  if (self->suspended)
    _schedule_restart_on_suspend_timeout(self);
  else
    iv_task_register(&self->do_work);
}

static void
_schedule_restart_on_next_flush(LogThreadedDestWorker *self)
{
  if (self->suspended)
    _schedule_restart_on_suspend_timeout(self);
  else if (!_should_flush_now(self))
    _schedule_restart_on_batch_timeout(self);
  else
    iv_task_register(&self->do_work);
}

static void
_schedule_restart_on_throttle_timeout(LogThreadedDestWorker *self, gint timeout_msec)
{
  iv_validate_now();
  self->timer_throttle.expires = iv_now;
  timespec_add_msec(&self->timer_throttle.expires, timeout_msec);
  iv_timer_register(&self->timer_throttle);
}

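/* One iteration of the worker's main loop: this is the handler of the
 * do_work task as well as of the reopen and throttle timers.  It either
 * attempts to connect, drains and flushes the queue, flushes a previously
 * buffered batch, waits out a throttle delay, or simply leaves the
 * parallel_push callback armed and goes back to sleep. */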
static void
_perform_work(gpointer data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
  gint timeout_msec = 0;

  self->suspended = FALSE;
  main_loop_worker_run_gc();
  _stop_watches(self);

  if (!self->connected)
    {
      /* try to connect and come back here if successful; otherwise we get suspended. */
      _connect(self);
      _schedule_restart(self);
    }
  else if (log_queue_check_items(self->queue, &timeout_msec,
                                 _message_became_available_callback,
                                 self, NULL))
    {

      msg_trace("Message(s) available in queue, starting inserts",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      /* Something is in the queue, buffer them up and flush (if needed) */
      _perform_inserts(self);
      if (_should_flush_now(self))
        _perform_flush(self);
      _schedule_restart(self);
    }
  else if (self->batch_size > 0)
    {
      /* nothing in the queue, but there are pending elements in the buffer
       * (i.e.  batch_size != 0).  Perform a round of flushing.  We might
       * get back here, as the flush() routine doesn't have to flush
       * everything.  We are awoken either by the
       * _message_became_available_callback() or when the next flush time
       * has arrived.  */
      msg_trace("Queue empty, flushing previously buffered data",
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      if (_should_flush_now(self))
        _perform_flush(self);
      _schedule_restart_on_next_flush(self);
    }
  else if (timeout_msec != 0)
    {
      /* We probably have some items in the queue, but timeout_msec is set,
       * indicating that throttling is active.
       * _message_became_available_callback() is not set up in this case.
       * We need to wake up after timeout_msec time.
       *
       * We are processing throttle delays _after_ we finished flushing, as
       * items in the queue were already accepted by throttling, so they can
       * be flushed.
       */
      msg_trace("Delaying output due to throttling",
                evt_tag_int("timeout_msec", timeout_msec),
                evt_tag_str("driver", self->owner->super.super.id),
                evt_tag_int("worker_index", self->worker_index));

      _schedule_restart_on_throttle_timeout(self, timeout_msec);
    }
  else
    {
      /* NOTE: at this point we are not doing anything but keeping the
       * parallel_push callback alive, which will call
       * _message_became_available_callback(), which in turn wakes us up by
       * posting an event which causes this function to be run again.
       *
       * NOTE/2: the parallel_push callback may need to be cancelled if we
       * need to exit.  That happens in the shutdown_event_callback(), or
       * here in this very function, as log_queue_check_items() will cancel
       * outstanding parallel push callbacks automatically.
       */
    }
}

void
log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self)
{
  if (self->suspended)
    _perform_work(self);
}

static void
_flush_timer_cb(gpointer data)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) data;
  msg_trace("Flush timer expired",
            evt_tag_str("driver", self->owner->super.super.id),
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_int("batch_size", self->batch_size));
  _perform_work(data);
}

/* these are events of the _worker_ thread and are not registered to the
 * actual main thread.  We basically run our workload in the handler of the
 * do_work task, which might be invoked in a number of ways.
 *
 * Basic states:
 *   1) disconnected state: _perform_work() will try to connect periodically
 *      using the suspend() mechanism, which uses a timer to wake up periodically.
 *
 *   2) once connected:
 *      - if messages are already on the queue: flush them
 *
 *      - if no messages are on the queue: schedule
 *        _message_became_available_callback() to be called by the LogQueue.
 *
 *      - if there's an error: disconnect and go back to state #1 above.
 *
 */
static void
_init_watches(LogThreadedDestWorker *self)
{
  IV_EVENT_INIT(&self->wake_up_event);
  self->wake_up_event.cookie = self;
  self->wake_up_event.handler = _wakeup_event_callback;

  IV_EVENT_INIT(&self->shutdown_event);
  self->shutdown_event.cookie = self;
  self->shutdown_event.handler = _shutdown_event_callback;

  IV_TIMER_INIT(&self->timer_reopen);
  self->timer_reopen.cookie = self;
  self->timer_reopen.handler = _perform_work;

  IV_TIMER_INIT(&self->timer_throttle);
  self->timer_throttle.cookie = self;
  self->timer_throttle.handler = _perform_work;

  IV_TIMER_INIT(&self->timer_flush);
  self->timer_flush.cookie = self;
  self->timer_flush.handler = _flush_timer_cb;

  IV_TASK_INIT(&self->do_work);
  self->do_work.cookie = self;
  self->do_work.handler = _perform_work;
}

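/* Startup handshake between the worker thread and the thread that started
 * it (normally the main thread): the starter blocks in
 * _wait_for_startup_finished() until the worker reports, via the functions
 * below, whether thread_init() succeeded. */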
static void
_signal_startup_finished(LogThreadedDestWorker *self, gboolean thread_failure)
{
  g_mutex_lock(&self->owner->lock);
  self->startup_finished = TRUE;
  self->startup_failure |= thread_failure;
  g_cond_signal(&self->started_up);
  g_mutex_unlock(&self->owner->lock);
}

static void
_signal_startup_success(LogThreadedDestWorker *self)
{
  _signal_startup_finished(self, FALSE);
}

static void
_signal_startup_failure(LogThreadedDestWorker *self)
{
  _signal_startup_finished(self, TRUE);
}

static void
_wait_for_startup_finished(LogThreadedDestWorker *self)
{
  g_mutex_lock(&self->owner->lock);
  while (!self->startup_finished)
    g_cond_wait(&self->started_up, &self->owner->lock);
  g_mutex_unlock(&self->owner->lock);
}

static void
_register_worker_stats(LogThreadedDestWorker *self)
{
  StatsClusterKey sc_key;

  stats_lock();
  _init_stats_key(self->owner, &sc_key);
  log_queue_register_stats_counters(self->queue, 0, &sc_key);
  stats_unlock();
}

static void
_unregister_worker_stats(LogThreadedDestWorker *self)
{
  StatsClusterKey sc_key;

  stats_lock();
  _init_stats_key(self->owner, &sc_key);
  log_queue_unregister_stats_counters(self->queue, &sc_key);
  stats_unlock();
}

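/* Final flush, run after iv_main() has returned in the worker thread: try
 * to get rid of the buffered batch (LTF_FLUSH_EXPEDITE while the
 * configuration is merely being reloaded, a normal flush on shutdown),
 * then push anything still unacked back to the queue backlog so it is not
 * lost. */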
static void
_perform_final_flush(LogThreadedDestWorker *self)
{
  GlobalConfig *cfg = log_pipe_get_config(&self->owner->super.super.super);
  LogThreadedResult result;
  LogThreadedFlushMode mode = LTF_FLUSH_NORMAL;

  if (!cfg_is_shutting_down(cfg))
    mode = LTF_FLUSH_EXPEDITE;

  result = log_threaded_dest_worker_flush(self, mode);
  _process_result(self, result);
  log_queue_rewind_backlog_all(self->queue);
}

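/* Body of a dedicated worker thread: initialize ivykis, run thread_init(),
 * signal the startup result back to the starting thread, then serve the
 * queue from iv_main() until the shutdown event calls iv_quit(). */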
static void
_worker_thread(gpointer arg)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) arg;

  iv_init();

  msg_debug("Dedicated worker thread started",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));

  iv_event_register(&self->wake_up_event);
  iv_event_register(&self->shutdown_event);

  if (!log_threaded_dest_worker_thread_init(self))
    goto error;

  _signal_startup_success(self);

  /* if we have anything on the backlog, that was a partial, potentially
   * not-flushed batch.  Rewind it, so we start with that */

  log_queue_rewind_backlog_all(self->queue);

  _schedule_restart(self);
  iv_main();

  _perform_final_flush(self);

  _disconnect(self);

  log_threaded_dest_worker_thread_deinit(self);

  msg_debug("Dedicated worker thread finished",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));

  goto ok;

error:
  _signal_startup_failure(self);
ok:
  iv_event_unregister(&self->wake_up_event);
  iv_event_unregister(&self->shutdown_event);
  iv_deinit();
}

static gboolean
_acquire_worker_queue(LogThreadedDestWorker *self)
{
  gchar *persist_name = _format_queue_persist_name(self);
  self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name);
  g_free(persist_name);

  if (!self->queue)
    return FALSE;

  log_queue_set_use_backlog(self->queue, TRUE);

  return TRUE;
}

gboolean
log_threaded_dest_worker_init_method(LogThreadedDestWorker *self)
{
  if (self->time_reopen == -1)
    self->time_reopen = self->owner->time_reopen;

  _register_worker_stats(self);

  return TRUE;
}

void
log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self)
{
  _unregister_worker_stats(self);
}

void
log_threaded_dest_worker_free_method(LogThreadedDestWorker *self)
{
  g_cond_clear(&self->started_up);
}

void
log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self, LogThreadedDestDriver *owner, gint worker_index)
{
  self->worker_index = worker_index;
  self->thread_init = log_threaded_dest_worker_init_method;
  self->thread_deinit = log_threaded_dest_worker_deinit_method;
  self->free_fn = log_threaded_dest_worker_free_method;
  self->owner = owner;
  self->time_reopen = -1;
  g_cond_init(&self->started_up);
  _init_watches(self);
}

void
log_threaded_dest_worker_free(LogThreadedDestWorker *self)
{
  if (self->free_fn)
    self->free_fn(self);
  g_free(self);
}

/* LogThreadedDestDriver */

void
log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;

  self->num_workers = num_workers;
}

/* compatibility bridge between LogThreadedDestWorker and the older,
 * driver-level worker callbacks stored in the owner's worker struct */

static gboolean
_compat_thread_init(LogThreadedDestWorker *self)
{
  if (!log_threaded_dest_worker_init_method(self))
    return FALSE;

  /* NOTE: the driver level thread_init() didn't have a gboolean return value */
  if (self->owner->worker.thread_init)
    self->owner->worker.thread_init(self->owner);
  return TRUE;
}

static void
_compat_thread_deinit(LogThreadedDestWorker *self)
{
  if (self->owner->worker.thread_deinit)
    self->owner->worker.thread_deinit(self->owner);
  log_threaded_dest_worker_deinit_method(self);
}

static gboolean
_compat_connect(LogThreadedDestWorker *self)
{
  if (self->owner->worker.connect)
    return self->owner->worker.connect(self->owner);
  return TRUE;
}

static void
_compat_disconnect(LogThreadedDestWorker *self)
{
  if (self->owner->worker.disconnect)
    self->owner->worker.disconnect(self->owner);
}

static LogThreadedResult
_compat_insert(LogThreadedDestWorker *self, LogMessage *msg)
{
  return self->owner->worker.insert(self->owner, msg);
}

static LogThreadedResult
_compat_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
{
  if (self->owner->worker.flush)
    return self->owner->worker.flush(self->owner);
  return LTR_SUCCESS;
}

static void
_init_worker_compat_layer(LogThreadedDestWorker *self)
{
  self->thread_init = _compat_thread_init;
  self->thread_deinit = _compat_thread_deinit;
  self->connect = _compat_connect;
  self->disconnect = _compat_disconnect;
  self->insert = _compat_insert;
  self->flush = _compat_flush;
}

static gboolean
_is_worker_compat_mode(LogThreadedDestDriver *self)
{
  return !self->worker.construct;
}

/* temporary function until proper LogThreadedDestWorker allocation logic is
 * created.  Right now it is just using a singleton within the driver */
static LogThreadedDestWorker *
_construct_worker(LogThreadedDestDriver *self, gint worker_index)
{
  if (_is_worker_compat_mode(self))
    {
      /* kick in the compat layer; in this case self->worker.instance is the
       * single worker we have and all Worker related state is in the
       * (derived) Driver class. */

      return &self->worker.instance;
    }
  return self->worker.construct(self, worker_index);
}

static void
_request_worker_exit(gpointer s)
{
  LogThreadedDestWorker *self = (LogThreadedDestWorker *) s;

  msg_debug("Shutting down dedicated worker thread",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));
  self->owner->under_termination = TRUE;
  iv_event_post(&self->shutdown_event);
}

static gboolean
_start_worker_thread(LogThreadedDestWorker *self)
{
  msg_debug("Starting dedicated worker thread",
            evt_tag_int("worker_index", self->worker_index),
            evt_tag_str("driver", self->owner->super.super.id),
            log_expr_node_location_tag(self->owner->super.super.super.expr_node));

  main_loop_create_worker_thread(_worker_thread,
                                 _request_worker_exit,
                                 self, &self->owner->worker_options);
  _wait_for_startup_finished(self);
  return !self->startup_failure;
}

void
log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

  self->retries_on_error_max = max_retries;
}

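/* Pick the worker whose queue an incoming message should be pushed to.
 * Called from log_threaded_dest_driver_queue(), i.e. from the source side. */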
LogThreadedDestWorker *
_lookup_worker(LogThreadedDestDriver *self, LogMessage *msg)
{
  gint worker_index = self->last_worker % self->num_workers;
  self->last_worker++;

  /* this is where a lookup mechanism mapping msg -> worker would go; it
   * does not exist yet, so we simply use round-robin. */
  return self->workers[worker_index];
}

/* the feeding side of the driver: runs in the source thread and puts an
 * incoming message onto the queue of the selected worker.
 */
static void
log_threaded_dest_driver_queue(LogPipe *s, LogMessage *msg,
                               const LogPathOptions *path_options)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;
  LogThreadedDestWorker *dw = _lookup_worker(self, msg);
  LogPathOptions local_options;

  if (!path_options->flow_control_requested)
    path_options = log_msg_break_ack(msg, path_options, &local_options);

  log_msg_add_ack(msg, path_options);
  log_queue_push_tail(dw->queue, log_msg_ref(msg), path_options);

  stats_counter_inc(self->processed_messages);

  log_dest_driver_queue_method(s, msg, path_options);
}

static void
_init_stats_key(LogThreadedDestDriver *self, StatsClusterKey *sc_key)
{
  stats_cluster_logpipe_key_set(sc_key, self->stats_source | SCS_DESTINATION,
                                self->super.super.id,
                                self->format_stats_instance(self));
}

void
log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len)
{
  stats_aggregator_insert_data(self->max_message_size, len);
  stats_aggregator_insert_data(self->average_messages_size, len);
}

void
log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len)
{
  stats_aggregator_insert_data(self->max_batch_size, len);
  stats_aggregator_insert_data(self->average_batch_size, len);
}

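/* Register the aggregated statistics of the destination: maximum and
 * average message size (msg_size_max, msg_size_avg), maximum and average
 * batch size (batch_size_max, batch_size_avg), and the events-per-second
 * rate (eps) derived from the written counter. */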
void
log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self)
{
  StatsClusterKey sc_key_eps_input;
  _init_stats_key(self, &sc_key_eps_input);
  stats_aggregator_lock();
  StatsClusterKey sc_key;

  stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
                                         self->format_stats_instance(self), "msg_size_max");
  stats_register_aggregator_maximum(0, &sc_key, &self->max_message_size);

  stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
                                         self->format_stats_instance(self), "msg_size_avg");
  stats_register_aggregator_average(0, &sc_key, &self->average_messages_size);

  stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
                                         self->format_stats_instance(self), "batch_size_max");
  stats_register_aggregator_maximum(0, &sc_key, &self->max_batch_size);

  stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
                                         self->format_stats_instance(self), "batch_size_avg");
  stats_register_aggregator_average(0, &sc_key, &self->average_batch_size);

  stats_cluster_single_key_set_with_name(&sc_key, self->stats_source | SCS_DESTINATION, self->super.super.id,
                                         self->format_stats_instance(self), "eps");
  stats_register_aggregator_cps(0, &sc_key, &sc_key_eps_input, SC_TYPE_WRITTEN, &self->CPS);

  stats_aggregator_unlock();
}

void
log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self)
{
  stats_aggregator_lock();

  stats_unregister_aggregator_maximum(&self->max_message_size);
  stats_unregister_aggregator_average(&self->average_messages_size);
  stats_unregister_aggregator_maximum(&self->max_batch_size);
  stats_unregister_aggregator_average(&self->average_batch_size);
  stats_unregister_aggregator_cps(&self->CPS);

  stats_aggregator_unlock();
}

static void
_register_stats(LogThreadedDestDriver *self)
{
  stats_lock();
  {
    StatsClusterKey sc_key;

    _init_stats_key(self, &sc_key);
    stats_register_counter(0, &sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
    stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
    stats_register_counter(0, &sc_key, SC_TYPE_WRITTEN, &self->written_messages);
  }
  stats_unlock();
}

static void
_unregister_stats(LogThreadedDestDriver *self)
{
  stats_lock();
  {
    StatsClusterKey sc_key;

    _init_stats_key(self, &sc_key);
    stats_unregister_counter(&sc_key, SC_TYPE_DROPPED, &self->dropped_messages);
    stats_unregister_counter(&sc_key, SC_TYPE_PROCESSED, &self->processed_messages);
    stats_unregister_counter(&sc_key, SC_TYPE_WRITTEN, &self->written_messages);
  }
  stats_unlock();
}

static gchar *
_format_seqnum_persist_name(LogThreadedDestDriver *self)
{
  static gchar persist_name[256];

  g_snprintf(persist_name, sizeof(persist_name), "%s.seqnum",
             self->super.super.super.generate_persist_name((const LogPipe *)self));

  return persist_name;
}

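/* Instantiate num_workers LogThreadedDestWorker instances (or the single
 * compat-mode worker) and acquire a persistent queue for each of them;
 * returns FALSE if any queue cannot be acquired. */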
static gboolean
_create_workers(LogThreadedDestDriver *self)
{
  /* free the previous workers array, if any, to cope with a change in num_workers */
  g_free(self->workers);
  self->workers = g_new0(LogThreadedDestWorker *, self->num_workers);

  for (self->created_workers = 0; self->created_workers < self->num_workers; self->created_workers++)
    {
      LogThreadedDestWorker *dw = _construct_worker(self, self->created_workers);

      self->workers[self->created_workers] = dw;
      if (!_acquire_worker_queue(dw))
        return FALSE;
    }

  return TRUE;
}

gboolean
log_threaded_dest_driver_init_method(LogPipe *s)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;
  GlobalConfig *cfg = log_pipe_get_config(s);

  if (!log_dest_driver_init_method(&self->super.super.super))
    return FALSE;

  self->under_termination = FALSE;

  if (self->time_reopen == -1)
    self->time_reopen = cfg->time_reopen;

  self->shared_seq_num = GPOINTER_TO_INT(cfg_persist_config_fetch(cfg,
                                         _format_seqnum_persist_name(self)));
  if (!self->shared_seq_num)
    init_sequence_number(&self->shared_seq_num);

  _register_stats(self);

  if (!_create_workers(self))
    return FALSE;

  return TRUE;
}

/* This method is only used when a LogThreadedDestDriver is used directly,
 * without overriding its on_config_inited method.  If there's an overridden
 * method, the caller is responsible for explicitly calling _start_workers() at
 * the end of on_config_inited(). */
gboolean
log_threaded_dest_driver_start_workers(LogPipe *s)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *) s;

  for (gint worker_index = 0; worker_index < self->num_workers; worker_index++)
    {
      if (!_start_worker_thread(self->workers[worker_index]))
        return FALSE;
    }
  return TRUE;
}

gboolean
log_threaded_dest_driver_deinit_method(LogPipe *s)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

  /* NOTE: workers are shut down by the time we get here, through the
   * request_exit mechanism of main loop worker threads */

  cfg_persist_config_add(log_pipe_get_config(s),
                         _format_seqnum_persist_name(self),
                         GINT_TO_POINTER(self->shared_seq_num), NULL, FALSE);

  _unregister_stats(self);

  if (!_is_worker_compat_mode(self))
    {
      for (int i = 0; i < self->created_workers; i++)
        log_threaded_dest_worker_free(self->workers[i]);
    }

  return log_dest_driver_deinit_method(s);
}


void
log_threaded_dest_driver_free(LogPipe *s)
{
  LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

  log_threaded_dest_worker_free_method(&self->worker.instance);
  g_mutex_clear(&self->lock);
  g_free(self->workers);
  log_dest_driver_free((LogPipe *)self);
}

void
log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg)
{
  log_dest_driver_init_instance(&self->super, cfg);

  self->worker_options.is_output_thread = TRUE;

  self->super.super.super.init = log_threaded_dest_driver_init_method;
  self->super.super.super.deinit = log_threaded_dest_driver_deinit_method;
  self->super.super.super.queue = log_threaded_dest_driver_queue;
  self->super.super.super.free_fn = log_threaded_dest_driver_free;
  self->super.super.super.on_config_inited = log_threaded_dest_driver_start_workers;
  self->time_reopen = -1;
  self->batch_lines = -1;
  self->batch_timeout = -1;
  self->num_workers = 1;
  self->last_worker = 0;

  self->retries_on_error_max = MAX_RETRIES_ON_ERROR_DEFAULT;
  self->retries_max = MAX_RETRIES_BEFORE_SUSPEND_DEFAULT;
  g_mutex_init(&self->lock);
  log_threaded_dest_worker_init_instance(&self->worker.instance, self, 0);
  _init_worker_compat_layer(&self->worker.instance);
}