/*
 * Copyright (c) 2002-2012 Balabit
 * Copyright (c) 1998-2012 Balázs Scheidler
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */

#include "logqueue.h"
#include "logpipe.h"
#include "messages.h"
#include "serialize.h"
#include "stats/stats-registry.h"
#include "mainloop-worker.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <iv_thread.h>

QueueType log_queue_fifo_type = "FIFO";
/*
 * LogFifo is a scalable first-in-first-out queue implementation that:
 *
 *   - has a per-thread, unlocked input queue where threads can put their items
 *
 *   - has a locked wait queue where items go once the per-thread input
 *     queue would overflow or when the input thread goes to sleep (i.e.
 *     one lock acquisition per a longer period)
 *
 *   - has an unlocked output queue where items from the wait queue go
 *     once the output queue becomes depleted
 *
 * This means that items flow in this sequence from one list to the next:
 *
 *    input queue (per-thread) -> wait queue (locked) -> output queue (single-threaded)
 *
 * Fastpath:
 *   - input threads putting elements on their per-thread queue (lockless)
 *   - output threads removing elements from the output queue (lockless)
 *
 * Slowpath:
 *   - the input queue overflows (or the input thread goes to sleep): the
 *     wait queue mutex is grabbed and all elements are moved to the wait
 *     queue
 *
 *   - the output queue is depleted: the wait queue mutex is grabbed and
 *     all elements on the wait queue are moved to the output queue
 *
 * Threading assumptions:
 *   - the head of the queue is only manipulated from the output thread
 *   - the tail of the queue is only manipulated from the input threads
 *
 */
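
/* In code terms, a rough map of this file: log_queue_fifo_push_tail()
 * feeds the per-thread input queue on the fast path;
 * log_queue_fifo_move_input() drains that input queue into wait_queue
 * under the lock; log_queue_fifo_pop_head() refills output_queue from
 * wait_queue when it runs dry and otherwise pops locklessly.
 */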

typedef struct _InputQueue
{
  struct iv_list_head items;
  WorkerBatchCallback cb;
  guint16 len;
  guint16 non_flow_controlled_len;
  guint16 finish_cb_registered;
} InputQueue;

typedef struct _OverflowQueue
{
  struct iv_list_head items;
  gint len;
  gint non_flow_controlled_len;
} OverflowQueue;

typedef struct _LogQueueFifo
{
  LogQueue super;

  /* scalable qoverflow implementation */
  OverflowQueue output_queue;
  OverflowQueue wait_queue;
  OverflowQueue backlog_queue; /* entries that were sent but not acked yet */

  gint log_fifo_size;

  /* legacy: flow-controlled messages are included in the log_fifo_size limit */
  gboolean use_legacy_fifo_size;

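  /* per-thread input queues: a trailing flexible array, allocated with
   * log_queue_max_threads entries in log_queue_fifo_new() */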
  InputQueue input_queues[0];
} LogQueueFifo;

/* NOTE: this is inherently racy. If the LogQueue->lock is taken, then the
 * race is limited to changes in the output_queue.
 *
 * In the output thread, this means the read is race-free. In the
 * input thread, the output_queue can change because of a
 * log_queue_fifo_push_head() or log_queue_fifo_rewind_backlog().
 *
 */

static void
iv_list_update_msg_size(LogQueueFifo *self, struct iv_list_head *head)
{
  LogMessage *msg;
  struct iv_list_head *ilh, *ilh2;
  iv_list_for_each_safe(ilh, ilh2, head)
  {
    msg = iv_list_entry(ilh, LogMessageQueueNode, list)->msg;
    log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
  }
}

static gint64
log_queue_fifo_get_length(LogQueue *s)
{
  LogQueueFifo *self = (LogQueueFifo *) s;

  return self->wait_queue.len + self->output_queue.len;
}

static gint64
log_queue_fifo_get_non_flow_controlled_length(LogQueueFifo *self)
{
  return self->wait_queue.non_flow_controlled_len + self->output_queue.non_flow_controlled_len;
}

gboolean
log_queue_fifo_is_empty_racy(LogQueue *s)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  gboolean has_message_in_queue = FALSE;
  g_mutex_lock(&self->super.lock);
  if (log_queue_fifo_get_length(s) > 0)
    {
      has_message_in_queue = TRUE;
    }
  else
    {
      gint i;
      for (i = 0; i < log_queue_max_threads && !has_message_in_queue; i++)
        {
          has_message_in_queue |= self->input_queues[i].finish_cb_registered;
        }
    }
  g_mutex_unlock(&self->super.lock);
  return !has_message_in_queue;
}

/* NOTE: this is inherently racy, can only be called if log processing is suspended (e.g. reload time) */
static gboolean
log_queue_fifo_keep_on_reload(LogQueue *s)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  return log_queue_fifo_get_length(s) > 0 || self->backlog_queue.len > 0;
}

static inline void
log_queue_fifo_drop_messages_from_input_queue(LogQueueFifo *self, InputQueue *input_queue, gint num_of_messages_to_drop)
{
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

  struct iv_list_head *item = input_queue->items.next;
  for (gint dropped = 0; dropped < num_of_messages_to_drop;)
    {
      LogMessageQueueNode *node = iv_list_entry(item, LogMessageQueueNode, list);

      item = item->next;

      path_options.ack_needed = node->ack_needed;
      path_options.flow_control_requested = node->flow_control_requested;

      if (!self->use_legacy_fifo_size && path_options.flow_control_requested)
        continue;

      iv_list_del(&node->list);
      input_queue->len--;
      stats_counter_inc(self->super.dropped_messages);

      /* grab the message pointer before freeing its queue node */
      LogMessage *msg = node->msg;
      log_msg_free_queue_node(node);

      if (path_options.flow_control_requested)
        log_msg_drop(msg, &path_options, AT_SUSPENDED);
      else
        {
          input_queue->non_flow_controlled_len--;
          log_msg_drop(msg, &path_options, AT_PROCESSED);
        }

      dropped++;
    }

  msg_debug("Destination queue full, dropping messages",
            evt_tag_int("log_fifo_size", self->log_fifo_size),
            evt_tag_int("number_of_dropped_messages", num_of_messages_to_drop),
            evt_tag_str("persist_name", self->super.persist_name));
}

static inline gboolean
log_queue_fifo_calculate_num_of_messages_to_drop(LogQueueFifo *self, InputQueue *input_queue,
                                                 gint *num_of_messages_to_drop)
{
  /* since we're in the input thread, queue_len will be racy.
   * It can increase due to log_queue_fifo_push_head() and can also decrease as
   * items are removed from the output queue using log_queue_pop_head().
   *
   * The only reason we're using it here is to check for qoverflow
   * overflows; the only side-effect of the race (if lost) is that we
   * would drop a couple of messages too many, or admit a few more
   * messages into qoverflow than permitted by the user.  Since with
   * flow control in use the fifo size should be larger than the
   * potential window sizes anyway (otherwise messages can be lost
   * regardless), this is not deemed a cost that would justify proper
   * locking in this case.
   */

  guint16 input_queue_len;
  gint queue_len;

  if (G_UNLIKELY(self->use_legacy_fifo_size))
    {
      queue_len = log_queue_fifo_get_length(&self->super);
      input_queue_len = input_queue->len;
    }
  else
    {
      queue_len = log_queue_fifo_get_non_flow_controlled_length(self);
      input_queue_len = input_queue->non_flow_controlled_len;
    }

  gboolean drop_messages = queue_len + input_queue_len > self->log_fifo_size;
  if (!drop_messages)
    return FALSE;

  /* NOTE: MAX is needed here to ensure that the lost race on queue_len
   * doesn't result in n < 0 */
  *num_of_messages_to_drop = input_queue_len - MAX(0, (self->log_fifo_size - queue_len));

  return TRUE;
}
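
/* A worked example with hypothetical numbers: with log_fifo_size = 1000, a
 * racy queue_len of 980 and input_queue_len of 50, the sum (1030) exceeds
 * the limit, so *num_of_messages_to_drop = 50 - MAX(0, 1000 - 980) = 30.
 * Had a lost race inflated queue_len to 1020, MAX() would clamp the
 * remaining headroom to 0 and all 50 input items would become drop
 * candidates.
 */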

/* move items from the per-thread input queue to the lock-protected "wait" queue */
static void
log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id)
{
  gint num_of_messages_to_drop;
  gboolean drop_messages = log_queue_fifo_calculate_num_of_messages_to_drop(self, &self->input_queues[thread_id],
                           &num_of_messages_to_drop);

  if (drop_messages)
    {
      /* slow path, the input thread's queue would overflow the queue, let's drop some messages */
      log_queue_fifo_drop_messages_from_input_queue(self, &self->input_queues[thread_id], num_of_messages_to_drop);
    }

  log_queue_queued_messages_add(&self->super, self->input_queues[thread_id].len);
  iv_list_update_msg_size(self, &self->input_queues[thread_id].items);

  iv_list_splice_tail_init(&self->input_queues[thread_id].items, &self->wait_queue.items);
  self->wait_queue.len += self->input_queues[thread_id].len;
  self->wait_queue.non_flow_controlled_len += self->input_queues[thread_id].non_flow_controlled_len;
  self->input_queues[thread_id].len = 0;
  self->input_queues[thread_id].non_flow_controlled_len = 0;
}

/* move items from the per-thread input queue to the lock-protected
 * "wait" queue, grabbing the lock first. This is registered as a
 * callback to be called when the input worker thread finishes its
 * job.
 */
static gpointer
log_queue_fifo_move_input(gpointer user_data)
{
  LogQueueFifo *self = (LogQueueFifo *) user_data;
  gint thread_id;

  thread_id = main_loop_worker_get_thread_id();

  g_assert(thread_id >= 0);

  g_mutex_lock(&self->super.lock);
  log_queue_fifo_move_input_unlocked(self, thread_id);
  log_queue_push_notify(&self->super);
  g_mutex_unlock(&self->super.lock);
  self->input_queues[thread_id].finish_cb_registered = FALSE;
  log_queue_unref(&self->super);
  return NULL;
}

/* lock must be held */
static inline gboolean
_message_has_to_be_dropped(LogQueueFifo *self, const LogPathOptions *path_options)
{
  if (G_UNLIKELY(self->use_legacy_fifo_size))
    return log_queue_fifo_get_length(&self->super) >= self->log_fifo_size;

  return !path_options->flow_control_requested
         && log_queue_fifo_get_non_flow_controlled_length(self) >= self->log_fifo_size;
}
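
/* Example with hypothetical sizes: with log_fifo_size = 100 and 100
 * messages already queued, legacy mode drops any further message, while
 * the default mode still admits flow-controlled messages, because only
 * the non-flow-controlled count is compared against the limit.
 */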

static inline void
_drop_message(LogMessage *msg, const LogPathOptions *path_options)
{
  if (path_options->flow_control_requested)
    {
      log_msg_drop(msg, path_options, AT_SUSPENDED);
      return;
    }

  log_msg_drop(msg, path_options, AT_PROCESSED);
}

/**
 * Puts the message into the queue; consumes the reference passed by the
 * caller.
 *
 * Assumed to be called from one of the input threads: it first attempts
 * to put the item on the per-thread input queue. If the thread_id
 * cannot be determined, the item is put directly into the wait queue,
 * and the message is dropped (with a debug message emitted) if the
 * queue is full.
 **/
static void
log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  gint thread_id;
  LogMessageQueueNode *node;

  thread_id = main_loop_worker_get_thread_id();

  g_assert(thread_id < 0 || log_queue_max_threads > thread_id);

  /* NOTE: we don't use high-water marks for now, as log_fetch_limit
   * limits the number of items placed on the per-thread input queue
   * anyway, and any sane limit measurably decreased performance.
   *
   * This means that the per-thread input queues contain _all_ items that
   * a single poll iteration produces. And once the reader is finished
   * (either because the input is depleted or because of
   * log_fetch_limit / window_size) the whole bunch is propagated to
   * the "wait" queue.
   */

  if (thread_id >= 0)
    {
      /* fastpath, use per-thread input FIFOs */
      if (!self->input_queues[thread_id].finish_cb_registered)
        {
          /* this is the first item in the input FIFO, register a finish
           * callback to make sure it gets moved to the wait_queue when
           * the input thread finishes. One reference is held while the
           * callback is registered, avoiding a use-after-free.
           */

          main_loop_worker_register_batch_callback(&self->input_queues[thread_id].cb);
          self->input_queues[thread_id].finish_cb_registered = TRUE;
          log_queue_ref(&self->super);
        }

      node = log_msg_alloc_queue_node(msg, path_options);
      iv_list_add_tail(&node->list, &self->input_queues[thread_id].items);
      self->input_queues[thread_id].len++;

      if (!path_options->flow_control_requested)
        self->input_queues[thread_id].non_flow_controlled_len++;

      log_msg_unref(msg);
      return;
    }

  /* slow path, put the pending item and the whole input queue to the wait_queue */

  g_mutex_lock(&self->super.lock);

  if (_message_has_to_be_dropped(self, path_options))
    {
      stats_counter_inc(self->super.dropped_messages);
      g_mutex_unlock(&self->super.lock);

      _drop_message(msg, path_options);

      msg_debug("Destination queue full, dropping message",
                evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
                evt_tag_int("log_fifo_size", self->log_fifo_size),
                evt_tag_str("persist_name", self->super.persist_name));
      return;
    }

  node = log_msg_alloc_queue_node(msg, path_options);

  iv_list_add_tail(&node->list, &self->wait_queue.items);
  self->wait_queue.len++;

  if (!path_options->flow_control_requested)
    self->wait_queue.non_flow_controlled_len++;

  log_queue_push_notify(&self->super);
  log_queue_queued_messages_inc(&self->super);

  log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
  g_mutex_unlock(&self->super.lock);

  log_msg_unref(msg);
}

/*
 * Put an item back to the front of the queue.
 *
 * This is assumed to be called only from the output thread.
 *
 * NOTE: It consumes the reference passed by the caller.
 */
static void
log_queue_fifo_push_head(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  LogMessageQueueNode *node;

  /* we don't check limits when putting items "in-front", as it
   * normally happens when we start processing an item, but at the end
   * can't deliver it. No checks, no drops either. */

  node = log_msg_alloc_dynamic_queue_node(msg, path_options);
  iv_list_add(&node->list, &self->output_queue.items);
  self->output_queue.len++;

  if (!path_options->flow_control_requested)
    self->output_queue.non_flow_controlled_len++;

  log_msg_unref(msg);

  log_queue_queued_messages_inc(&self->super);
  log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
}

/*
 * Can only run from the output thread.
 *
 * NOTE: this returns a reference which the caller must take care to free.
 */
static LogMessage *
log_queue_fifo_pop_head(LogQueue *s, LogPathOptions *path_options)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  LogMessageQueueNode *node;
  LogMessage *msg = NULL;

  if (self->output_queue.len == 0)
    {
      /* slow path, output queue is empty, get some elements from the wait queue */
      g_mutex_lock(&self->super.lock);
      iv_list_splice_tail_init(&self->wait_queue.items, &self->output_queue.items);
      self->output_queue.len = self->wait_queue.len;
      self->output_queue.non_flow_controlled_len = self->wait_queue.non_flow_controlled_len;
      self->wait_queue.len = 0;
      self->wait_queue.non_flow_controlled_len = 0;
      g_mutex_unlock(&self->super.lock);
    }

  if (self->output_queue.len > 0)
    {
      node = iv_list_entry(self->output_queue.items.next, LogMessageQueueNode, list);

      msg = node->msg;
      path_options->ack_needed = node->ack_needed;
      self->output_queue.len--;

      if (!node->flow_control_requested)
        self->output_queue.non_flow_controlled_len--;

      if (!self->super.use_backlog)
        {
          iv_list_del(&node->list);
          log_msg_free_queue_node(node);
        }
      else
        {
          iv_list_del_init(&node->list);
        }
    }
  else
    {
      /* no items on either the wait queue or the output queue.
       *
       * NOTE: the input queues may contain items even in this case,
       * however we don't touch them here; they'll be migrated to the
       * wait_queue once the input threads finish their processing (or
       * the high watermark is reached). Also, they are unlocked, so
       * there is no way to touch them safely.
       */
      return NULL;
    }
  log_queue_queued_messages_dec(&self->super);
  log_queue_memory_usage_sub(&self->super, log_msg_get_size(msg));

  if (self->super.use_backlog)
    {
      log_msg_ref(msg);
      iv_list_add_tail(&node->list, &self->backlog_queue.items);
      self->backlog_queue.len++;

      if (!node->flow_control_requested)
        self->backlog_queue.non_flow_controlled_len++;
    }

  return msg;
}

/*
 * Can only run from the output thread.
 */
static void
log_queue_fifo_ack_backlog(LogQueue *s, gint rewind_count)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  LogMessage *msg;
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
  gint pos;

  for (pos = 0; pos < rewind_count && self->backlog_queue.len > 0; pos++)
    {
      LogMessageQueueNode *node;
      node = iv_list_entry(self->backlog_queue.items.next, LogMessageQueueNode, list);
      msg = node->msg;

      iv_list_del(&node->list);
      self->backlog_queue.len--;

      if (!node->flow_control_requested)
        self->backlog_queue.non_flow_controlled_len--;

      path_options.ack_needed = node->ack_needed;
      log_msg_ack(msg, &path_options, AT_PROCESSED);
      log_msg_free_queue_node(node);
      log_msg_unref(msg);
    }
}


/*
 * log_queue_fifo_rewind_backlog_all:
 *
 * Move items from our backlog back to our qoverflow queue. Note that this
 * function does not care about the qoverflow size: it has to put the
 * backlog somewhere. The backlog is emptied, as it will be refilled when
 * the items are sent again.
 *
 * NOTE: this is assumed to be called from the output thread.
 */
static void
log_queue_fifo_rewind_backlog_all(LogQueue *s)
{
  LogQueueFifo *self = (LogQueueFifo *) s;

  iv_list_update_msg_size(self, &self->backlog_queue.items);
  iv_list_splice_tail_init(&self->backlog_queue.items, &self->output_queue.items);

  self->output_queue.len += self->backlog_queue.len;
  self->output_queue.non_flow_controlled_len += self->backlog_queue.non_flow_controlled_len;
  log_queue_queued_messages_add(&self->super, self->backlog_queue.len);
  self->backlog_queue.len = 0;
  self->backlog_queue.non_flow_controlled_len = 0;
}

static void
log_queue_fifo_rewind_backlog(LogQueue *s, guint rewind_count)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  guint pos;

  if (rewind_count > self->backlog_queue.len)
    rewind_count = self->backlog_queue.len;

  for (pos = 0; pos < rewind_count; pos++)
    {
      LogMessageQueueNode *node = iv_list_entry(self->backlog_queue.items.prev, LogMessageQueueNode, list);
      /*
       * Messages get to the backlog only via pop_head, and pop_head adds
       * an ack and a ref when it pushes the message into the backlog;
       * the rewind must account for that ack and ref as well.
       */
      iv_list_del_init(&node->list);
      iv_list_add(&node->list, &self->output_queue.items);

      self->backlog_queue.len--;
      self->output_queue.len++;

      if (!node->flow_control_requested)
        {
          self->backlog_queue.non_flow_controlled_len--;
          self->output_queue.non_flow_controlled_len++;
        }

      log_queue_queued_messages_inc(&self->super);
      log_queue_memory_usage_add(&self->super, log_msg_get_size(node->msg));
    }
}
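
/* A sketch of the backlog lifecycle as driven from the output thread,
 * assuming super.use_backlog has been enabled and using the generic
 * wrappers from logqueue.h (deliver() is a hypothetical send routine):
 *
 *   LogPathOptions po = LOG_PATH_OPTIONS_INIT;
 *   LogMessage *m = log_queue_pop_head(q, &po);  // node moves to backlog_queue
 *   if (deliver(m))
 *     log_queue_ack_backlog(q, 1);               // acks msg, frees backlog node
 *   else
 *     log_queue_rewind_backlog(q, 1);            // node back to output_queue
 *   log_msg_unref(m);
 */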

static void
log_queue_fifo_free_queue(struct iv_list_head *q)
{
  while (!iv_list_empty(q))
    {
      LogMessageQueueNode *node;
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
      LogMessage *msg;

      node = iv_list_entry(q->next, LogMessageQueueNode, list);
      iv_list_del(&node->list);

      path_options.ack_needed = node->ack_needed;
      msg = node->msg;
      log_msg_free_queue_node(node);
      log_msg_ack(msg, &path_options, AT_ABORTED);
      log_msg_unref(msg);
    }
}

static void
log_queue_fifo_free(LogQueue *s)
{
  LogQueueFifo *self = (LogQueueFifo *) s;
  gint i;

  for (i = 0; i < log_queue_max_threads; i++)
    {
      g_assert(self->input_queues[i].finish_cb_registered == FALSE);
      log_queue_fifo_free_queue(&self->input_queues[i].items);
    }

  log_queue_fifo_free_queue(&self->wait_queue.items);
  log_queue_fifo_free_queue(&self->output_queue.items);
  log_queue_fifo_free_queue(&self->backlog_queue.items);
  log_queue_free_method(s);
}

LogQueue *
log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name)
{
  LogQueueFifo *self;
  gint i;

  self = g_malloc0(sizeof(LogQueueFifo) + log_queue_max_threads * sizeof(self->input_queues[0]));

  log_queue_init_instance(&self->super, persist_name);
  self->super.type = log_queue_fifo_type;
  self->super.use_backlog = FALSE;
  self->super.get_length = log_queue_fifo_get_length;
  self->super.is_empty_racy = log_queue_fifo_is_empty_racy;
  self->super.keep_on_reload = log_queue_fifo_keep_on_reload;
  self->super.push_tail = log_queue_fifo_push_tail;
  self->super.push_head = log_queue_fifo_push_head;
  self->super.pop_head = log_queue_fifo_pop_head;
  self->super.ack_backlog = log_queue_fifo_ack_backlog;
  self->super.rewind_backlog = log_queue_fifo_rewind_backlog;
  self->super.rewind_backlog_all = log_queue_fifo_rewind_backlog_all;

  self->super.free_fn = log_queue_fifo_free;

  for (i = 0; i < log_queue_max_threads; i++)
    {
      INIT_IV_LIST_HEAD(&self->input_queues[i].items);
      worker_batch_callback_init(&self->input_queues[i].cb);
      self->input_queues[i].cb.func = log_queue_fifo_move_input;
      self->input_queues[i].cb.user_data = self;
    }
  INIT_IV_LIST_HEAD(&self->wait_queue.items);
  INIT_IV_LIST_HEAD(&self->output_queue.items);
  INIT_IV_LIST_HEAD(&self->backlog_queue.items);

  self->log_fifo_size = log_fifo_size;
  return &self->super;
}
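
/* A minimal usage sketch, assuming the generic wrappers in logqueue.h
 * dispatch through the vtable set above (msg is a hypothetical,
 * already-constructed LogMessage):
 *
 *   LogQueue *q = log_queue_fifo_new(1000, "example-fifo");
 *   LogPathOptions po = LOG_PATH_OPTIONS_INIT;
 *
 *   log_queue_push_tail(q, msg, &po);             // producer (input thread)
 *   LogMessage *out = log_queue_pop_head(q, &po); // consumer (output thread)
 *
 *   log_msg_unref(out);
 *   log_queue_unref(q);
 */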

LogQueue *
log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name)
{
  LogQueueFifo *self = (LogQueueFifo *) log_queue_fifo_new(log_fifo_size, persist_name);
  self->use_legacy_fifo_size = TRUE;
  return &self->super;
}

QueueType
log_queue_fifo_get_type(void)
{
  return log_queue_fifo_type;
}