1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logqueue.h"
26 #include "logpipe.h"
27 #include "messages.h"
28 #include "serialize.h"
29 #include "stats/stats-registry.h"
30 #include "mainloop-worker.h"
31
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <fcntl.h>
35 #include <unistd.h>
36 #include <string.h>
37 #include <iv_thread.h>
38
39 QueueType log_queue_fifo_type = "FIFO";
40
41 /*
42 * LogFifo is a scalable first-in-first-output queue implementation, that:
43 *
44 * - has a per-thread, unlocked input queue where threads can put their items
45 *
46 * - has a locked wait-queue where items go once the per-thread input
47 * would be overflown or if the input thread goes to sleep (e.g. one
48 * lock acquisition per a longer period)
49 *
50 * - has an unlocked output queue where items from the wait queue go, once
51 * it becomes depleted.
52 *
53 * This means that items flow in this sequence from one list to the next:
54 *
55 * input queue (per-thread) -> wait queue (locked) -> output queue (single-threaded)
56 *
57 * Fastpath is:
58 * - input threads putting elements on their per-thread queue (lockless)
59 * - output threads removing elements from the output queue (lockless)
60 *
61 * Slowpath:
62 * - input queue is overflown (or the input thread goes to sleep), wait
63 * queue mutex is grabbed, all elements are put to the wait queue.
64 *
65 * - output queue is depleted, wait queue mutex is grabbed, all elements
66 * on the wait queue is put to the output queue
67 *
68 * Threading assumptions:
69 * - the head of the queue is only manipulated from the output thread
70 * - the tail of the queue is only manipulated from the input threads
71 *
72 */
73
74 typedef struct _InputQueue
75 {
76 struct iv_list_head items;
77 WorkerBatchCallback cb;
78 guint16 len;
79 guint16 non_flow_controlled_len;
80 guint16 finish_cb_registered;
81 } InputQueue;
82
83 typedef struct _OverflowQueue
84 {
85 struct iv_list_head items;
86 gint len;
87 gint non_flow_controlled_len;
88 } OverflowQueue;
89
90 typedef struct _LogQueueFifo
91 {
92 LogQueue super;
93
94 /* scalable qoverflow implementation */
95 OverflowQueue output_queue;
96 OverflowQueue wait_queue;
97 OverflowQueue backlog_queue; /* entries that were sent but not acked yet */
98
99 gint log_fifo_size;
100
101 /* legacy: flow-controlled messages are included in the log_fifo_size limit */
102 gboolean use_legacy_fifo_size;
103
104 InputQueue input_queues[0];
105 } LogQueueFifo;
106
107 /* NOTE: this is inherently racy. If the LogQueue->lock is taken, then the
108 * race is limited to the changes in output_queue queue changes.
109 *
110 * In the output thread, this means that this can get race-free. In the
111 * input thread, the output_queue can change because of a
112 * log_queue_fifo_push_head() or log_queue_fifo_rewind_backlog().
113 *
114 */
115
116 static void
iv_list_update_msg_size(LogQueueFifo * self,struct iv_list_head * head)117 iv_list_update_msg_size(LogQueueFifo *self, struct iv_list_head *head)
118 {
119 LogMessage *msg;
120 struct iv_list_head *ilh, *ilh2;
121 iv_list_for_each_safe(ilh, ilh2, head)
122 {
123 msg = iv_list_entry(ilh, LogMessageQueueNode, list)->msg;
124 log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
125 }
126 }
127
128 static gint64
log_queue_fifo_get_length(LogQueue * s)129 log_queue_fifo_get_length(LogQueue *s)
130 {
131 LogQueueFifo *self = (LogQueueFifo *) s;
132
133 return self->wait_queue.len + self->output_queue.len;
134 }
135
136 static gint64
log_queue_fifo_get_non_flow_controlled_length(LogQueueFifo * self)137 log_queue_fifo_get_non_flow_controlled_length(LogQueueFifo *self)
138 {
139 return self->wait_queue.non_flow_controlled_len + self->output_queue.non_flow_controlled_len;
140 }
141
142 gboolean
log_queue_fifo_is_empty_racy(LogQueue * s)143 log_queue_fifo_is_empty_racy(LogQueue *s)
144 {
145 LogQueueFifo *self = (LogQueueFifo *) s;
146 gboolean has_message_in_queue = FALSE;
147 g_mutex_lock(&self->super.lock);
148 if (log_queue_fifo_get_length(s) > 0)
149 {
150 has_message_in_queue = TRUE;
151 }
152 else
153 {
154 gint i;
155 for (i = 0; i < log_queue_max_threads && !has_message_in_queue; i++)
156 {
157 has_message_in_queue |= self->input_queues[i].finish_cb_registered;
158 }
159 }
160 g_mutex_unlock(&self->super.lock);
161 return !has_message_in_queue;
162 }
163
164 /* NOTE: this is inherently racy, can only be called if log processing is suspended (e.g. reload time) */
165 static gboolean
log_queue_fifo_keep_on_reload(LogQueue * s)166 log_queue_fifo_keep_on_reload(LogQueue *s)
167 {
168 LogQueueFifo *self = (LogQueueFifo *) s;
169 return log_queue_fifo_get_length(s) > 0 || self->backlog_queue.len > 0;
170 }
171
172 static inline void
log_queue_fifo_drop_messages_from_input_queue(LogQueueFifo * self,InputQueue * input_queue,gint num_of_messages_to_drop)173 log_queue_fifo_drop_messages_from_input_queue(LogQueueFifo *self, InputQueue *input_queue, gint num_of_messages_to_drop)
174 {
175 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
176
177 struct iv_list_head *item = input_queue->items.next;
178 for (gint dropped = 0; dropped < num_of_messages_to_drop;)
179 {
180 LogMessageQueueNode *node = iv_list_entry(item, LogMessageQueueNode, list);
181
182 item = item->next;
183
184 path_options.ack_needed = node->ack_needed;
185 path_options.flow_control_requested = node->flow_control_requested;
186
187 if (!self->use_legacy_fifo_size && path_options.flow_control_requested)
188 continue;
189
190 iv_list_del(&node->list);
191 input_queue->len--;
192 stats_counter_inc(self->super.dropped_messages);
193 log_msg_free_queue_node(node);
194
195 LogMessage *msg = node->msg;
196 if (path_options.flow_control_requested)
197 log_msg_drop(msg, &path_options, AT_SUSPENDED);
198 else
199 {
200 input_queue->non_flow_controlled_len--;
201 log_msg_drop(msg, &path_options, AT_PROCESSED);
202 }
203
204 dropped++;
205 }
206
207 msg_debug("Destination queue full, dropping messages",
208 evt_tag_int("log_fifo_size", self->log_fifo_size),
209 evt_tag_int("number_of_dropped_messages", num_of_messages_to_drop),
210 evt_tag_str("persist_name", self->super.persist_name));
211 }
212
213 static inline gboolean
log_queue_fifo_calculate_num_of_messages_to_drop(LogQueueFifo * self,InputQueue * input_queue,gint * num_of_messages_to_drop)214 log_queue_fifo_calculate_num_of_messages_to_drop(LogQueueFifo *self, InputQueue *input_queue,
215 gint *num_of_messages_to_drop)
216 {
217 /* since we're in the input thread, queue_len will be racy.
218 * It can increase due to log_queue_fifo_push_head() and can also decrease as
219 * items are removed from the output queue using log_queue_pop_head().
220 *
221 * The only reason we're using it here is to check for qoverflow
222 * overflows, however the only side-effect of the race (if lost) is that
223 * we would lose a couple of message too many or add some more messages to
224 * qoverflow than permitted by the user. Since if flow-control is used,
225 * the fifo size should be sized larger than the potential window sizes,
226 * otherwise we can lose messages anyway, this is not deemed a cost to
227 * justify proper locking in this case.
228 */
229
230 guint16 input_queue_len;
231 gint queue_len;
232
233 if (G_UNLIKELY(self->use_legacy_fifo_size))
234 {
235 queue_len = log_queue_fifo_get_length(&self->super);
236 input_queue_len = input_queue->len;
237 }
238 else
239 {
240 queue_len = log_queue_fifo_get_non_flow_controlled_length(self);
241 input_queue_len = input_queue->non_flow_controlled_len;
242 }
243
244 gboolean drop_messages = queue_len + input_queue_len > self->log_fifo_size;
245 if (!drop_messages)
246 return FALSE;
247
248 /* NOTE: MAX is needed here to ensure that the lost race on queue_len
249 * doesn't result in n < 0 */
250 *num_of_messages_to_drop = input_queue_len - MAX(0, (self->log_fifo_size - queue_len));
251
252 return TRUE;
253 }
254
255 /* move items from the per-thread input queue to the lock-protected "wait" queue */
256 static void
log_queue_fifo_move_input_unlocked(LogQueueFifo * self,gint thread_id)257 log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id)
258 {
259 gint num_of_messages_to_drop;
260 gboolean drop_messages = log_queue_fifo_calculate_num_of_messages_to_drop(self, &self->input_queues[thread_id],
261 &num_of_messages_to_drop);
262
263 if (drop_messages)
264 {
265 /* slow path, the input thread's queue would overflow the queue, let's drop some messages */
266 log_queue_fifo_drop_messages_from_input_queue(self, &self->input_queues[thread_id], num_of_messages_to_drop);
267 }
268
269 log_queue_queued_messages_add(&self->super, self->input_queues[thread_id].len);
270 iv_list_update_msg_size(self, &self->input_queues[thread_id].items);
271
272 iv_list_splice_tail_init(&self->input_queues[thread_id].items, &self->wait_queue.items);
273 self->wait_queue.len += self->input_queues[thread_id].len;
274 self->wait_queue.non_flow_controlled_len += self->input_queues[thread_id].non_flow_controlled_len;
275 self->input_queues[thread_id].len = 0;
276 self->input_queues[thread_id].non_flow_controlled_len = 0;
277 }
278
279 /* move items from the per-thread input queue to the lock-protected
280 * "wait" queue, but grabbing locks first. This is registered as a
281 * callback to be called when the input worker thread finishes its
282 * job.
283 */
284 static gpointer
log_queue_fifo_move_input(gpointer user_data)285 log_queue_fifo_move_input(gpointer user_data)
286 {
287 LogQueueFifo *self = (LogQueueFifo *) user_data;
288 gint thread_id;
289
290 thread_id = main_loop_worker_get_thread_id();
291
292 g_assert(thread_id >= 0);
293
294 g_mutex_lock(&self->super.lock);
295 log_queue_fifo_move_input_unlocked(self, thread_id);
296 log_queue_push_notify(&self->super);
297 g_mutex_unlock(&self->super.lock);
298 self->input_queues[thread_id].finish_cb_registered = FALSE;
299 log_queue_unref(&self->super);
300 return NULL;
301 }
302
303 /* lock must be held */
304 static inline gboolean
_message_has_to_be_dropped(LogQueueFifo * self,const LogPathOptions * path_options)305 _message_has_to_be_dropped(LogQueueFifo *self, const LogPathOptions *path_options)
306 {
307 if (G_UNLIKELY(self->use_legacy_fifo_size))
308 return log_queue_fifo_get_length(&self->super) >= self->log_fifo_size;
309
310 return !path_options->flow_control_requested
311 && log_queue_fifo_get_non_flow_controlled_length(self) >= self->log_fifo_size;
312 }
313
314 static inline void
_drop_message(LogMessage * msg,const LogPathOptions * path_options)315 _drop_message(LogMessage *msg, const LogPathOptions *path_options)
316 {
317 if (path_options->flow_control_requested)
318 {
319 log_msg_drop(msg, path_options, AT_SUSPENDED);
320 return;
321 }
322
323 log_msg_drop(msg, path_options, AT_PROCESSED);
324 }
325
326 /**
327 * Assumed to be called from one of the input threads. If the thread_id
328 * cannot be determined, the item is put directly in the wait queue.
329 *
330 * Puts the message to the queue, and logs an error if it caused the
331 * queue to be full.
332 *
333 * It attempts to put the item to the per-thread input queue.
334 *
335 * NOTE: It consumes the reference passed by the caller.
336 **/
337 static void
log_queue_fifo_push_tail(LogQueue * s,LogMessage * msg,const LogPathOptions * path_options)338 log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
339 {
340 LogQueueFifo *self = (LogQueueFifo *) s;
341 gint thread_id;
342 LogMessageQueueNode *node;
343
344 thread_id = main_loop_worker_get_thread_id();
345
346 g_assert(thread_id < 0 || log_queue_max_threads > thread_id);
347
348 /* NOTE: we don't use high-water marks for now, as log_fetch_limit
349 * limits the number of items placed on the per-thread input queue
350 * anyway, and any sane number decreased the performance measurably.
351 *
352 * This means that per-thread input queues contain _all_ items that
353 * a single poll iteration produces. And once the reader is finished
354 * (either because the input is depleted or because of
355 * log_fetch_limit / window_size) the whole bunch is propagated to
356 * the "wait" queue.
357 */
358
359 if (thread_id >= 0)
360 {
361 /* fastpath, use per-thread input FIFOs */
362 if (!self->input_queues[thread_id].finish_cb_registered)
363 {
364 /* this is the first item in the input FIFO, register a finish
365 * callback to make sure it gets moved to the wait_queue if the
366 * input thread finishes
367 * One reference should be held, while the callback is registered
368 * avoiding use-after-free situation
369 */
370
371 main_loop_worker_register_batch_callback(&self->input_queues[thread_id].cb);
372 self->input_queues[thread_id].finish_cb_registered = TRUE;
373 log_queue_ref(&self->super);
374 }
375
376 node = log_msg_alloc_queue_node(msg, path_options);
377 iv_list_add_tail(&node->list, &self->input_queues[thread_id].items);
378 self->input_queues[thread_id].len++;
379
380 if (!path_options->flow_control_requested)
381 self->input_queues[thread_id].non_flow_controlled_len++;
382
383 log_msg_unref(msg);
384 return;
385 }
386
387 /* slow path, put the pending item and the whole input queue to the wait_queue */
388
389 g_mutex_lock(&self->super.lock);
390
391 if (_message_has_to_be_dropped(self, path_options))
392 {
393 stats_counter_inc(self->super.dropped_messages);
394 g_mutex_unlock(&self->super.lock);
395
396 _drop_message(msg, path_options);
397
398 msg_debug("Destination queue full, dropping message",
399 evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
400 evt_tag_int("log_fifo_size", self->log_fifo_size),
401 evt_tag_str("persist_name", self->super.persist_name));
402 return;
403 }
404
405 node = log_msg_alloc_queue_node(msg, path_options);
406
407 iv_list_add_tail(&node->list, &self->wait_queue.items);
408 self->wait_queue.len++;
409
410 if (!path_options->flow_control_requested)
411 self->wait_queue.non_flow_controlled_len++;
412
413 log_queue_push_notify(&self->super);
414 log_queue_queued_messages_inc(&self->super);
415
416 log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
417 g_mutex_unlock(&self->super.lock);
418
419 log_msg_unref(msg);
420 }
421
422 /*
423 * Put an item back to the front of the queue.
424 *
425 * This is assumed to be called only from the output thread.
426 *
427 * NOTE: It consumes the reference passed by the caller.
428 */
429 static void
log_queue_fifo_push_head(LogQueue * s,LogMessage * msg,const LogPathOptions * path_options)430 log_queue_fifo_push_head(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
431 {
432 LogQueueFifo *self = (LogQueueFifo *) s;
433 LogMessageQueueNode *node;
434
435 /* we don't check limits when putting items "in-front", as it
436 * normally happens when we start processing an item, but at the end
437 * can't deliver it. No checks, no drops either. */
438
439 node = log_msg_alloc_dynamic_queue_node(msg, path_options);
440 iv_list_add(&node->list, &self->output_queue.items);
441 self->output_queue.len++;
442
443 if (!path_options->flow_control_requested)
444 self->output_queue.non_flow_controlled_len++;
445
446 log_msg_unref(msg);
447
448 log_queue_queued_messages_inc(&self->super);
449 log_queue_memory_usage_add(&self->super, log_msg_get_size(msg));
450 }
451
452 /*
453 * Can only run from the output thread.
454 *
455 * NOTE: this returns a reference which the caller must take care to free.
456 */
457 static LogMessage *
log_queue_fifo_pop_head(LogQueue * s,LogPathOptions * path_options)458 log_queue_fifo_pop_head(LogQueue *s, LogPathOptions *path_options)
459 {
460 LogQueueFifo *self = (LogQueueFifo *) s;
461 LogMessageQueueNode *node;
462 LogMessage *msg = NULL;
463
464 if (self->output_queue.len == 0)
465 {
466 /* slow path, output queue is empty, get some elements from the wait queue */
467 g_mutex_lock(&self->super.lock);
468 iv_list_splice_tail_init(&self->wait_queue.items, &self->output_queue.items);
469 self->output_queue.len = self->wait_queue.len;
470 self->output_queue.non_flow_controlled_len = self->wait_queue.non_flow_controlled_len;
471 self->wait_queue.len = 0;
472 self->wait_queue.non_flow_controlled_len = 0;
473 g_mutex_unlock(&self->super.lock);
474 }
475
476 if (self->output_queue.len > 0)
477 {
478 node = iv_list_entry(self->output_queue.items.next, LogMessageQueueNode, list);
479
480 msg = node->msg;
481 path_options->ack_needed = node->ack_needed;
482 self->output_queue.len--;
483
484 if (!node->flow_control_requested)
485 self->output_queue.non_flow_controlled_len--;
486
487 if (!self->super.use_backlog)
488 {
489 iv_list_del(&node->list);
490 log_msg_free_queue_node(node);
491 }
492 else
493 {
494 iv_list_del_init(&node->list);
495 }
496 }
497 else
498 {
499 /* no items either on the wait queue nor the output queue.
500 *
501 * NOTE: the input queues may contain items even in this case,
502 * however we don't touch them here, they'll be migrated to the
503 * wait_queue once the input threads finish their processing (or
504 * the high watermark is reached). Also, they are unlocked, so
505 * no way to touch them safely.
506 */
507 return NULL;
508 }
509 log_queue_queued_messages_dec(&self->super);
510 log_queue_memory_usage_sub(&self->super, log_msg_get_size(msg));
511
512 if (self->super.use_backlog)
513 {
514 log_msg_ref(msg);
515 iv_list_add_tail(&node->list, &self->backlog_queue.items);
516 self->backlog_queue.len++;
517
518 if (!node->flow_control_requested)
519 self->backlog_queue.non_flow_controlled_len++;
520 }
521
522 return msg;
523 }
524
525 /*
526 * Can only run from the output thread.
527 */
528 static void
log_queue_fifo_ack_backlog(LogQueue * s,gint rewind_count)529 log_queue_fifo_ack_backlog(LogQueue *s, gint rewind_count)
530 {
531 LogQueueFifo *self = (LogQueueFifo *) s;
532 LogMessage *msg;
533 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
534 gint pos;
535
536 for (pos = 0; pos < rewind_count && self->backlog_queue.len > 0; pos++)
537 {
538 LogMessageQueueNode *node;
539 node = iv_list_entry(self->backlog_queue.items.next, LogMessageQueueNode, list);
540 msg = node->msg;
541
542 iv_list_del(&node->list);
543 self->backlog_queue.len--;
544
545 if (!node->flow_control_requested)
546 self->backlog_queue.non_flow_controlled_len--;
547
548 path_options.ack_needed = node->ack_needed;
549 log_msg_ack(msg, &path_options, AT_PROCESSED);
550 log_msg_free_queue_node(node);
551 log_msg_unref(msg);
552 }
553 }
554
555
556 /*
557 * log_queue_rewind_backlog_all:
558 *
559 * Move items on our backlog back to our qoverflow queue. Please note that this
560 * function does not really care about qoverflow size, it has to put the backlog
561 * somewhere. The backlog is emptied as that will be filled if we send the
562 * items again.
563 *
564 * NOTE: this is assumed to be called from the output thread.
565 */
566 static void
log_queue_fifo_rewind_backlog_all(LogQueue * s)567 log_queue_fifo_rewind_backlog_all(LogQueue *s)
568 {
569 LogQueueFifo *self = (LogQueueFifo *) s;
570
571 iv_list_update_msg_size(self, &self->backlog_queue.items);
572 iv_list_splice_tail_init(&self->backlog_queue.items, &self->output_queue.items);
573
574 self->output_queue.len += self->backlog_queue.len;
575 self->output_queue.non_flow_controlled_len += self->backlog_queue.non_flow_controlled_len;
576 log_queue_queued_messages_add(&self->super, self->backlog_queue.len);
577 self->backlog_queue.len = 0;
578 self->backlog_queue.non_flow_controlled_len = 0;
579 }
580
581 static void
log_queue_fifo_rewind_backlog(LogQueue * s,guint rewind_count)582 log_queue_fifo_rewind_backlog(LogQueue *s, guint rewind_count)
583 {
584 LogQueueFifo *self = (LogQueueFifo *) s;
585 guint pos;
586
587 if (rewind_count > self->backlog_queue.len)
588 rewind_count = self->backlog_queue.len;
589
590 for (pos = 0; pos < rewind_count; pos++)
591 {
592 LogMessageQueueNode *node = iv_list_entry(self->backlog_queue.items.prev, LogMessageQueueNode, list);
593 /*
594 * Because the message go to the backlog only in case of pop_head
595 * and pop_head add ack and ref when it pushes the message into the backlog
596 * The rewind must decrease the ack and ref too
597 */
598 iv_list_del_init(&node->list);
599 iv_list_add(&node->list, &self->output_queue.items);
600
601 self->backlog_queue.len--;
602 self->output_queue.len++;
603
604 if (!node->flow_control_requested)
605 {
606 self->backlog_queue.non_flow_controlled_len--;
607 self->output_queue.non_flow_controlled_len++;
608 }
609
610 log_queue_queued_messages_inc(&self->super);
611 log_queue_memory_usage_add(&self->super, log_msg_get_size(node->msg));
612 }
613 }
614
615 static void
log_queue_fifo_free_queue(struct iv_list_head * q)616 log_queue_fifo_free_queue(struct iv_list_head *q)
617 {
618 while (!iv_list_empty(q))
619 {
620 LogMessageQueueNode *node;
621 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
622 LogMessage *msg;
623
624 node = iv_list_entry(q->next, LogMessageQueueNode, list);
625 iv_list_del(&node->list);
626
627 path_options.ack_needed = node->ack_needed;
628 msg = node->msg;
629 log_msg_free_queue_node(node);
630 log_msg_ack(msg, &path_options, AT_ABORTED);
631 log_msg_unref(msg);
632 }
633 }
634
635 static void
log_queue_fifo_free(LogQueue * s)636 log_queue_fifo_free(LogQueue *s)
637 {
638 LogQueueFifo *self = (LogQueueFifo *) s;
639 gint i;
640
641 for (i = 0; i < log_queue_max_threads; i++)
642 {
643 g_assert(self->input_queues[i].finish_cb_registered == FALSE);
644 log_queue_fifo_free_queue(&self->input_queues[i].items);
645 }
646
647 log_queue_fifo_free_queue(&self->wait_queue.items);
648 log_queue_fifo_free_queue(&self->output_queue.items);
649 log_queue_fifo_free_queue(&self->backlog_queue.items);
650 log_queue_free_method(s);
651 }
652
653 LogQueue *
log_queue_fifo_new(gint log_fifo_size,const gchar * persist_name)654 log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name)
655 {
656 LogQueueFifo *self;
657 gint i;
658
659 self = g_malloc0(sizeof(LogQueueFifo) + log_queue_max_threads * sizeof(self->input_queues[0]));
660
661 log_queue_init_instance(&self->super, persist_name);
662 self->super.type = log_queue_fifo_type;
663 self->super.use_backlog = FALSE;
664 self->super.get_length = log_queue_fifo_get_length;
665 self->super.is_empty_racy = log_queue_fifo_is_empty_racy;
666 self->super.keep_on_reload = log_queue_fifo_keep_on_reload;
667 self->super.push_tail = log_queue_fifo_push_tail;
668 self->super.push_head = log_queue_fifo_push_head;
669 self->super.pop_head = log_queue_fifo_pop_head;
670 self->super.ack_backlog = log_queue_fifo_ack_backlog;
671 self->super.rewind_backlog = log_queue_fifo_rewind_backlog;
672 self->super.rewind_backlog_all = log_queue_fifo_rewind_backlog_all;
673
674 self->super.free_fn = log_queue_fifo_free;
675
676 for (i = 0; i < log_queue_max_threads; i++)
677 {
678 INIT_IV_LIST_HEAD(&self->input_queues[i].items);
679 worker_batch_callback_init(&self->input_queues[i].cb);
680 self->input_queues[i].cb.func = log_queue_fifo_move_input;
681 self->input_queues[i].cb.user_data = self;
682 }
683 INIT_IV_LIST_HEAD(&self->wait_queue.items);
684 INIT_IV_LIST_HEAD(&self->output_queue.items);
685 INIT_IV_LIST_HEAD(&self->backlog_queue.items);
686
687 self->log_fifo_size = log_fifo_size;
688 return &self->super;
689 }
690
691 LogQueue *
log_queue_fifo_legacy_new(gint log_fifo_size,const gchar * persist_name)692 log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name)
693 {
694 LogQueueFifo *self = (LogQueueFifo *) log_queue_fifo_new(log_fifo_size, persist_name);
695 self->use_legacy_fifo_size = TRUE;
696 return &self->super;
697 }
698
699 QueueType
log_queue_fifo_get_type(void)700 log_queue_fifo_get_type(void)
701 {
702 return log_queue_fifo_type;
703 }
704